Coverage for mcpgateway / middleware / rbac.py: 100%

351 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/rbac.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7RBAC Permission Checking Middleware. 

8 

9This module provides middleware for FastAPI to enforce role-based access control 

10on API endpoints. It includes permission decorators and dependency injection 

11functions for protecting routes. 

12""" 

13 

14# Standard 

15import functools 

16from functools import wraps 

17import logging 

18from typing import Callable, Generator, List, Optional 

19import uuid 

20import warnings 

21 

22# Third-Party 

23from fastapi import Cookie, Depends, HTTPException, Request, status 

24from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer 

25from sqlalchemy.orm import Session 

26 

27# First-Party 

28from mcpgateway.auth import get_current_user 

29from mcpgateway.config import settings 

30from mcpgateway.db import fresh_db_session, SessionLocal 

31from mcpgateway.services.permission_service import PermissionService 

32from mcpgateway.utils.trace_context import ( 

33 clear_trace_context, 

34 set_trace_auth_method, 

35 set_trace_context_from_teams, 

36 set_trace_team_scope, 

37 set_trace_user_email, 

38 set_trace_user_is_admin, 

39) 

40from mcpgateway.utils.verify_credentials import is_proxy_auth_trust_active 

41 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Generic 403 message — intentionally vague to avoid leaking permission names to callers
_ACCESS_DENIED_MSG = "Access denied"

# HTTP Bearer security scheme for token extraction.
# Constructed with auto_error=False; get_current_user_with_permissions() itself
# handles the case where no bearer credentials are supplied (it falls back to
# cookies and then decides between a redirect, 401 or anonymous access).
security = HTTPBearer(auto_error=False)

49 

50 

def get_db(request: Request = None) -> Generator[Session, None, None]:
    """Yield a SQLAlchemy session for dependency injection (deprecated).

    DEPRECATED: every call emits a ``DeprecationWarning``. New code should use
    the request-scoped session from ``request.state.db`` or ``get_db()`` from
    main.py.

    Session ownership:
        - If ``request`` is given and ``request.state.db`` is set, that session
          is yielded unchanged; this generator does not commit, roll back or
          close it (Issue #3622: avoid duplicate session creation).
        - Otherwise a new ``SessionLocal()`` session is created and owned here:
          it is committed when the consumer finishes normally, rolled back when
          an exception propagates, and closed in every case.

    **Migration Path**:
        - Route handlers: Use `db: Session = Depends(get_db)` from main.py
        - RBAC checks: Access request.state.db directly in middleware context

    Args:
        request: Optional FastAPI request object (automatically injected by FastAPI
            dependency system when used with Depends())

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction
            (rollback only happens for a session owned by this generator).

    Examples:
        >>> gen = get_db()
        >>> db = next(gen)
        >>> hasattr(db, 'query')
        True
    """
    warnings.warn(
        "rbac.get_db() is deprecated. Use request.state.db or get_db() from main.py",
        DeprecationWarning,
        stacklevel=2,
    )

    # Prefer the session the middleware already attached to the request.
    session = getattr(request.state, "db", None) if request is not None else None
    owns_session = session is None

    if owns_session:
        # Legacy path: nobody provided a session, so create (and later clean up) our own.
        logger.debug("[RBAC] Creating new session (no middleware session available)")
        session = SessionLocal()
    else:
        logger.debug(f"[RBAC] Reusing session from middleware: {id(session)}")

    try:
        yield session
        # A borrowed session is committed by whoever created it.
        if owns_session:
            session.commit()
    except Exception:
        try:
            if owns_session:
                session.rollback()
        except Exception:
            # Rollback itself failed: fall back to invalidating the session.
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure
        raise
    finally:
        if owns_session:
            session.close()

125 

126 

async def get_permission_service(db: Session = Depends(get_db)) -> PermissionService:
    """Build a PermissionService bound to the injected database session.

    DEPRECATED: Use PermissionService(db) directly with fresh_db_session() context manager instead.
    This function is kept for backwards compatibility with endpoints that still use dependency injection.

    Args:
        db: Database session (supplied through ``Depends(get_db)``)

    Returns:
        PermissionService: Permission checking service instance wrapping ``db``

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_permission_service)
        True
    """
    service = PermissionService(db)
    return service

145 

146 

async def get_current_user_with_permissions(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)):
    """Extract current user from JWT token and prepare for permission checking.

    Uses fresh_db_session() context manager to avoid session accumulation under high load.
    Database sessions are created only when needed and closed immediately after use.

    Resolution order implemented below:

    1. ``settings.mcp_client_auth_enabled`` is false:
       - proxy trust active and the proxy user header present: return a
         "proxy" context (admin flag from ``platform_admin_email`` match or an
         ``EmailUser`` lookup; lookup failures leave ``is_admin=False``);
       - otherwise, if ``settings.auth_required``: redirect (302) for
         HTML/HTMX requests, 401 for everything else;
       - otherwise: return an "anonymous" context.
    2. Standard flow: take the token from the Authorization header, then from
       the ``jwt_token`` / ``access_token`` cookies, then from the ``jwt_token``
       Cookie dependency. Cookie tokens are rejected (401) for non-browser
       requests. With no token: redirect for browser requests, else the
       explicit ``allow_unauthenticated_admin`` override, else anonymous when
       auth is not required, else 401.
    3. With a token: delegate to ``get_current_user`` and return its user
       together with values read from ``request.state``.

    Args:
        request: FastAPI request object for IP/user-agent extraction
        credentials: HTTP Bearer credentials
        jwt_token: JWT token from cookie

    Returns:
        dict: User information with permission checking context. The ``"db"``
        entry is always ``None``; callers use their own session.

    Raises:
        HTTPException: If authentication fails (401), or 302 with a
            ``Location`` header pointing at the admin login page for
            browser-style requests.

    Examples:
        Use as FastAPI dependency::

            @app.get("/protected-endpoint")
            async def protected_route(user = Depends(get_current_user_with_permissions)):
                return {"user": user["email"]}
    """

    def _set_trace_context_for_identity(*, email: Optional[str], is_admin: bool, auth_method: str, token_teams: Optional[List[str]] = None, team_scope_known: bool = False) -> None:
        """Reset the trace context and repopulate it for the resolved identity.

        When ``team_scope_known`` is true the team scope is derived from
        ``token_teams``; otherwise only admins get an explicit "admin" scope.
        """
        clear_trace_context()
        set_trace_user_email(email)
        set_trace_user_is_admin(is_admin)
        set_trace_auth_method(auth_method)
        trace_team_name = getattr(request.state, "trace_team_name", None)
        if team_scope_known:
            set_trace_context_from_teams(token_teams, user_email=email, is_admin=is_admin, auth_method=auth_method, team_name=trace_team_name)
        elif is_admin:
            set_trace_team_scope("admin")

    # Check for proxy authentication first (if MCP client auth is disabled)
    if not settings.mcp_client_auth_enabled:
        # Read plugin context from request.state for cross-hook context sharing
        # (set by HttpAuthMiddleware for passing contexts between different hook types)
        plugin_context_table = getattr(request.state, "plugin_context_table", None)
        plugin_global_context = getattr(request.state, "plugin_global_context", None)

        if is_proxy_auth_trust_active(settings):
            # Extract user from proxy header
            proxy_user = request.headers.get(settings.proxy_user_header)
            if proxy_user:
                # Lookup user in DB to get is_admin status, or check platform_admin_email
                is_admin = False
                full_name = proxy_user
                if proxy_user == settings.platform_admin_email:
                    is_admin = True
                    full_name = "Platform Admin"
                else:
                    # Try to lookup user in EmailUser table for is_admin status
                    try:
                        # Third-Party
                        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

                        # First-Party
                        from mcpgateway.db import EmailUser  # pylint: disable=import-outside-toplevel

                        # Use fresh_db_session for short-lived database access
                        with fresh_db_session() as db:
                            user = db.execute(select(EmailUser).where(EmailUser.email == proxy_user)).scalar_one_or_none()
                            if user:
                                is_admin = user.is_admin
                                full_name = user.full_name or proxy_user
                    except Exception as e:
                        logger.debug(f"Could not lookup proxy user in DB: {e}")
                        # Continue with is_admin=False if lookup fails

                _set_trace_context_for_identity(email=proxy_user, is_admin=is_admin, auth_method="proxy")
                return {
                    "email": proxy_user,
                    "full_name": full_name,
                    "is_admin": is_admin,
                    "ip_address": request.client.host if request.client else None,
                    "user_agent": request.headers.get("user-agent"),
                    "db": None,  # Session closed; use endpoint's db param instead
                    "auth_method": "proxy",
                    "request_id": getattr(request.state, "request_id", None),
                    "team_id": getattr(request.state, "team_id", None),
                    "plugin_context_table": plugin_context_table,
                    "plugin_global_context": plugin_global_context,
                }

            # No proxy header - check auth_required to align with WebSocket behavior
            # For browser requests, redirect to login; for API requests, return 401
            if settings.auth_required:
                accept_header = request.headers.get("accept", "")
                is_htmx = request.headers.get("hx-request") == "true"
                if "text/html" in accept_header or is_htmx:
                    raise HTTPException(
                        status_code=status.HTTP_302_FOUND,
                        detail="Authentication required",
                        headers={"Location": f"{settings.app_root_path}/admin/login"},
                    )
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Proxy authentication header required",
                )

            # auth_required=false: allow anonymous access

            _set_trace_context_for_identity(email="anonymous", is_admin=False, auth_method="anonymous", token_teams=[], team_scope_known=True)
            return {
                "email": "anonymous",
                "full_name": "Anonymous User",
                "is_admin": False,
                "ip_address": request.client.host if request.client else None,
                "user_agent": request.headers.get("user-agent"),
                "db": None,  # Session closed; use endpoint's db param instead
                "auth_method": "anonymous",
                "request_id": getattr(request.state, "request_id", None),
                "team_id": getattr(request.state, "team_id", None),
                "plugin_context_table": plugin_context_table,
                "plugin_global_context": plugin_global_context,
            }

        # Warning: MCP auth disabled without proxy trust - security risk!
        # This case is already warned about in config validation
        # Still check auth_required for consistency
        if settings.auth_required:
            accept_header = request.headers.get("accept", "")
            is_htmx = request.headers.get("hx-request") == "true"
            if "text/html" in accept_header or is_htmx:
                raise HTTPException(
                    status_code=status.HTTP_302_FOUND,
                    detail="Authentication required",
                    headers={"Location": f"{settings.app_root_path}/admin/login"},
                )
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required but no auth method configured",
            )

        _set_trace_context_for_identity(email="anonymous", is_admin=False, auth_method="anonymous", token_teams=[], team_scope_known=True)
        return {
            "email": "anonymous",
            "full_name": "Anonymous User",
            "is_admin": False,
            "ip_address": request.client.host if request.client else None,
            "user_agent": request.headers.get("user-agent"),
            "db": None,  # Session closed; use endpoint's db param instead
            "auth_method": "anonymous",
            "request_id": getattr(request.state, "request_id", None),
            "team_id": getattr(request.state, "team_id", None),
            "plugin_context_table": plugin_context_table,
            "plugin_global_context": plugin_global_context,
        }

    # Standard JWT authentication flow
    # Try multiple sources for the token, prioritizing Authorization header for API requests
    token = None
    token_from_cookie = False

    # 1. First try Authorization header (preferred for API requests)
    if credentials and credentials.credentials:
        token = credentials.credentials

    # 2. Try manual cookie reading (for browser requests)
    if not token and request.cookies:
        # Try both jwt_token and access_token cookie names
        manual_token = request.cookies.get("jwt_token") or request.cookies.get("access_token")
        if manual_token:
            token = manual_token
            token_from_cookie = True

    # 3. Finally try FastAPI Cookie dependency (fallback)
    if not token and jwt_token:
        token = jwt_token
        token_from_cookie = True

    # Check if this is a browser/admin-UI request (not an external API request)
    accept_header = request.headers.get("accept", "")
    is_htmx = request.headers.get("hx-request") == "true"
    referer = request.headers.get("referer", "")
    # Substring match on the Referer header value.
    is_admin_ui_request = "/admin" in referer
    is_browser_request = "text/html" in accept_header or is_htmx or is_admin_ui_request

    # SECURITY: Reject cookie-only authentication for API requests
    # Cookies should only be used for browser/HTML requests (including admin UI fetch calls)
    if token_from_cookie and not is_browser_request:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Cookie authentication not allowed for API requests. Use Authorization header.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not token:
        # For browser requests (HTML Accept header or HTMX), redirect to login
        if is_browser_request:
            raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{settings.app_root_path}/admin/login"})

        # AUTH_REQUIRED=false no longer implies admin access.
        # Preserve explicit unsafe override for local-only compatibility.
        # The `is True` comparison means only a literal True enables the override.
        if not settings.auth_required and getattr(settings, "allow_unauthenticated_admin", False) is True:
            _set_trace_context_for_identity(email=settings.platform_admin_email, is_admin=True, auth_method="disabled")
            # NOTE(review): unlike the other contexts returned by this function,
            # this dict has no plugin_context_table / plugin_global_context keys —
            # confirm that is intentional.
            return {
                "email": settings.platform_admin_email,
                "full_name": "Platform Admin",
                "is_admin": True,
                "ip_address": request.client.host if request.client else None,
                "user_agent": request.headers.get("user-agent"),
                "db": None,  # Session closed; use endpoint's db param instead
                "auth_method": "disabled",
                "request_id": getattr(request.state, "request_id", None),
                "team_id": getattr(request.state, "team_id", None),
            }

        if not settings.auth_required:
            _set_trace_context_for_identity(email="anonymous", is_admin=False, auth_method="anonymous", token_teams=[], team_scope_known=True)
            return {
                "email": "anonymous",
                "full_name": "Anonymous User",
                "is_admin": False,
                "ip_address": request.client.host if request.client else None,
                "user_agent": request.headers.get("user-agent"),
                "db": None,  # Session closed; use endpoint's db param instead
                "auth_method": "anonymous",
                "request_id": getattr(request.state, "request_id", None),
                "team_id": getattr(request.state, "team_id", None),
                "plugin_context_table": getattr(request.state, "plugin_context_table", None),
                "plugin_global_context": getattr(request.state, "plugin_global_context", None),
            }

        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization token required")

    try:
        # Create credentials object if we got token from cookie
        if not credentials:
            credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)

        # Extract user from token using the email auth function
        # Pass request to get_current_user so plugins can store auth_method in request.state
        user = await get_current_user(credentials, request=request)

        # Read auth_method and request_id from request.state
        # (auth_method set by plugin in get_current_user, request_id set by HTTP middleware)
        auth_method = getattr(request.state, "auth_method", None)
        request_id = getattr(request.state, "request_id", None)
        team_id = getattr(request.state, "team_id", None)
        token_teams = getattr(request.state, "token_teams", None)

        # Read plugin context data from request.state for cross-hook context sharing
        # (set by HttpAuthMiddleware for passing contexts between different hook types)
        plugin_context_table = getattr(request.state, "plugin_context_table", None)
        plugin_global_context = getattr(request.state, "plugin_global_context", None)

        # Get token_use from request.state (set by get_current_user)
        token_use = getattr(request.state, "token_use", None)

        # Add request context for permission auditing
        return {
            "email": user.email,
            "full_name": user.full_name,
            "is_admin": user.is_admin,
            "ip_address": request.client.host if request.client else None,
            "user_agent": request.headers.get("user-agent"),
            "db": None,  # Session closed; use endpoint's db param instead
            "auth_method": auth_method,  # Include auth_method from plugin
            "request_id": request_id,  # Include request_id from middleware
            "team_id": team_id,  # Include team_id from token
            "token_teams": token_teams,  # Include token teams for query-level scoping
            "token_use": token_use,  # Include token_use for RBAC team derivation
            "plugin_context_table": plugin_context_table,  # Plugin contexts for cross-hook sharing
            "plugin_global_context": plugin_global_context,  # Global context for consistency
        }
    except Exception as e:
        # Any Exception raised in the try body is logged and replaced by either a
        # login redirect or a generic 401; the original error detail is not
        # forwarded to the caller.
        logger.error(f"Authentication failed: {type(e).__name__}: {e}")

        # For browser requests (HTML Accept header or HTMX), redirect to login
        # NOTE(review): this check looks only at Accept / HX-Request, whereas
        # is_browser_request above also considers the Referer header — confirm
        # the difference is intended.
        accept_header = request.headers.get("accept", "")
        is_htmx = request.headers.get("hx-request") == "true"
        if "text/html" in accept_header or is_htmx:
            raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{settings.app_root_path}/admin/login"})

        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials")

426 

427 

428# --- Team derivation helpers for multi-team session tokens --- 

429 

430 

@functools.lru_cache(maxsize=1)
def _get_resource_param_to_model():
    """Return the (cached) mapping from resource-ID parameter names to models.

    The model classes are imported inside the function, so the import only
    happens on the first call; ``lru_cache`` then returns the same dict on
    every later call.

    Returns:
        dict: Mapping of URL parameter names to SQLAlchemy model classes.
    """
    # First-Party
    from mcpgateway.db import A2AAgent, Gateway, Prompt, Resource, Server, Tool  # pylint: disable=import-outside-toplevel

    # Keyword order defines the dict's iteration order, which callers rely on
    # when several resource-ID parameters are present at once.
    param_to_model = dict(
        tool_id=Tool,
        server_id=Server,
        resource_id=Resource,
        prompt_id=Prompt,
        gateway_id=Gateway,
        agent_id=A2AAgent,
    )
    return param_to_model

449 

450 

def _derive_team_from_resource(kwargs, db_session) -> Optional[str]:
    """Resolve the owning team of the resource an endpoint targets (Tier 1).

    Scans the endpoint kwargs for the first known resource-ID parameter with a
    truthy value, loads that resource through ``db_session.get`` and returns
    its ``team_id`` attribute. Only that first parameter is consulted; the
    function returns right after handling it.

    Args:
        kwargs: Endpoint function kwargs containing resource ID params
        db_session: Active SQLAlchemy session

    Returns:
        team_id string if found, None otherwise (no resource-ID parameter,
        resource not found, resource without ``team_id``, or lookup error)
    """
    model_by_param = _get_resource_param_to_model()
    for param, model in model_by_param.items():
        identifier = kwargs.get(param)
        if not identifier:
            continue

        owner_team = None
        try:
            record = db_session.get(model, identifier)
            if record:
                owner_team = getattr(record, "team_id", None)
        except Exception:  # nosec B110 - DB lookup failure falls through to None
            owner_team = None
        # Missing resource also yields None here; the endpoint handles the 404.
        return owner_team

    # No resource ID param
    return None

476 

477 

async def _derive_team_from_payload(kwargs) -> Optional[str]:
    """Find a team_id in a create payload or in submitted form data (Tier 3).

    Two sources are checked, in this order:

    1. Payload objects passed under the kwargs ``gateway``, ``tool``,
       ``server``, ``resource``, ``prompt`` or ``agent``: the first one with a
       truthy ``team_id`` attribute wins.
    2. The ``request`` kwarg, when it is a ``Request`` whose content type
       contains ``"form"``: the ``team_id`` form field.

    Args:
        kwargs: Endpoint function kwargs

    Returns:
        team_id string if found, None otherwise
    """
    # Pydantic payload objects (API endpoints)
    for key in ("gateway", "tool", "server", "resource", "prompt", "agent"):
        candidate = kwargs.get(key)
        if not candidate or not hasattr(candidate, "team_id"):
            continue
        team = getattr(candidate, "team_id", None)
        if team:
            return team

    # Request form data (admin UI endpoints).
    # An explicit None check is used rather than truthiness because some
    # objects (e.g. Pydantic models) may be truthy yet lack .headers.
    req = kwargs.get("request")
    if req is None or not isinstance(req, Request):
        return None

    if "form" not in req.headers.get("content-type", ""):
        return None

    try:
        form_data = await req.form()
        team = form_data.get("team_id")
        if team:
            return team
    except Exception:  # nosec B110 - Form parse failure is non-fatal
        pass

    return None

513 

514 

# Permissions that indicate create/mutate operations (not safe for "any-team" aggregation)
_MUTATE_PERMISSION_ACTIONS = frozenset(
    (
        "create",
        "update",
        "delete",
        "execute",
        "invoke",
        "toggle",
        "set_state",
        "revoke",
        "manage_members",
        "join",
        "manage",
        "share",
        "invite",
        "use",
    )
)


def _is_mutate_permission(permission: str) -> bool:
    """Tell whether a permission string names a mutating action.

    The action is the text after the last ``:`` when the permission contains a
    colon (``admin.sso_providers:create``), otherwise the text after the last
    ``.`` (``tools.create``). A permission with neither separator is never
    treated as mutating.

    Args:
        permission: Permission string like 'tools.create' or 'admin.sso_providers:create'.

    Returns:
        bool: True if the permission's action component is a mutating operation.
    """
    # Colon form takes precedence: admin.sso_providers:create → action is "create"
    if ":" in permission:
        _, _, action = permission.rpartition(":")
        return action in _MUTATE_PERMISSION_ACTIONS

    # Dot form: an empty separator means there was no "." at all.
    _, separator, action = permission.rpartition(".")
    return bool(separator) and action in _MUTATE_PERMISSION_ACTIONS

554 

555 

def require_permission(permission: str, resource_type: Optional[str] = None, allow_admin_bypass: bool = True):
    """Decorator to require specific permission for accessing an endpoint.

    The wrapped coroutine is only awaited when the permission check passes.
    The check runs in this order:

    1. The user context is read from the ``user``, ``_user``, ``current_user``
       or ``current_user_ctx`` keyword argument (401 if absent or malformed).
    2. A team is resolved from the ``team_id`` kwarg, then the user context,
       then — for session tokens — from the targeted resource or the payload.
    3. If plugins are registered for ``HTTP_AUTH_CHECK_PERMISSION`` they are
       invoked; a plugin denial yields 403, a plugin grant is honoured only
       when ``settings.plugins_can_override_rbac`` is set.
    4. Otherwise ``PermissionService.check_permission`` decides (403 on denial).

    Args:
        permission: Required permission (e.g., 'tools.create')
        resource_type: Optional resource type for resource-specific permissions
        allow_admin_bypass: If True (default), admin users bypass all permission checks.
            If False, even admins must have explicit permissions.
            Use False for admin UI routes to enforce granular RBAC.

    Returns:
        Callable: Decorated function that enforces the permission requirement

    Examples:
        >>> decorator = require_permission("tools.create", "tools")
        >>> callable(decorator)
        True

        Execute wrapped function when permission granted:
        >>> import asyncio
        >>> class DummyPS:
        ...     def __init__(self, db):
        ...         pass
        ...     async def check_permission(self, **kwargs):
        ...         return True
        >>> @require_permission("tools.read")
        ... async def demo(user=None):
        ...     return "ok"
        >>> from unittest.mock import patch
        >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
        ...     asyncio.run(demo(user={"email": "u", "db": object()}))
        'ok'
    """

    def decorator(func: Callable) -> Callable:
        """Decorator function that wraps the original function with permission checking.

        Args:
            func: The function to be decorated

        Returns:
            Callable: The wrapped function with permission checking
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            """Async wrapper function that performs permission check before calling original function.

            Args:
                *args: Positional arguments passed to the wrapped function
                **kwargs: Keyword arguments passed to the wrapped function

            Returns:
                Any: Result from the wrapped function if permission check passes

            Raises:
                HTTPException: If user authentication or permission check fails
            """
            # Extract user context from named kwargs only (security: avoid picking up request body dicts)
            user_context = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx")
            if not user_context or not isinstance(user_context, dict) or "email" not in user_context:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")

            # Extract team_id from path parameters if available
            team_id = kwargs.get("team_id")

            # If team_id is None or blank in kwargs then check
            if not team_id:
                # check if user_context has team_id
                team_id = user_context.get("team_id", None)

            # For multi-team session tokens (team_id is None), derive team from context
            check_any_team = False
            if not team_id and user_context.get("token_use") == "session":
                db_session = kwargs.get("db") or user_context.get("db")
                if db_session:
                    # Tier 1: Try to derive team from existing resource
                    team_id = _derive_team_from_resource(kwargs, db_session)
                # Tier 3: Try to derive team from create payload / form
                # NOTE(review): this test is `is None` while the surrounding checks use
                # truthiness, so a blank (non-None) team_id skips payload derivation —
                # confirm that is intended.
                if team_id is None:
                    team_id = await _derive_team_from_payload(kwargs)
                # If still no team_id, check permission across all of the user's teams.
                # This separates authorization ("does this user have the permission?")
                # from resource scoping ("which team owns this resource?"). Team
                # assignment is enforced downstream by endpoint logic (e.g.
                # verify_team_for_user, token team membership checks).
                if not team_id:
                    check_any_team = True

            # First, check if any plugins want to handle permission checking
            # First-Party
            from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthCheckPermissionPayload, HttpHookType  # pylint: disable=import-outside-toplevel

            plugin_manager = get_plugin_manager()
            if plugin_manager and plugin_manager.has_hooks_for(HttpHookType.HTTP_AUTH_CHECK_PERMISSION):
                # Get plugin contexts from user_context (stored in request.state by HttpAuthMiddleware)
                # These enable cross-hook context sharing between HTTP_PRE_REQUEST and HTTP_AUTH_CHECK_PERMISSION
                plugin_context_table = user_context.get("plugin_context_table")
                plugin_global_context = user_context.get("plugin_global_context")

                # Reuse existing global context from middleware if available for consistency
                # Otherwise create a new one (fallback for cases where middleware didn't run)
                if plugin_global_context:
                    global_context = plugin_global_context
                else:
                    # Fall back to a random hex id when the user context has no request_id.
                    request_id = user_context.get("request_id") or uuid.uuid4().hex
                    global_context = GlobalContext(
                        request_id=request_id,
                        server_id=None,
                        tenant_id=None,
                    )

                # Invoke permission check hook, passing plugin contexts from HTTP_PRE_REQUEST hook
                result, _ = await plugin_manager.invoke_hook(
                    HttpHookType.HTTP_AUTH_CHECK_PERMISSION,
                    payload=HttpAuthCheckPermissionPayload(
                        user_email=user_context["email"],
                        permission=permission,
                        resource_type=resource_type,
                        team_id=team_id,
                        is_admin=user_context.get("is_admin", False),
                        auth_method=user_context.get("auth_method"),
                        client_host=user_context.get("ip_address"),
                        user_agent=user_context.get("user_agent"),
                    ),
                    global_context=global_context,
                    local_contexts=plugin_context_table,  # Pass context table for cross-hook state
                )

                # If a plugin made a decision, respect it
                if result and result.modified_payload and hasattr(result.modified_payload, "granted"):
                    # Work out which plugin decided, for the log lines below:
                    # "_decision_plugin" is preferred, then the first non-empty
                    # value among the fallback metadata keys.
                    decision_plugin = "unknown"
                    decision_reason = getattr(result.modified_payload, "reason", None)
                    result_metadata = result.metadata if isinstance(result.metadata, dict) else {}
                    if result_metadata.get("_decision_plugin"):
                        decision_plugin = str(result_metadata["_decision_plugin"])
                    for key in ("plugin_name", "plugin", "source_plugin", "handler"):
                        if decision_plugin != "unknown":
                            break
                        plugin_name = result_metadata.get(key)
                        if plugin_name:
                            decision_plugin = str(plugin_name)

                    logger.info(
                        "Plugin permission decision: plugin=%s user=%s permission=%s granted=%s reason=%s",
                        decision_plugin,
                        user_context["email"],
                        permission,
                        result.modified_payload.granted,
                        decision_reason,
                    )

                    if result.modified_payload.granted:
                        # A plugin grant short-circuits RBAC only when explicitly enabled.
                        if settings.plugins_can_override_rbac:
                            logger.warning(
                                "Plugin RBAC grant override applied: plugin=%s user=%s permission=%s reason=%s",
                                decision_plugin,
                                user_context["email"],
                                permission,
                                decision_reason,
                            )
                            return await func(*args, **kwargs)

                        # Grant ignored: execution continues with the standard RBAC check below.
                        logger.info(
                            "Plugin RBAC grant decision ignored by default policy: plugin=%s user=%s permission=%s",
                            decision_plugin,
                            user_context["email"],
                            permission,
                        )
                    else:
                        # A plugin denial is always final.
                        logger.warning(
                            "Permission denied by plugin: plugin=%s user=%s permission=%s reason=%s",
                            decision_plugin,
                            user_context["email"],
                            permission,
                            decision_reason,
                        )
                        raise HTTPException(
                            status_code=status.HTTP_403_FORBIDDEN,
                            detail=_ACCESS_DENIED_MSG,
                        )

            # No plugin handled it, fall through to standard RBAC check
            # Get db session: prefer endpoint's db param, then user_context["db"], then create fresh
            db_session = kwargs.get("db") or user_context.get("db")
            if db_session:
                # Use existing session from endpoint or user_context
                permission_service = PermissionService(db_session)
                granted = await permission_service.check_permission(
                    user_email=user_context["email"],
                    permission=permission,
                    resource_type=resource_type,
                    team_id=team_id,
                    token_teams=user_context.get("token_teams"),
                    ip_address=user_context.get("ip_address"),
                    user_agent=user_context.get("user_agent"),
                    allow_admin_bypass=allow_admin_bypass,
                    check_any_team=check_any_team,
                )
            else:
                # Create fresh db session for permission check
                with fresh_db_session() as db:
                    permission_service = PermissionService(db)
                    granted = await permission_service.check_permission(
                        user_email=user_context["email"],
                        permission=permission,
                        resource_type=resource_type,
                        team_id=team_id,
                        token_teams=user_context.get("token_teams"),
                        ip_address=user_context.get("ip_address"),
                        user_agent=user_context.get("user_agent"),
                        allow_admin_bypass=allow_admin_bypass,
                        check_any_team=check_any_team,
                    )

            if not granted:
                # The permission name is logged server-side only; the client gets the generic message.
                logger.warning(f"Permission denied: user={user_context['email']}, permission={permission}, resource_type={resource_type}")
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

            # Permission granted, execute the original function
            return await func(*args, **kwargs)

        # Store permission metadata as function attributes for introspection
        # This enables validation tools to extract permissions without fragile closure inspection
        # Using setattr() to avoid pylint protected-access warnings
        setattr(wrapper, "_required_permission", permission)
        setattr(wrapper, "_resource_type", resource_type)
        setattr(wrapper, "_allow_admin_bypass", allow_admin_bypass)

        return wrapper

    return decorator

788 

789 

def require_admin_permission():
    """Build a decorator that restricts an async endpoint to administrators.

    The wrapped coroutine only runs after ``PermissionService.check_admin_permission``
    has confirmed the caller, identified by the ``email`` entry of the user
    context. The ``token_teams`` entry of the user context is forwarded to
    that check.

    Returns:
        Callable: Decorator that adds the admin permission check to an endpoint

    Examples:
        >>> decorator = require_admin_permission()
        >>> callable(decorator)
        True

        Execute when admin permission granted:
        >>> import asyncio
        >>> class DummyPS:
        ...     def __init__(self, db):
        ...         pass
        ...     async def check_admin_permission(self, email, token_teams=None):
        ...         return True
        >>> @require_admin_permission()
        ... async def demo(user=None):
        ...     return "admin-ok"
        >>> from unittest.mock import patch
        >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
        ...     asyncio.run(demo(user={"email": "u", "db": object()}))
        'admin-ok'
    """

    def decorator(func: Callable) -> Callable:
        """Wrap ``func`` so the admin check runs before it.

        Args:
            func: The async endpoint to protect

        Returns:
            Callable: Coroutine function that checks admin permission, then calls ``func``
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            """Verify admin permission, then delegate to the wrapped endpoint.

            Args:
                *args: Positional arguments forwarded to the wrapped function
                **kwargs: Keyword arguments forwarded to the wrapped function

            Returns:
                Any: Whatever the wrapped function returns

            Raises:
                HTTPException: 401 when no usable user context is supplied,
                    403 when the admin permission check fails
            """
            # Only well-known keyword names are consulted (security: a request
            # body dict must never be mistaken for the user context).
            caller = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx")
            if not (caller and isinstance(caller, dict) and "email" in caller):
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")

            # Session preference: endpoint's db param, then user_context["db"], then a fresh one.
            existing_session = kwargs.get("db") or caller.get("db")
            scope = caller.get("token_teams")  # Forward token scope

            if not existing_session:
                # Nothing to reuse: open a short-lived session just for this check.
                with fresh_db_session() as scratch_session:
                    is_admin = await PermissionService(scratch_session).check_admin_permission(caller["email"], token_teams=scope)
            else:
                is_admin = await PermissionService(existing_session).check_admin_permission(caller["email"], token_teams=scope)

            if is_admin:
                # Admin permission granted, execute the original function
                return await func(*args, **kwargs)

            logger.warning(f"Admin permission denied: user={caller['email']}")
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        return wrapper

    return decorator

869 

870 

def require_any_permission(permissions: List[str], resource_type: Optional[str] = None, allow_admin_bypass: bool = True):
    """Build a decorator that admits callers holding at least one listed permission.

    Args:
        permissions: List of permissions, user needs at least one
        resource_type: Optional resource type for resource-specific permissions
        allow_admin_bypass: If True (default), admin users bypass all permission checks.
            If False, even admins must have explicit permissions.

    Returns:
        Callable: Decorator that enforces the permission requirements

    Examples:
        >>> decorator = require_any_permission(["tools.read", "tools.execute"], "tools")
        >>> callable(decorator)
        True

        Execute when any permission granted:
        >>> import asyncio
        >>> class DummyPS:
        ...     def __init__(self, db):
        ...         pass
        ...     async def check_permission(self, **kwargs):
        ...         return True
        >>> @require_any_permission(["tools.read", "tools.execute"], "tools")
        ... async def demo(user=None):
        ...     return "any-ok"
        >>> from unittest.mock import patch
        >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS):
        ...     asyncio.run(demo(user={"email": "u", "db": object()}))
        'any-ok'
    """

    def decorator(func: Callable) -> Callable:
        """Wrap ``func`` so the any-permission check runs before it.

        Args:
            func: The async endpoint to protect

        Returns:
            Callable: Coroutine function that checks permissions, then calls ``func``
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            """Verify that the caller holds one of the permissions, then delegate.

            Args:
                *args: Positional arguments forwarded to the wrapped function
                **kwargs: Keyword arguments forwarded to the wrapped function

            Returns:
                Any: Whatever the wrapped function returns

            Raises:
                HTTPException: 401 when no usable user context is supplied,
                    403 when none of the permissions is granted
            """
            # Only well-known keyword names are consulted (security: a request
            # body dict must never be mistaken for the user context).
            caller = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx")
            if not (caller and isinstance(caller, dict) and "email" in caller):
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required")

            # Team comes from the endpoint's team_id argument when set,
            # otherwise from the user context.
            team_id = kwargs.get("team_id") or caller.get("team_id", None)

            # For multi-team session tokens (no team_id), derive the team from context.
            check_any_team = False
            if not team_id and caller.get("token_use") == "session":
                lookup_session = kwargs.get("db") or caller.get("db")
                # NOTE(review): derivation and the any-team fallback only run when a
                # session is already available — confirm this nesting is intended.
                if lookup_session:
                    # Tier 1: Try to derive team from existing resource
                    team_id = _derive_team_from_resource(kwargs, lookup_session)
                    # Tier 3: Try to derive team from create payload / form
                    if team_id is None:
                        team_id = await _derive_team_from_payload(kwargs)
                    # If still no team_id, check permission across all of the user's teams.
                    # Authorization ("does this user have the permission?") is separate
                    # from resource scoping ("which team owns this resource?").
                    check_any_team = not team_id

            async def _holds_any(service) -> bool:
                """Return True as soon as ``service`` grants one of ``permissions``.

                Args:
                    service: Permission service bound to the session in use

                Returns:
                    bool: True if at least one permission is granted
                """
                for candidate in permissions:
                    if await service.check_permission(
                        user_email=caller["email"],
                        permission=candidate,
                        resource_type=resource_type,
                        team_id=team_id,
                        token_teams=caller.get("token_teams"),
                        ip_address=caller.get("ip_address"),
                        user_agent=caller.get("user_agent"),
                        allow_admin_bypass=allow_admin_bypass,
                        check_any_team=check_any_team,
                    ):
                        return True
                return False

            # Session preference: endpoint's db param, then user_context["db"], then a fresh one.
            existing_session = kwargs.get("db") or caller.get("db")
            if not existing_session:
                # One fresh session serves every permission in the list.
                with fresh_db_session() as scratch_session:
                    granted = await _holds_any(PermissionService(scratch_session))
            else:
                granted = await _holds_any(PermissionService(existing_session))

            if granted:
                # Permission granted, execute the original function
                return await func(*args, **kwargs)

            logger.warning(f"Permission denied: user={caller['email']}, permissions={permissions}, resource_type={resource_type}")
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)

        return wrapper

    return decorator

1009 

1010 

class PermissionChecker:
    """Helper object for manual permission checking.

    Useful for complex permission logic that can't be handled by decorators.
    Every check reuses the ``db`` session found in the user context when there
    is one, and otherwise opens a fresh session for the duration of the check.

    Examples:
        >>> from unittest.mock import Mock
        >>> checker = PermissionChecker({"email": "user@example.com", "db": Mock()})
        >>> hasattr(checker, 'has_permission') and hasattr(checker, 'has_admin_permission')
        True
    """

    def __init__(self, user_context: dict):
        """Initialize permission checker with user context.

        Args:
            user_context: User context from get_current_user_with_permissions
        """
        self.user_context = user_context
        self.db_session = user_context.get("db")

    def _caller_kwargs(self) -> dict:
        """Collect the caller-identifying keyword arguments for ``check_permission``.

        Returns:
            dict: ``user_email``, ``token_teams``, ``ip_address`` and ``user_agent``
            read from the user context

        Raises:
            KeyError: If the user context has no ``email`` entry
        """
        context = self.user_context
        return {
            "user_email": context["email"],
            "token_teams": context.get("token_teams"),
            "ip_address": context.get("ip_address"),
            "user_agent": context.get("user_agent"),
        }

    async def _any_granted(self, permission_service, permissions: List[str], resource_type: Optional[str], team_id: Optional[str], check_any_team: bool) -> bool:
        """Check the permissions one by one and stop at the first one granted.

        Args:
            permission_service: Permission service bound to the session in use
            permissions: List of permissions to check
            resource_type: Optional resource type
            team_id: Optional team context
            check_any_team: If True, check across all teams the user belongs to

        Returns:
            bool: True if at least one permission is granted
        """
        caller = self._caller_kwargs()
        for permission in permissions:
            if await permission_service.check_permission(
                permission=permission,
                resource_type=resource_type,
                team_id=team_id,
                check_any_team=check_any_team,
                **caller,
            ):
                return True
        return False

    async def has_permission(self, permission: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None, team_id: Optional[str] = None, check_any_team: bool = False) -> bool:
        """Check if user has specific permission.

        Args:
            permission: Permission to check
            resource_type: Optional resource type
            resource_id: Optional resource ID
            team_id: Optional team context
            check_any_team: If True, check across all teams the user belongs to

        Returns:
            bool: True if user has permission
        """
        check_kwargs = dict(
            self._caller_kwargs(),
            permission=permission,
            resource_type=resource_type,
            resource_id=resource_id,
            team_id=team_id,
            check_any_team=check_any_team,
        )
        if self.db_session:
            # Use existing session
            return await PermissionService(self.db_session).check_permission(**check_kwargs)
        # Create fresh db session
        with fresh_db_session() as db:
            return await PermissionService(db).check_permission(**check_kwargs)

    async def has_admin_permission(self) -> bool:
        """Check if user has admin permissions.

        Returns:
            bool: True if user has admin permissions
        """
        email = self.user_context["email"]
        token_teams = self.user_context.get("token_teams")
        if self.db_session:
            # Use existing session
            return await PermissionService(self.db_session).check_admin_permission(email, token_teams=token_teams)
        # Create fresh db session
        with fresh_db_session() as db:
            return await PermissionService(db).check_admin_permission(email, token_teams=token_teams)

    async def has_any_permission(self, permissions: List[str], resource_type: Optional[str] = None, team_id: Optional[str] = None, check_any_team: bool = False) -> bool:
        """Check if user has any of the specified permissions.

        Args:
            permissions: List of permissions to check
            resource_type: Optional resource type
            team_id: Optional team context
            check_any_team: If True, check across all teams the user belongs to
                (same meaning as in ``has_permission``)

        Returns:
            bool: True if user has at least one permission
        """
        if self.db_session:
            # Use existing session for all checks
            return await self._any_granted(PermissionService(self.db_session), permissions, resource_type, team_id, check_any_team)
        # Create single fresh session for all checks (avoid N sessions for N permissions)
        with fresh_db_session() as db:
            return await self._any_granted(PermissionService(db), permissions, resource_type, team_id, check_any_team)

    async def require_permission(self, permission: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None, team_id: Optional[str] = None, check_any_team: bool = False) -> None:
        """Require specific permission, raise HTTPException if not granted.

        Args:
            permission: Required permission
            resource_type: Optional resource type
            resource_id: Optional resource ID
            team_id: Optional team context
            check_any_team: If True, check across all teams the user belongs to

        Raises:
            HTTPException: If permission is not granted
        """
        if await self.has_permission(permission, resource_type, resource_id, team_id, check_any_team):
            return
        # The permission name goes to the log only; the HTTP response stays generic.
        logger.warning(f"{_ACCESS_DENIED_MSG}: user '{self.user_context.get('email')}' missing permission '{permission}'")
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)