Coverage for mcpgateway / middleware / rbac.py: 100%
351 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/middleware/rbac.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7RBAC Permission Checking Middleware.
9This module provides middleware for FastAPI to enforce role-based access control
10on API endpoints. It includes permission decorators and dependency injection
11functions for protecting routes.
12"""
14# Standard
15import functools
16from functools import wraps
17import logging
18from typing import Callable, Generator, List, Optional
19import uuid
20import warnings
22# Third-Party
23from fastapi import Cookie, Depends, HTTPException, Request, status
24from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
25from sqlalchemy.orm import Session
27# First-Party
28from mcpgateway.auth import get_current_user
29from mcpgateway.config import settings
30from mcpgateway.db import fresh_db_session, SessionLocal
31from mcpgateway.services.permission_service import PermissionService
32from mcpgateway.utils.trace_context import (
33 clear_trace_context,
34 set_trace_auth_method,
35 set_trace_context_from_teams,
36 set_trace_team_scope,
37 set_trace_user_email,
38 set_trace_user_is_admin,
39)
40from mcpgateway.utils.verify_credentials import is_proxy_auth_trust_active
42logger = logging.getLogger(__name__)
44# Generic 403 message — intentionally vague to avoid leaking permission names to callers
45_ACCESS_DENIED_MSG = "Access denied"
47# HTTP Bearer security scheme for token extraction
48security = HTTPBearer(auto_error=False)
def get_db(request: Request = None) -> Generator[Session, None, None]:
    """Yield a SQLAlchemy session for FastAPI dependency injection.

    DEPRECATED: This function is deprecated and will be removed in a future version.
    New code should use the request-scoped session from request.state.db or
    get_db() from main.py.

    For backwards compatibility the session attached to ``request.state.db`` is
    reused when present, which avoids opening a second session for the same
    request (Issue #3622). Only when no such session exists does this function
    create one itself; in that case (and only in that case) it also commits,
    rolls back and closes it.

    **Migration Path**:
    - Route handlers: Use `db: Session = Depends(get_db)` from main.py
    - RBAC checks: Access request.state.db directly in middleware context

    Args:
        request: Optional FastAPI request object (automatically injected by FastAPI
            dependency system when used with Depends())

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.

    Note:
        The ``request`` annotation is intentionally the bare ``Request`` type:
        FastAPI recognises that type when injecting the current request.

    Examples:
        >>> gen = get_db()
        >>> db = next(gen)
        >>> hasattr(db, 'query')
        True
    """
    warnings.warn(
        "rbac.get_db() is deprecated. Use request.state.db or get_db() from main.py",
        DeprecationWarning,
        stacklevel=2,
    )

    # Prefer the request-scoped session that middleware may already have attached.
    session = getattr(request.state, "db", None) if request is not None else None
    owns_session = session is None

    if owns_session:
        # Legacy fallback: nobody provided a session, so create (and later manage) our own.
        logger.debug("[RBAC] Creating new session (no middleware session available)")
        session = SessionLocal()
    else:
        logger.debug(f"[RBAC] Reusing session from middleware: {id(session)}")

    try:
        yield session
        # A borrowed session is committed by whoever created it, never here.
        if owns_session:
            session.commit()
    except Exception:
        if owns_session:
            try:
                session.rollback()
            except Exception:
                # Rollback itself failed (e.g. broken connection): discard the connection instead.
                try:
                    session.invalidate()
                except Exception:
                    pass  # nosec B110 - Best effort cleanup on connection failure
        raise
    finally:
        if owns_session:
            session.close()
async def get_permission_service(db: Session = Depends(get_db)) -> PermissionService:
    """Build a PermissionService bound to the injected database session.

    DEPRECATED: Use PermissionService(db) directly with fresh_db_session() context manager instead.
    This function only remains so that endpoints still relying on dependency
    injection keep working.

    Args:
        db: Database session

    Returns:
        PermissionService: Permission checking service instance

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_permission_service)
        True
    """
    service = PermissionService(db)
    return service
async def get_current_user_with_permissions(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)):
    """Extract current user from JWT token and prepare for permission checking.

    Resolution order:

    1. When ``settings.mcp_client_auth_enabled`` is false, the identity comes from
       the trusted proxy header (if proxy trust is active); otherwise the request
       is either rejected (``auth_required``) or treated as anonymous.
    2. Otherwise a JWT is taken from the Authorization header, then from the
       ``jwt_token`` / ``access_token`` cookies, then from the ``jwt_token``
       cookie dependency, and is validated through ``get_current_user``.

    Uses fresh_db_session() context manager to avoid session accumulation under high load.
    Database sessions are created only when needed and closed immediately after use,
    which is why the returned context always carries ``"db": None``.

    Args:
        request: FastAPI request object for IP/user-agent extraction
        credentials: HTTP Bearer credentials
        jwt_token: JWT token from cookie

    Returns:
        dict: User information with permission checking context

    Raises:
        HTTPException: If authentication fails (401), or a 302 redirect to the
            admin login page for browser/HTMX requests.

    Examples:
        Use as FastAPI dependency::

            @app.get("/protected-endpoint")
            async def protected_route(user = Depends(get_current_user_with_permissions)):
                return {"user": user["email"]}
    """

    def _set_trace_context_for_identity(*, email: Optional[str], is_admin: bool, auth_method: str, token_teams: Optional[List[str]] = None, team_scope_known: bool = False) -> None:
        """Reset the trace context and populate it for the resolved identity.

        Args:
            email: Identity email to record.
            is_admin: Whether the identity is an administrator.
            auth_method: Name of the authentication method used.
            token_teams: Team scope to record when ``team_scope_known`` is true.
            team_scope_known: Whether ``token_teams`` is authoritative.
        """
        clear_trace_context()
        set_trace_user_email(email)
        set_trace_user_is_admin(is_admin)
        set_trace_auth_method(auth_method)
        trace_team_name = getattr(request.state, "trace_team_name", None)
        if team_scope_known:
            set_trace_context_from_teams(token_teams, user_email=email, is_admin=is_admin, auth_method=auth_method, team_name=trace_team_name)
        elif is_admin:
            set_trace_team_scope("admin")

    def _wants_html() -> bool:
        """Tell whether the caller is an HTML page load or an HTMX request.

        Returns:
            bool: True when the Accept header contains text/html or HX-Request is "true".
        """
        return "text/html" in request.headers.get("accept", "") or request.headers.get("hx-request") == "true"

    def _login_redirect() -> HTTPException:
        """Build the redirect-to-login response used for browser callers.

        Returns:
            HTTPException: 302 response pointing at the admin login page.
        """
        return HTTPException(
            status_code=status.HTTP_302_FOUND,
            detail="Authentication required",
            headers={"Location": f"{settings.app_root_path}/admin/login"},
        )

    def _identity_context(*, email: Optional[str], full_name: Optional[str], is_admin: bool, auth_method: str) -> dict:
        """Build the user context for identities that were not resolved from a JWT.

        Plugin contexts are always included so that permission-check plugin hooks
        receive the same cross-hook state regardless of how the caller authenticated.

        Args:
            email: Identity email.
            full_name: Display name.
            is_admin: Whether the identity is an administrator.
            auth_method: Name of the authentication method used.

        Returns:
            dict: User context consumed by the RBAC decorators.
        """
        return {
            "email": email,
            "full_name": full_name,
            "is_admin": is_admin,
            "ip_address": request.client.host if request.client else None,
            "user_agent": request.headers.get("user-agent"),
            "db": None,  # Session closed; use endpoint's db param instead
            "auth_method": auth_method,
            "request_id": getattr(request.state, "request_id", None),
            "team_id": getattr(request.state, "team_id", None),
            "plugin_context_table": getattr(request.state, "plugin_context_table", None),
            "plugin_global_context": getattr(request.state, "plugin_global_context", None),
        }

    def _anonymous_context() -> dict:
        """Record and build the non-admin anonymous identity.

        Returns:
            dict: User context for the anonymous user.
        """
        _set_trace_context_for_identity(email="anonymous", is_admin=False, auth_method="anonymous", token_teams=[], team_scope_known=True)
        return _identity_context(email="anonymous", full_name="Anonymous User", is_admin=False, auth_method="anonymous")

    # Check for proxy authentication first (if MCP client auth is disabled)
    if not settings.mcp_client_auth_enabled:
        if is_proxy_auth_trust_active(settings):
            # Extract user from proxy header
            proxy_user = request.headers.get(settings.proxy_user_header)
            if proxy_user:
                # Lookup user in DB to get is_admin status, or check platform_admin_email
                is_admin = False
                full_name = proxy_user
                if proxy_user == settings.platform_admin_email:
                    is_admin = True
                    full_name = "Platform Admin"
                else:
                    # Try to lookup user in EmailUser table for is_admin status
                    try:
                        # Third-Party
                        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

                        # First-Party
                        from mcpgateway.db import EmailUser  # pylint: disable=import-outside-toplevel

                        # Use fresh_db_session for short-lived database access
                        with fresh_db_session() as db:
                            user = db.execute(select(EmailUser).where(EmailUser.email == proxy_user)).scalar_one_or_none()
                            if user:
                                is_admin = user.is_admin
                                full_name = user.full_name or proxy_user
                    except Exception as e:
                        logger.debug(f"Could not lookup proxy user in DB: {e}")
                        # Continue with is_admin=False if lookup fails

                _set_trace_context_for_identity(email=proxy_user, is_admin=is_admin, auth_method="proxy")
                return _identity_context(email=proxy_user, full_name=full_name, is_admin=is_admin, auth_method="proxy")

            # No proxy header - check auth_required to align with WebSocket behavior
            missing_auth_detail = "Proxy authentication header required"
        else:
            # Warning: MCP auth disabled without proxy trust - security risk!
            # This case is already warned about in config validation
            # Still check auth_required for consistency
            missing_auth_detail = "Authentication required but no auth method configured"

        # For browser requests, redirect to login; for API requests, return 401
        if settings.auth_required:
            if _wants_html():
                raise _login_redirect()
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=missing_auth_detail,
            )

        # auth_required=false: allow anonymous access
        return _anonymous_context()

    # Standard JWT authentication flow
    # Try multiple sources for the token, prioritizing Authorization header for API requests
    token = None
    token_from_cookie = False

    # 1. First try Authorization header (preferred for API requests)
    if credentials and credentials.credentials:
        token = credentials.credentials

    # 2. Try manual cookie reading (for browser requests)
    if not token and request.cookies:
        # Try both jwt_token and access_token cookie names
        manual_token = request.cookies.get("jwt_token") or request.cookies.get("access_token")
        if manual_token:
            token = manual_token
            token_from_cookie = True

    # 3. Finally try FastAPI Cookie dependency (fallback)
    if not token and jwt_token:
        token = jwt_token
        token_from_cookie = True

    # Check if this is a browser/admin-UI request (not an external API request)
    referer = request.headers.get("referer", "")
    is_admin_ui_request = "/admin" in referer
    is_browser_request = _wants_html() or is_admin_ui_request

    # SECURITY: Reject cookie-only authentication for API requests
    # Cookies should only be used for browser/HTML requests (including admin UI fetch calls)
    if token_from_cookie and not is_browser_request:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Cookie authentication not allowed for API requests. Use Authorization header.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not token:
        # For browser requests (HTML Accept header, HTMX or admin UI referer), redirect to login
        if is_browser_request:
            raise _login_redirect()

        if not settings.auth_required:
            # AUTH_REQUIRED=false no longer implies admin access.
            # Preserve explicit unsafe override for local-only compatibility.
            if getattr(settings, "allow_unauthenticated_admin", False) is True:
                _set_trace_context_for_identity(email=settings.platform_admin_email, is_admin=True, auth_method="disabled")
                # Built through the shared helper so this context also carries the
                # plugin contexts, like every other context returned here.
                return _identity_context(email=settings.platform_admin_email, full_name="Platform Admin", is_admin=True, auth_method="disabled")
            return _anonymous_context()

        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization token required")

    try:
        # Create credentials object if we got token from cookie
        if not credentials:
            credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)

        # Extract user from token using the email auth function
        # Pass request to get_current_user so plugins can store auth_method in request.state
        user = await get_current_user(credentials, request=request)

        # Add request context for permission auditing. Values below are read from
        # request.state after get_current_user has run (auth_method/token_use are set
        # there, request_id by HTTP middleware, plugin contexts by HttpAuthMiddleware).
        return {
            "email": user.email,
            "full_name": user.full_name,
            "is_admin": user.is_admin,
            "ip_address": request.client.host if request.client else None,
            "user_agent": request.headers.get("user-agent"),
            "db": None,  # Session closed; use endpoint's db param instead
            "auth_method": getattr(request.state, "auth_method", None),  # Include auth_method from plugin
            "request_id": getattr(request.state, "request_id", None),  # Include request_id from middleware
            "team_id": getattr(request.state, "team_id", None),  # Include team_id from token
            "token_teams": getattr(request.state, "token_teams", None),  # Include token teams for query-level scoping
            "token_use": getattr(request.state, "token_use", None),  # Include token_use for RBAC team derivation
            "plugin_context_table": getattr(request.state, "plugin_context_table", None),  # Plugin contexts for cross-hook sharing
            "plugin_global_context": getattr(request.state, "plugin_global_context", None),  # Global context for consistency
        }
    except Exception as e:
        logger.error(f"Authentication failed: {type(e).__name__}: {e}")

        # For browser requests (HTML Accept header or HTMX), redirect to login.
        # The client-facing detail stays generic; the cause is chained for server-side debugging.
        if _wants_html():
            raise _login_redirect() from e

        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials") from e
428# --- Team derivation helpers for multi-team session tokens ---
@functools.lru_cache(maxsize=1)
def _get_resource_param_to_model():
    """Build (once) the lookup from endpoint ID parameter names to ORM models.

    The model classes are imported inside the function and the result is cached,
    so the import happens on first use rather than at module import time.

    Returns:
        dict: Mapping of URL parameter names to SQLAlchemy model classes.
    """
    # First-Party
    from mcpgateway.db import A2AAgent, Gateway, Prompt, Resource, Server, Tool  # pylint: disable=import-outside-toplevel

    # Order matters: callers iterate this mapping and use the first populated parameter.
    param_model_pairs = (
        ("tool_id", Tool),
        ("server_id", Server),
        ("resource_id", Resource),
        ("prompt_id", Prompt),
        ("gateway_id", Gateway),
        ("agent_id", A2AAgent),
    )
    return dict(param_model_pairs)
def _derive_team_from_resource(kwargs, db_session) -> Optional[str]:
    """Look up resource's team_id from DB for RBAC context (Tier 1).

    For endpoints that target a specific resource (get, update, delete, execute),
    derive the team context from the resource's owner team. Only the first
    populated resource ID parameter is consulted.

    The lookup is best effort: a database error never propagates, but it is
    logged at debug level, because a ``None`` result makes the caller fall back
    to a broader permission check.

    Args:
        kwargs: Endpoint function kwargs containing resource ID params
        db_session: Active SQLAlchemy session

    Returns:
        team_id string if found, None otherwise
    """
    for param_name, model_cls in _get_resource_param_to_model().items():
        resource_id = kwargs.get(param_name)
        if not resource_id:
            continue
        try:
            resource = db_session.get(model_cls, resource_id)
            if resource:
                return getattr(resource, "team_id", None)
        except Exception as exc:  # nosec B110 - DB lookup failure falls through to None
            logger.debug("Could not derive team from %s for RBAC check: %s: %s", param_name, type(exc).__name__, exc)
        return None  # Resource not found (or lookup failed); let endpoint handle 404
    return None  # No resource ID param
async def _derive_team_from_payload(kwargs) -> Optional[str]:
    """Extract team_id from create payload objects or form data (Tier 3).

    Create endpoints have no existing resource to inspect, so the team context
    is taken from the request body instead: first from a payload object passed
    as a known keyword argument, then from submitted form data.

    Args:
        kwargs: Endpoint function kwargs

    Returns:
        team_id string if found, None otherwise
    """
    # API endpoints: payload objects passed under well-known parameter names.
    payload_param_names = ("gateway", "tool", "server", "resource", "prompt", "agent")
    for name in payload_param_names:
        candidate = kwargs.get(name)
        if not candidate or not hasattr(candidate, "team_id"):
            continue
        payload_team = getattr(candidate, "team_id", None)
        if payload_team:
            return payload_team

    # Admin UI endpoints: team_id submitted as a form field.
    # The explicit None test comes first and the type test second, so that
    # arbitrary truthy objects lacking .headers are never treated as requests.
    request = kwargs.get("request")
    if request is None or not isinstance(request, Request):
        return None
    if "form" not in request.headers.get("content-type", ""):
        return None

    try:
        form_data = await request.form()
        return form_data.get("team_id") or None
    except Exception:  # nosec B110 - Form parse failure is non-fatal
        return None
515# Permissions that indicate create/mutate operations (not safe for "any-team" aggregation)
_MUTATE_PERMISSION_ACTIONS = frozenset(
    (
        "create",
        "update",
        "delete",
        "execute",
        "invoke",
        "toggle",
        "set_state",
        "revoke",
        "manage_members",
        "join",
        "manage",
        "share",
        "invite",
        "use",
    )
)


def _is_mutate_permission(permission: str) -> bool:
    """Tell whether a permission string names a mutating action.

    The action is the text after the last ``:`` when the permission contains a
    colon (``admin.sso_providers:create``), otherwise the text after the last
    ``.`` (``tools.create``). A string with neither separator has no action
    component and is never considered mutating.

    Args:
        permission: Permission string like 'tools.create' or 'admin.sso_providers:create'.

    Returns:
        bool: True if the permission's action component is a mutating operation.
    """
    # Colon form takes precedence: admin.sso_providers:create → action is "create"
    _, colon, action = permission.rpartition(":")
    if colon:
        return action in _MUTATE_PERMISSION_ACTIONS

    # Dotted form: an action exists only when at least one dot is present.
    _, dot, action = permission.rpartition(".")
    return bool(dot) and action in _MUTATE_PERMISSION_ACTIONS
def require_permission(permission: str, resource_type: Optional[str] = None, allow_admin_bypass: bool = True):
    """Decorator to require specific permission for accessing an endpoint.

    The wrapped coroutine is only invoked when the caller (taken from the
    ``user`` / ``_user`` / ``current_user`` / ``current_user_ctx`` keyword
    argument) is granted ``permission``, either by a permission-check plugin
    hook (when plugin overrides are enabled) or by ``PermissionService``.

    Args:
        permission: Required permission (e.g., 'tools.create')
        resource_type: Optional resource type for resource-specific permissions
        allow_admin_bypass: If True (default), admin users bypass all permission checks.
            If False, even admins must have explicit permissions.
            Use False for admin UI routes to enforce granular RBAC.

    Returns:
        Callable: Decorated function that enforces the permission requirement

    Examples:
        >>> decorator = require_permission("tools.create", "tools")
        >>> callable(decorator)
        True

        Execute wrapped function when permission granted:
        >>> import asyncio
        >>> class DummyPS:
        ...     def __init__(self, db):
        ...         pass
        ...     async def check_permission(self, **kwargs):
        ...         return True
        >>> @require_permission("tools.read")
        ... async def demo(user=None):
        ...     return "ok"
        >>> from unittest.mock import patch
        >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
        ...     asyncio.run(demo(user={"email": "u", "db": object()}))
        'ok'
    """

    def decorator(func: Callable) -> Callable:
        """Wrap ``func`` so the permission check runs before it.

        Args:
            func: The function to be decorated

        Returns:
            Callable: The wrapped function with permission checking
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            """Check the caller's permission, then delegate to the wrapped function.

            Args:
                *args: Positional arguments passed to the wrapped function
                **kwargs: Keyword arguments passed to the wrapped function

            Returns:
                Any: Result from the wrapped function if permission check passes

            Raises:
                HTTPException: If user authentication or permission check fails
            """
            # Extract user context from named kwargs only (security: avoid picking up request body dicts)
            user_context = None
            for ctx_key in ("user", "_user", "current_user", "current_user_ctx"):
                user_context = kwargs.get(ctx_key)
                if user_context:
                    break
            if not (user_context and isinstance(user_context, dict) and "email" in user_context):
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")

            # Session to reuse: the endpoint's db param first, then the one carried by the user context.
            endpoint_db = kwargs.get("db") or user_context.get("db")

            # Team scope: path parameter first, then the team recorded in the user context.
            team_id = kwargs.get("team_id") or user_context.get("team_id", None)

            # For multi-team session tokens (no team yet), derive team from context
            check_any_team = False
            if not team_id and user_context.get("token_use") == "session":
                if endpoint_db:
                    # Tier 1: Try to derive team from existing resource
                    team_id = _derive_team_from_resource(kwargs, endpoint_db)
                if team_id is None:
                    # Tier 3: Try to derive team from create payload / form
                    team_id = await _derive_team_from_payload(kwargs)
                # If still no team_id, check permission across all of the user's teams.
                # This separates authorization ("does this user have the permission?")
                # from resource scoping ("which team owns this resource?"). Team
                # assignment is enforced downstream by endpoint logic (e.g.
                # verify_team_for_user, token team membership checks).
                check_any_team = not team_id

            # First, check if any plugins want to handle permission checking
            # First-Party
            from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthCheckPermissionPayload, HttpHookType  # pylint: disable=import-outside-toplevel

            plugin_manager = get_plugin_manager()
            if plugin_manager and plugin_manager.has_hooks_for(HttpHookType.HTTP_AUTH_CHECK_PERMISSION):
                # Plugin contexts travel in user_context (stored in request.state by HttpAuthMiddleware)
                # and enable cross-hook context sharing between HTTP_PRE_REQUEST and HTTP_AUTH_CHECK_PERMISSION
                plugin_context_table = user_context.get("plugin_context_table")
                global_context = user_context.get("plugin_global_context")

                # Reuse the middleware's global context when available; otherwise
                # create a new one (fallback for cases where middleware didn't run)
                if not global_context:
                    global_context = GlobalContext(
                        request_id=user_context.get("request_id") or uuid.uuid4().hex,
                        server_id=None,
                        tenant_id=None,
                    )

                hook_payload = HttpAuthCheckPermissionPayload(
                    user_email=user_context["email"],
                    permission=permission,
                    resource_type=resource_type,
                    team_id=team_id,
                    is_admin=user_context.get("is_admin", False),
                    auth_method=user_context.get("auth_method"),
                    client_host=user_context.get("ip_address"),
                    user_agent=user_context.get("user_agent"),
                )

                # Invoke permission check hook, passing plugin contexts from HTTP_PRE_REQUEST hook
                result, _ = await plugin_manager.invoke_hook(
                    HttpHookType.HTTP_AUTH_CHECK_PERMISSION,
                    payload=hook_payload,
                    global_context=global_context,
                    local_contexts=plugin_context_table,  # Pass context table for cross-hook state
                )

                # If a plugin made a decision, respect it
                if result and result.modified_payload and hasattr(result.modified_payload, "granted"):
                    decision_reason = getattr(result.modified_payload, "reason", None)
                    result_metadata = result.metadata if isinstance(result.metadata, dict) else {}

                    # Name the deciding plugin from the first populated metadata key.
                    decision_plugin = "unknown"
                    for meta_key in ("_decision_plugin", "plugin_name", "plugin", "source_plugin", "handler"):
                        if decision_plugin != "unknown":
                            break
                        meta_value = result_metadata.get(meta_key)
                        if meta_value:
                            decision_plugin = str(meta_value)

                    logger.info(
                        "Plugin permission decision: plugin=%s user=%s permission=%s granted=%s reason=%s",
                        decision_plugin,
                        user_context["email"],
                        permission,
                        result.modified_payload.granted,
                        decision_reason,
                    )

                    if not result.modified_payload.granted:
                        logger.warning(
                            "Permission denied by plugin: plugin=%s user=%s permission=%s reason=%s",
                            decision_plugin,
                            user_context["email"],
                            permission,
                            decision_reason,
                        )
                        raise HTTPException(
                            status_code=status.HTTP_403_FORBIDDEN,
                            detail=_ACCESS_DENIED_MSG,
                        )

                    if settings.plugins_can_override_rbac:
                        logger.warning(
                            "Plugin RBAC grant override applied: plugin=%s user=%s permission=%s reason=%s",
                            decision_plugin,
                            user_context["email"],
                            permission,
                            decision_reason,
                        )
                        return await func(*args, **kwargs)

                    # A plugin grant without the override setting does not skip standard RBAC.
                    logger.info(
                        "Plugin RBAC grant decision ignored by default policy: plugin=%s user=%s permission=%s",
                        decision_plugin,
                        user_context["email"],
                        permission,
                    )

            # No plugin settled it: standard RBAC check with identical arguments
            # whichever session ends up being used.
            check_kwargs = {
                "user_email": user_context["email"],
                "permission": permission,
                "resource_type": resource_type,
                "team_id": team_id,
                "token_teams": user_context.get("token_teams"),
                "ip_address": user_context.get("ip_address"),
                "user_agent": user_context.get("user_agent"),
                "allow_admin_bypass": allow_admin_bypass,
                "check_any_team": check_any_team,
            }
            if endpoint_db:
                # Use existing session from endpoint or user_context
                granted = await PermissionService(endpoint_db).check_permission(**check_kwargs)
            else:
                # Create fresh db session for permission check
                with fresh_db_session() as fresh_db:
                    granted = await PermissionService(fresh_db).check_permission(**check_kwargs)

            if granted:
                # Permission granted, execute the original function
                return await func(*args, **kwargs)

            logger.warning(f"Permission denied: user={user_context['email']}, permission={permission}, resource_type={resource_type}")
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        # Store permission metadata as function attributes for introspection
        # This enables validation tools to extract permissions without fragile closure inspection
        # Using setattr() to avoid pylint protected-access warnings
        for attr_name, attr_value in (
            ("_required_permission", permission),
            ("_resource_type", resource_type),
            ("_allow_admin_bypass", allow_admin_bypass),
        ):
            setattr(wrapper, attr_name, attr_value)

        return wrapper

    return decorator
def require_admin_permission():
    """Decorator to require admin permissions for accessing an endpoint.

    The wrapped coroutine is only invoked when ``PermissionService`` confirms
    admin permission for the caller found in the ``user`` / ``_user`` /
    ``current_user`` / ``current_user_ctx`` keyword argument.

    Returns:
        Callable: Decorated function that enforces admin permission requirement

    Examples:
        >>> decorator = require_admin_permission()
        >>> callable(decorator)
        True

        Execute when admin permission granted:
        >>> import asyncio
        >>> class DummyPS:
        ...     def __init__(self, db):
        ...         pass
        ...     async def check_admin_permission(self, email, token_teams=None):
        ...         return True
        >>> @require_admin_permission()
        ... async def demo(user=None):
        ...     return "admin-ok"
        >>> from unittest.mock import patch
        >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
        ...     asyncio.run(demo(user={"email": "u", "db": object()}))
        'admin-ok'
    """

    def decorator(func: Callable) -> Callable:
        """Wrap ``func`` so the admin permission check runs before it.

        Args:
            func: The function to be decorated

        Returns:
            Callable: The wrapped function with admin permission checking
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            """Check the caller's admin permission, then delegate to the wrapped function.

            Args:
                *args: Positional arguments passed to the wrapped function
                **kwargs: Keyword arguments passed to the wrapped function

            Returns:
                Any: Result from the wrapped function if admin permission check passes

            Raises:
                HTTPException: If user authentication or admin permission check fails
            """
            # Extract user context from named kwargs only (security: avoid picking up request body dicts)
            user_context = None
            for ctx_key in ("user", "_user", "current_user", "current_user_ctx"):
                user_context = kwargs.get(ctx_key)
                if user_context:
                    break
            if not (user_context and isinstance(user_context, dict) and "email" in user_context):
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")

            token_teams = user_context.get("token_teams")  # Forward token scope

            # Get db session: prefer endpoint's db param, then user_context["db"], then create fresh
            endpoint_db = kwargs.get("db") or user_context.get("db")
            if endpoint_db:
                has_admin_permission = await PermissionService(endpoint_db).check_admin_permission(user_context["email"], token_teams=token_teams)
            else:
                with fresh_db_session() as fresh_db:
                    has_admin_permission = await PermissionService(fresh_db).check_admin_permission(user_context["email"], token_teams=token_teams)

            if has_admin_permission:
                # Admin permission granted, execute the original function
                return await func(*args, **kwargs)

            logger.warning(f"Admin permission denied: user={user_context['email']}")
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        return wrapper

    return decorator
def require_any_permission(permissions: List[str], resource_type: Optional[str] = None, allow_admin_bypass: bool = True):
    """Decorator to require any of the specified permissions for accessing an endpoint.

    The decorated endpoint must receive its authenticated user context through one
    of the keyword arguments ``user``, ``_user``, ``current_user`` or
    ``current_user_ctx``; the context has to be a dict that contains ``email``.

    Args:
        permissions: List of permissions, user needs at least one
        resource_type: Optional resource type for resource-specific permissions
        allow_admin_bypass: Forwarded to the permission service. If True (default),
            admin users bypass all permission checks. If False, even admins must
            have explicit permissions.

    Returns:
        Callable: Decorated function that enforces the permission requirements

    Examples:
        >>> decorator = require_any_permission(["tools.read", "tools.execute"], "tools")
        >>> callable(decorator)
        True

        Execute when any permission granted:
        >>> import asyncio
        >>> class DummyPS:
        ...     def __init__(self, db):
        ...         pass
        ...     async def check_permission(self, **kwargs):
        ...         return True
        >>> @require_any_permission(["tools.read", "tools.execute"], "tools")
        ... async def demo(user=None):
        ...     return "any-ok"
        >>> from unittest.mock import patch
        >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
        ...     asyncio.run(demo(user={"email": "u", "db": object()}))
        'any-ok'
    """

    def decorator(func: Callable) -> Callable:
        """Decorator function that wraps the original function with any-permission checking.

        Args:
            func: The function to be decorated

        Returns:
            Callable: The wrapped function with any-permission checking
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            """Async wrapper function that performs any-permission check before calling original function.

            Args:
                *args: Positional arguments passed to the wrapped function
                **kwargs: Keyword arguments passed to the wrapped function

            Returns:
                Any: Result from the wrapped function if any-permission check passes

            Raises:
                HTTPException: 401 when no usable user context is supplied,
                    403 when none of the permissions is granted
            """
            # Extract user context from named kwargs only (security: avoid picking up request body dicts)
            user_context = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx")
            if not user_context or not isinstance(user_context, dict) or "email" not in user_context:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")

            # Resolve the session once: prefer the endpoint's db param, then user_context["db"].
            # It is used both for team derivation and for the permission check below.
            db_session = kwargs.get("db") or user_context.get("db")

            # Team scope: path/query parameter first, then the user context
            team_id = kwargs.get("team_id") or user_context.get("team_id", None)

            # For multi-team session tokens (team_id is None), derive team from context
            check_any_team = False
            if not team_id and user_context.get("token_use") == "session":
                if db_session:
                    # Tier 1: Try to derive team from existing resource
                    team_id = _derive_team_from_resource(kwargs, db_session)
                # Tier 3: Try to derive team from create payload / form
                if team_id is None:
                    team_id = await _derive_team_from_payload(kwargs)
                # If still no team_id, check permission across all of the user's teams.
                # Authorization ("does this user have the permission?") is separate
                # from resource scoping ("which team owns this resource?").
                if not team_id:
                    check_any_team = True

            async def _any_permission_granted(session) -> bool:
                """Check the required permissions in order, stopping at the first grant.

                Args:
                    session: Database session handed to the permission service

                Returns:
                    bool: True as soon as one permission is granted, False if none is
                """
                permission_service = PermissionService(session)
                for permission in permissions:
                    if await permission_service.check_permission(
                        user_email=user_context["email"],
                        permission=permission,
                        resource_type=resource_type,
                        team_id=team_id,
                        token_teams=user_context.get("token_teams"),
                        ip_address=user_context.get("ip_address"),
                        user_agent=user_context.get("user_agent"),
                        allow_admin_bypass=allow_admin_bypass,
                        check_any_team=check_any_team,
                    ):
                        return True
                return False

            if db_session:
                # Use existing session from endpoint or user_context
                granted = await _any_permission_granted(db_session)
            else:
                # Create a single fresh db session shared by all permission checks
                with fresh_db_session() as db:
                    granted = await _any_permission_granted(db)

            if not granted:
                logger.warning(f"Permission denied: user={user_context['email']}, permissions={permissions}, resource_type={resource_type}")
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

            # Permission granted, execute the original function
            return await func(*args, **kwargs)

        return wrapper

    return decorator
class PermissionChecker:
    """Helper for manual permission checking.

    Useful for complex permission logic that can't be handled by decorators.
    Every check reuses the database session found under ``user_context["db"]``
    when there is one and otherwise opens a fresh session for that check.

    Examples:
        >>> from unittest.mock import Mock
        >>> checker = PermissionChecker({"email": "user@example.com", "db": Mock()})
        >>> hasattr(checker, 'has_permission') and hasattr(checker, 'has_admin_permission')
        True
    """

    def __init__(self, user_context: dict):
        """Initialize permission checker with user context.

        Args:
            user_context: User context from get_current_user_with_permissions
        """
        self.user_context = user_context
        self.db_session = user_context.get("db")

    async def _run_with_service(self, operation: Callable):
        """Run ``operation`` against a PermissionService bound to a usable session.

        Args:
            operation: Async callable that takes the permission service and returns the check result

        Returns:
            Any: Whatever ``operation`` returns
        """
        if self.db_session:
            # Use existing session
            return await operation(PermissionService(self.db_session))
        # Create fresh db session; it stays open until the operation has finished
        with fresh_db_session() as db:
            return await operation(PermissionService(db))

    def _caller_identity(self) -> dict:
        """Collect the caller-describing keyword arguments shared by every check_permission call.

        Returns:
            dict: ``user_email``, ``token_teams``, ``ip_address`` and ``user_agent`` taken from the user context

        Raises:
            KeyError: If the user context has no ``email`` entry
        """
        context = self.user_context
        return {
            "user_email": context["email"],
            "token_teams": context.get("token_teams"),
            "ip_address": context.get("ip_address"),
            "user_agent": context.get("user_agent"),
        }

    async def has_permission(self, permission: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None, team_id: Optional[str] = None, check_any_team: bool = False) -> bool:
        """Check if user has specific permission.

        Args:
            permission: Permission to check
            resource_type: Optional resource type
            resource_id: Optional resource ID
            team_id: Optional team context
            check_any_team: If True, check across all teams the user belongs to

        Returns:
            bool: True if user has permission
        """

        async def _check(permission_service) -> bool:
            """Ask the service about the single requested permission.

            Args:
                permission_service: Service bound to the session chosen by ``_run_with_service``

            Returns:
                bool: Result of the permission check
            """
            return await permission_service.check_permission(
                permission=permission,
                resource_type=resource_type,
                resource_id=resource_id,
                team_id=team_id,
                check_any_team=check_any_team,
                **self._caller_identity(),
            )

        return await self._run_with_service(_check)

    async def has_admin_permission(self) -> bool:
        """Check if user has admin permissions.

        Returns:
            bool: True if user has admin permissions
        """
        token_teams = self.user_context.get("token_teams")

        async def _check(permission_service) -> bool:
            """Ask the service whether the user is an administrator.

            Args:
                permission_service: Service bound to the session chosen by ``_run_with_service``

            Returns:
                bool: Result of the admin permission check
            """
            return await permission_service.check_admin_permission(self.user_context["email"], token_teams=token_teams)

        return await self._run_with_service(_check)

    async def has_any_permission(self, permissions: List[str], resource_type: Optional[str] = None, team_id: Optional[str] = None) -> bool:
        """Check if user has any of the specified permissions.

        Args:
            permissions: List of permissions to check
            resource_type: Optional resource type
            team_id: Optional team context

        Returns:
            bool: True if user has at least one permission
        """

        async def _check(permission_service) -> bool:
            """Check the permissions in order on one service, stopping at the first grant.

            Args:
                permission_service: Service bound to the session chosen by ``_run_with_service``

            Returns:
                bool: True as soon as one permission is granted, False if none is
            """
            for permission in permissions:
                if await permission_service.check_permission(
                    permission=permission,
                    resource_type=resource_type,
                    team_id=team_id,
                    **self._caller_identity(),
                ):
                    return True
            return False

        # A single session serves all checks (avoid N sessions for N permissions)
        return await self._run_with_service(_check)

    async def require_permission(self, permission: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None, team_id: Optional[str] = None) -> None:
        """Require specific permission, raise HTTPException if not granted.

        Args:
            permission: Required permission
            resource_type: Optional resource type
            resource_id: Optional resource ID
            team_id: Optional team context

        Raises:
            HTTPException: If permission is not granted
        """
        if await self.has_permission(permission, resource_type, resource_id, team_id):
            return
        # The permission name is logged server-side only; the response stays generic
        logger.warning(f"{_ACCESS_DENIED_MSG}: user '{self.user_context.get('email')}' missing permission '{permission}'")
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)