Coverage for mcpgateway / middleware / rbac.py: 100%

299 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/rbac.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7RBAC Permission Checking Middleware. 

8 

9This module provides middleware for FastAPI to enforce role-based access control 

10on API endpoints. It includes permission decorators and dependency injection 

11functions for protecting routes. 

12""" 

13 

14# Standard 

15import functools 

16from functools import wraps 

17import logging 

18from typing import Callable, Generator, List, Optional 

19import uuid 

20 

21# Third-Party 

22from fastapi import Cookie, Depends, HTTPException, Request, status 

23from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer 

24from sqlalchemy.orm import Session 

25 

26# First-Party 

27from mcpgateway.auth import get_current_user 

28from mcpgateway.config import settings 

29from mcpgateway.db import fresh_db_session, SessionLocal 

30from mcpgateway.services.permission_service import PermissionService 

31 

32logger = logging.getLogger(__name__) 

33 

34# HTTP Bearer security scheme for token extraction 

35security = HTTPBearer(auto_error=False) 

36 

37 

38def get_db() -> Generator[Session, None, None]: 

39 """Get database session for dependency injection. 

40 

41 DEPRECATED: Use fresh_db_session() context manager instead to avoid session accumulation. 

42 This function is kept for backwards compatibility with endpoints that still use Depends(get_db). 

43 

44 Commits the transaction on successful completion to avoid implicit rollbacks 

45 for read-only operations. Rolls back explicitly on exception. 

46 

47 Yields: 

48 Session: SQLAlchemy database session 

49 

50 Raises: 

51 Exception: Re-raises any exception after rolling back the transaction. 

52 

53 Examples: 

54 >>> gen = get_db() 

55 >>> db = next(gen) 

56 >>> hasattr(db, 'query') 

57 True 

58 """ 

59 db = SessionLocal() 

60 try: 

61 yield db 

62 db.commit() 

63 except Exception: 

64 try: 

65 db.rollback() 

66 except Exception: 

67 try: 

68 db.invalidate() 

69 except Exception: 

70 pass # nosec B110 - Best effort cleanup on connection failure 

71 raise 

72 finally: 

73 db.close() 

74 

75 

76async def get_permission_service(db: Session = Depends(get_db)) -> PermissionService: 

77 """Get permission service instance for dependency injection. 

78 

79 DEPRECATED: Use PermissionService(db) directly with fresh_db_session() context manager instead. 

80 This function is kept for backwards compatibility with endpoints that still use dependency injection. 

81 

82 Args: 

83 db: Database session 

84 

85 Returns: 

86 PermissionService: Permission checking service instance 

87 

88 Examples: 

89 >>> import asyncio 

90 >>> asyncio.iscoroutinefunction(get_permission_service) 

91 True 

92 """ 

93 return PermissionService(db) 

94 

95 

96async def get_current_user_with_permissions(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)): 

97 """Extract current user from JWT token and prepare for permission checking. 

98 

99 Uses fresh_db_session() context manager to avoid session accumulation under high load. 

100 Database sessions are created only when needed and closed immediately after use. 

101 

102 Args: 

103 request: FastAPI request object for IP/user-agent extraction 

104 credentials: HTTP Bearer credentials 

105 jwt_token: JWT token from cookie 

106 

107 Returns: 

108 dict: User information with permission checking context 

109 

110 Raises: 

111 HTTPException: If authentication fails 

112 

113 Examples: 

114 Use as FastAPI dependency:: 

115 

116 @app.get("/protected-endpoint") 

117 async def protected_route(user = Depends(get_current_user_with_permissions)): 

118 return {"user": user["email"]} 

119 """ 

120 # Check for proxy authentication first (if MCP client auth is disabled) 

121 if not settings.mcp_client_auth_enabled: 

122 # Read plugin context from request.state for cross-hook context sharing 

123 # (set by HttpAuthMiddleware for passing contexts between different hook types) 

124 plugin_context_table = getattr(request.state, "plugin_context_table", None) 

125 plugin_global_context = getattr(request.state, "plugin_global_context", None) 

126 

127 if settings.trust_proxy_auth: 

128 # Extract user from proxy header 

129 proxy_user = request.headers.get(settings.proxy_user_header) 

130 if proxy_user: 

131 # Lookup user in DB to get is_admin status, or check platform_admin_email 

132 is_admin = False 

133 full_name = proxy_user 

134 if proxy_user == settings.platform_admin_email: 

135 is_admin = True 

136 full_name = "Platform Admin" 

137 else: 

138 # Try to lookup user in EmailUser table for is_admin status 

139 try: 

140 # Third-Party 

141 from sqlalchemy import select # pylint: disable=import-outside-toplevel 

142 

143 # First-Party 

144 from mcpgateway.db import EmailUser # pylint: disable=import-outside-toplevel 

145 

146 # Use fresh_db_session for short-lived database access 

147 with fresh_db_session() as db: 

148 user = db.execute(select(EmailUser).where(EmailUser.email == proxy_user)).scalar_one_or_none() 

149 if user: 

150 is_admin = user.is_admin 

151 full_name = user.full_name or proxy_user 

152 except Exception as e: 

153 logger.debug(f"Could not lookup proxy user in DB: {e}") 

154 # Continue with is_admin=False if lookup fails 

155 

156 return { 

157 "email": proxy_user, 

158 "full_name": full_name, 

159 "is_admin": is_admin, 

160 "ip_address": request.client.host if request.client else None, 

161 "user_agent": request.headers.get("user-agent"), 

162 "db": None, # Session closed; use endpoint's db param instead 

163 "auth_method": "proxy", 

164 "request_id": getattr(request.state, "request_id", None), 

165 "team_id": getattr(request.state, "team_id", None), 

166 "plugin_context_table": plugin_context_table, 

167 "plugin_global_context": plugin_global_context, 

168 } 

169 

170 # No proxy header - check auth_required to align with WebSocket behavior 

171 # For browser requests, redirect to login; for API requests, return 401 

172 if settings.auth_required: 

173 accept_header = request.headers.get("accept", "") 

174 is_htmx = request.headers.get("hx-request") == "true" 

175 if "text/html" in accept_header or is_htmx: 

176 raise HTTPException( 

177 status_code=status.HTTP_302_FOUND, 

178 detail="Authentication required", 

179 headers={"Location": f"{settings.app_root_path}/admin/login"}, 

180 ) 

181 raise HTTPException( 

182 status_code=status.HTTP_401_UNAUTHORIZED, 

183 detail="Proxy authentication header required", 

184 ) 

185 

186 # auth_required=false: allow anonymous access 

187 

188 return { 

189 "email": "anonymous", 

190 "full_name": "Anonymous User", 

191 "is_admin": False, 

192 "ip_address": request.client.host if request.client else None, 

193 "user_agent": request.headers.get("user-agent"), 

194 "db": None, # Session closed; use endpoint's db param instead 

195 "auth_method": "anonymous", 

196 "request_id": getattr(request.state, "request_id", None), 

197 "team_id": getattr(request.state, "team_id", None), 

198 "plugin_context_table": plugin_context_table, 

199 "plugin_global_context": plugin_global_context, 

200 } 

201 

202 # Warning: MCP auth disabled without proxy trust - security risk! 

203 # This case is already warned about in config validation 

204 # Still check auth_required for consistency 

205 if settings.auth_required: 

206 accept_header = request.headers.get("accept", "") 

207 is_htmx = request.headers.get("hx-request") == "true" 

208 if "text/html" in accept_header or is_htmx: 

209 raise HTTPException( 

210 status_code=status.HTTP_302_FOUND, 

211 detail="Authentication required", 

212 headers={"Location": f"{settings.app_root_path}/admin/login"}, 

213 ) 

214 raise HTTPException( 

215 status_code=status.HTTP_401_UNAUTHORIZED, 

216 detail="Authentication required but no auth method configured", 

217 ) 

218 

219 return { 

220 "email": "anonymous", 

221 "full_name": "Anonymous User", 

222 "is_admin": False, 

223 "ip_address": request.client.host if request.client else None, 

224 "user_agent": request.headers.get("user-agent"), 

225 "db": None, # Session closed; use endpoint's db param instead 

226 "auth_method": "anonymous", 

227 "request_id": getattr(request.state, "request_id", None), 

228 "team_id": getattr(request.state, "team_id", None), 

229 "plugin_context_table": plugin_context_table, 

230 "plugin_global_context": plugin_global_context, 

231 } 

232 

233 # Standard JWT authentication flow 

234 # Try multiple sources for the token, prioritizing Authorization header for API requests 

235 token = None 

236 token_from_cookie = False 

237 

238 # 1. First try Authorization header (preferred for API requests) 

239 if credentials and credentials.credentials: 

240 token = credentials.credentials 

241 

242 # 2. Try manual cookie reading (for browser requests) 

243 if not token and request.cookies: 

244 # Try both jwt_token and access_token cookie names 

245 manual_token = request.cookies.get("jwt_token") or request.cookies.get("access_token") 

246 if manual_token: 

247 token = manual_token 

248 token_from_cookie = True 

249 

250 # 3. Finally try FastAPI Cookie dependency (fallback) 

251 if not token and jwt_token: 

252 token = jwt_token 

253 token_from_cookie = True 

254 

255 # Check if this is a browser/admin-UI request (not an external API request) 

256 accept_header = request.headers.get("accept", "") 

257 is_htmx = request.headers.get("hx-request") == "true" 

258 referer = request.headers.get("referer", "") 

259 is_admin_ui_request = "/admin" in referer 

260 is_browser_request = "text/html" in accept_header or is_htmx or is_admin_ui_request 

261 

262 # SECURITY: Reject cookie-only authentication for API requests 

263 # Cookies should only be used for browser/HTML requests (including admin UI fetch calls) 

264 if token_from_cookie and not is_browser_request: 

265 raise HTTPException( 

266 status_code=status.HTTP_401_UNAUTHORIZED, 

267 detail="Cookie authentication not allowed for API requests. Use Authorization header.", 

268 headers={"WWW-Authenticate": "Bearer"}, 

269 ) 

270 

271 if not token: 

272 # For browser requests (HTML Accept header or HTMX), redirect to login 

273 if is_browser_request: 

274 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{settings.app_root_path}/admin/login"}) 

275 

276 # If auth is disabled, return the stock admin user 

277 if not settings.auth_required: 

278 return { 

279 "email": settings.platform_admin_email, 

280 "full_name": "Platform Admin", 

281 "is_admin": True, 

282 "ip_address": request.client.host if request.client else None, 

283 "user_agent": request.headers.get("user-agent"), 

284 "db": None, # Session closed; use endpoint's db param instead 

285 "auth_method": "disabled", 

286 "request_id": getattr(request.state, "request_id", None), 

287 "team_id": getattr(request.state, "team_id", None), 

288 } 

289 

290 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization token required") 

291 

292 try: 

293 # Create credentials object if we got token from cookie 

294 if not credentials: 

295 credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token) 

296 

297 # Extract user from token using the email auth function 

298 # Pass request to get_current_user so plugins can store auth_method in request.state 

299 user = await get_current_user(credentials, request=request) 

300 

301 # Read auth_method and request_id from request.state 

302 # (auth_method set by plugin in get_current_user, request_id set by HTTP middleware) 

303 auth_method = getattr(request.state, "auth_method", None) 

304 request_id = getattr(request.state, "request_id", None) 

305 team_id = getattr(request.state, "team_id", None) 

306 

307 # Read plugin context data from request.state for cross-hook context sharing 

308 # (set by HttpAuthMiddleware for passing contexts between different hook types) 

309 plugin_context_table = getattr(request.state, "plugin_context_table", None) 

310 plugin_global_context = getattr(request.state, "plugin_global_context", None) 

311 

312 # Get token_use from request.state (set by get_current_user) 

313 token_use = getattr(request.state, "token_use", None) 

314 

315 # Add request context for permission auditing 

316 return { 

317 "email": user.email, 

318 "full_name": user.full_name, 

319 "is_admin": user.is_admin, 

320 "ip_address": request.client.host if request.client else None, 

321 "user_agent": request.headers.get("user-agent"), 

322 "db": None, # Session closed; use endpoint's db param instead 

323 "auth_method": auth_method, # Include auth_method from plugin 

324 "request_id": request_id, # Include request_id from middleware 

325 "team_id": team_id, # Include team_id from token 

326 "token_use": token_use, # Include token_use for RBAC team derivation 

327 "plugin_context_table": plugin_context_table, # Plugin contexts for cross-hook sharing 

328 "plugin_global_context": plugin_global_context, # Global context for consistency 

329 } 

330 except Exception as e: 

331 logger.error(f"Authentication failed: {type(e).__name__}: {e}") 

332 

333 # For browser requests (HTML Accept header or HTMX), redirect to login 

334 accept_header = request.headers.get("accept", "") 

335 is_htmx = request.headers.get("hx-request") == "true" 

336 if "text/html" in accept_header or is_htmx: 

337 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{settings.app_root_path}/admin/login"}) 

338 

339 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials") 

340 

341 

342# --- Team derivation helpers for multi-team session tokens --- 

343 

344 

345@functools.lru_cache(maxsize=1) 

346def _get_resource_param_to_model(): 

347 """Lazy-initialize the resource param to model mapping. 

348 

349 Returns: 

350 dict: Mapping of URL parameter names to SQLAlchemy model classes. 

351 """ 

352 # First-Party 

353 from mcpgateway.db import A2AAgent, Gateway, Prompt, Resource, Server, Tool # pylint: disable=import-outside-toplevel 

354 

355 return { 

356 "tool_id": Tool, 

357 "server_id": Server, 

358 "resource_id": Resource, 

359 "prompt_id": Prompt, 

360 "gateway_id": Gateway, 

361 "agent_id": A2AAgent, 

362 } 

363 

364 

365def _derive_team_from_resource(kwargs, db_session) -> Optional[str]: 

366 """Look up resource's team_id from DB for RBAC context (Tier 1). 

367 

368 For endpoints that target a specific resource (get, update, delete, execute), 

369 derive the team context from the resource's owner team. 

370 

371 Args: 

372 kwargs: Endpoint function kwargs containing resource ID params 

373 db_session: Active SQLAlchemy session 

374 

375 Returns: 

376 team_id string if found, None otherwise 

377 """ 

378 mapping = _get_resource_param_to_model() 

379 for param_name, model_cls in mapping.items(): 

380 resource_id = kwargs.get(param_name) 

381 if resource_id: 

382 try: 

383 resource = db_session.get(model_cls, resource_id) 

384 if resource: 

385 return getattr(resource, "team_id", None) 

386 except Exception: # nosec B110 - DB lookup failure falls through to None 

387 pass 

388 return None # Resource not found; let endpoint handle 404 

389 return None # No resource ID param 

390 

391 

392async def _derive_team_from_payload(kwargs) -> Optional[str]: 

393 """Extract team_id from create payload objects or form data (Tier 3). 

394 

395 For create endpoints, derive team context from the Pydantic payload or form data. 

396 

397 Args: 

398 kwargs: Endpoint function kwargs 

399 

400 Returns: 

401 team_id string if found, None otherwise 

402 """ 

403 # Try Pydantic payload objects (API endpoints) 

404 for param_name in ("gateway", "tool", "server", "resource", "prompt", "agent"): 

405 payload_obj = kwargs.get(param_name) 

406 if payload_obj and hasattr(payload_obj, "team_id"): 

407 tid = getattr(payload_obj, "team_id", None) 

408 if tid: 

409 return tid 

410 

411 # Try request form data (admin UI endpoints) 

412 # Note: use 'is not None' rather than truthiness check because some 

413 # objects (e.g. Pydantic models) may be truthy yet lack .headers. 

414 request = kwargs.get("request") 

415 if request is not None and isinstance(request, Request): 

416 content_type = request.headers.get("content-type", "") 

417 if "form" in content_type: 

418 try: 

419 form = await request.form() 

420 tid = form.get("team_id") 

421 if tid: 

422 return tid 

423 except Exception: # nosec B110 - Form parse failure is non-fatal 

424 pass 

425 

426 return None 

427 

428 

429# Permissions that indicate create/mutate operations (not safe for "any-team" aggregation) 

430_MUTATE_PERMISSION_ACTIONS = frozenset( 

431 { 

432 "create", 

433 "update", 

434 "delete", 

435 "execute", 

436 "invoke", 

437 "toggle", 

438 "set_state", 

439 "revoke", 

440 "manage_members", 

441 "join", 

442 "manage", 

443 "share", 

444 "invite", 

445 "use", 

446 } 

447) 

448 

449 

450def _is_mutate_permission(permission: str) -> bool: 

451 """Check if a permission string represents a mutate operation. 

452 

453 Handles both dot-separated (tools.create) and colon-separated 

454 (admin.sso_providers:create) permission formats. 

455 

456 Args: 

457 permission: Permission string like 'tools.create' or 'admin.sso_providers:create'. 

458 

459 Returns: 

460 bool: True if the permission's action component is a mutating operation. 

461 """ 

462 # Handle colon separator: admin.sso_providers:create → action is "create" 

463 if ":" in permission: 

464 action = permission.rsplit(":", 1)[-1] 

465 return action in _MUTATE_PERMISSION_ACTIONS 

466 parts = permission.split(".") 

467 return parts[-1] in _MUTATE_PERMISSION_ACTIONS if len(parts) >= 2 else False 

468 

469 

470def require_permission(permission: str, resource_type: Optional[str] = None, allow_admin_bypass: bool = True): 

471 """Decorator to require specific permission for accessing an endpoint. 

472 

473 Args: 

474 permission: Required permission (e.g., 'tools.create') 

475 resource_type: Optional resource type for resource-specific permissions 

476 allow_admin_bypass: If True (default), admin users bypass all permission checks. 

477 If False, even admins must have explicit permissions. 

478 Use False for admin UI routes to enforce granular RBAC. 

479 

480 Returns: 

481 Callable: Decorated function that enforces the permission requirement 

482 

483 Examples: 

484 >>> decorator = require_permission("tools.create", "tools") 

485 >>> callable(decorator) 

486 True 

487 

488 Execute wrapped function when permission granted: 

489 >>> import asyncio 

490 >>> class DummyPS: 

491 ... def __init__(self, db): 

492 ... pass 

493 ... async def check_permission(self, **kwargs): 

494 ... return True 

495 >>> @require_permission("tools.read") 

496 ... async def demo(user=None): 

497 ... return "ok" 

498 >>> from unittest.mock import patch 

499 >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS): 

500 ... asyncio.run(demo(user={"email": "u", "db": object()})) 

501 'ok' 

502 """ 

503 

504 def decorator(func: Callable) -> Callable: 

505 """Decorator function that wraps the original function with permission checking. 

506 

507 Args: 

508 func: The function to be decorated 

509 

510 Returns: 

511 Callable: The wrapped function with permission checking 

512 """ 

513 

514 @wraps(func) 

515 async def wrapper(*args, **kwargs): 

516 """Async wrapper function that performs permission check before calling original function. 

517 

518 Args: 

519 *args: Positional arguments passed to the wrapped function 

520 **kwargs: Keyword arguments passed to the wrapped function 

521 

522 Returns: 

523 Any: Result from the wrapped function if permission check passes 

524 

525 Raises: 

526 HTTPException: If user authentication or permission check fails 

527 """ 

528 # Extract user context from named kwargs only (security: avoid picking up request body dicts) 

529 user_context = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx") 

530 if not user_context or not isinstance(user_context, dict) or "email" not in user_context: 

531 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required") 

532 

533 # Extract team_id from path parameters if available 

534 team_id = kwargs.get("team_id") 

535 

536 # If team_id is None or blank in kwargs then check 

537 if not team_id: 

538 # check if user_context has team_id 

539 team_id = user_context.get("team_id", None) 

540 

541 # For multi-team session tokens (team_id is None), derive team from context 

542 check_any_team = False 

543 if not team_id and user_context.get("token_use") == "session": 

544 db_session = kwargs.get("db") or user_context.get("db") 

545 if db_session: 

546 # Tier 1: Try to derive team from existing resource 

547 team_id = _derive_team_from_resource(kwargs, db_session) 

548 # Tier 3: Try to derive team from create payload / form 

549 if team_id is None: 

550 team_id = await _derive_team_from_payload(kwargs) 

551 # If still no team_id: Tier 2 for read/list, fail-closed-for-teams for mutate 

552 if not team_id: 

553 if _is_mutate_permission(permission): 

554 # Mutate without team context: proceed with team_id=None which 

555 # restricts RBAC to global + personal roles only. Team-scoped 

556 # roles cannot match (fail-closed for team grants), but global 

557 # roles (e.g. platform_admin) still work as intended. 

558 pass 

559 else: 

560 # List/read endpoint: check if user has permission in any team 

561 check_any_team = True 

562 

563 # First, check if any plugins want to handle permission checking 

564 # First-Party 

565 from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthCheckPermissionPayload, HttpHookType # pylint: disable=import-outside-toplevel 

566 

567 plugin_manager = get_plugin_manager() 

568 if plugin_manager and plugin_manager.has_hooks_for(HttpHookType.HTTP_AUTH_CHECK_PERMISSION): 

569 # Get plugin contexts from user_context (stored in request.state by HttpAuthMiddleware) 

570 # These enable cross-hook context sharing between HTTP_PRE_REQUEST and HTTP_AUTH_CHECK_PERMISSION 

571 plugin_context_table = user_context.get("plugin_context_table") 

572 plugin_global_context = user_context.get("plugin_global_context") 

573 

574 # Reuse existing global context from middleware if available for consistency 

575 # Otherwise create a new one (fallback for cases where middleware didn't run) 

576 if plugin_global_context: 

577 global_context = plugin_global_context 

578 else: 

579 request_id = user_context.get("request_id") or uuid.uuid4().hex 

580 global_context = GlobalContext( 

581 request_id=request_id, 

582 server_id=None, 

583 tenant_id=None, 

584 ) 

585 

586 # Invoke permission check hook, passing plugin contexts from HTTP_PRE_REQUEST hook 

587 result, _ = await plugin_manager.invoke_hook( 

588 HttpHookType.HTTP_AUTH_CHECK_PERMISSION, 

589 payload=HttpAuthCheckPermissionPayload( 

590 user_email=user_context["email"], 

591 permission=permission, 

592 resource_type=resource_type, 

593 team_id=team_id, 

594 is_admin=user_context.get("is_admin", False), 

595 auth_method=user_context.get("auth_method"), 

596 client_host=user_context.get("ip_address"), 

597 user_agent=user_context.get("user_agent"), 

598 ), 

599 global_context=global_context, 

600 local_contexts=plugin_context_table, # Pass context table for cross-hook state 

601 ) 

602 

603 # If a plugin made a decision, respect it 

604 if result and result.modified_payload: 

605 if result.modified_payload.granted: 

606 logger.info(f"Permission granted by plugin: user={user_context['email']}, " f"permission={permission}, reason={result.modified_payload.reason}") 

607 return await func(*args, **kwargs) 

608 logger.warning(f"Permission denied by plugin: user={user_context['email']}, " f"permission={permission}, reason={result.modified_payload.reason}") 

609 raise HTTPException( 

610 status_code=status.HTTP_403_FORBIDDEN, 

611 detail=f"Insufficient permissions. Required: {permission}", 

612 ) 

613 

614 # No plugin handled it, fall through to standard RBAC check 

615 # Get db session: prefer endpoint's db param, then user_context["db"], then create fresh 

616 db_session = kwargs.get("db") or user_context.get("db") 

617 if db_session: 

618 # Use existing session from endpoint or user_context 

619 permission_service = PermissionService(db_session) 

620 granted = await permission_service.check_permission( 

621 user_email=user_context["email"], 

622 permission=permission, 

623 resource_type=resource_type, 

624 team_id=team_id, 

625 ip_address=user_context.get("ip_address"), 

626 user_agent=user_context.get("user_agent"), 

627 allow_admin_bypass=allow_admin_bypass, 

628 check_any_team=check_any_team, 

629 ) 

630 else: 

631 # Create fresh db session for permission check 

632 with fresh_db_session() as db: 

633 permission_service = PermissionService(db) 

634 granted = await permission_service.check_permission( 

635 user_email=user_context["email"], 

636 permission=permission, 

637 resource_type=resource_type, 

638 team_id=team_id, 

639 ip_address=user_context.get("ip_address"), 

640 user_agent=user_context.get("user_agent"), 

641 allow_admin_bypass=allow_admin_bypass, 

642 check_any_team=check_any_team, 

643 ) 

644 

645 if not granted: 

646 logger.warning(f"Permission denied: user={user_context['email']}, permission={permission}, resource_type={resource_type}") 

647 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions. Required: {permission}") 

648 

649 # Permission granted, execute the original function 

650 return await func(*args, **kwargs) 

651 

652 return wrapper 

653 

654 return decorator 

655 

656 

657def require_admin_permission(): 

658 """Decorator to require admin permissions for accessing an endpoint. 

659 

660 Returns: 

661 Callable: Decorated function that enforces admin permission requirement 

662 

663 Examples: 

664 >>> decorator = require_admin_permission() 

665 >>> callable(decorator) 

666 True 

667 

668 Execute when admin permission granted: 

669 >>> import asyncio 

670 >>> class DummyPS: 

671 ... def __init__(self, db): 

672 ... pass 

673 ... async def check_admin_permission(self, email): 

674 ... return True 

675 >>> @require_admin_permission() 

676 ... async def demo(user=None): 

677 ... return "admin-ok" 

678 >>> from unittest.mock import patch 

679 >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS): 

680 ... asyncio.run(demo(user={"email": "u", "db": object()})) 

681 'admin-ok' 

682 """ 

683 

684 def decorator(func: Callable) -> Callable: 

685 """Decorator function that wraps the original function with admin permission checking. 

686 

687 Args: 

688 func: The function to be decorated 

689 

690 Returns: 

691 Callable: The wrapped function with admin permission checking 

692 """ 

693 

694 @wraps(func) 

695 async def wrapper(*args, **kwargs): 

696 """Async wrapper function that performs admin permission check before calling original function. 

697 

698 Args: 

699 *args: Positional arguments passed to the wrapped function 

700 **kwargs: Keyword arguments passed to the wrapped function 

701 

702 Returns: 

703 Any: Result from the wrapped function if admin permission check passes 

704 

705 Raises: 

706 HTTPException: If user authentication or admin permission check fails 

707 """ 

708 # Extract user context from named kwargs only (security: avoid picking up request body dicts) 

709 user_context = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx") 

710 if not user_context or not isinstance(user_context, dict) or "email" not in user_context: 

711 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required") 

712 

713 # Get db session: prefer endpoint's db param, then user_context["db"], then create fresh 

714 db_session = kwargs.get("db") or user_context.get("db") 

715 if db_session: 

716 # Use existing session from endpoint or user_context 

717 permission_service = PermissionService(db_session) 

718 has_admin_permission = await permission_service.check_admin_permission(user_context["email"]) 

719 else: 

720 # Create fresh db session for permission check 

721 with fresh_db_session() as db: 

722 permission_service = PermissionService(db) 

723 has_admin_permission = await permission_service.check_admin_permission(user_context["email"]) 

724 

725 if not has_admin_permission: 

726 logger.warning(f"Admin permission denied: user={user_context['email']}") 

727 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin permissions required") 

728 

729 # Admin permission granted, execute the original function 

730 return await func(*args, **kwargs) 

731 

732 return wrapper 

733 

734 return decorator 

735 

736 

737def require_any_permission(permissions: List[str], resource_type: Optional[str] = None, allow_admin_bypass: bool = True): 

738 """Decorator to require any of the specified permissions for accessing an endpoint. 

739 

740 Args: 

741 permissions: List of permissions, user needs at least one 

742 resource_type: Optional resource type for resource-specific permissions 

743 allow_admin_bypass: If True (default), admin users bypass all permission checks. 

744 If False, even admins must have explicit permissions. 

745 

746 Returns: 

747 Callable: Decorated function that enforces the permission requirements 

748 

749 Examples: 

750 >>> decorator = require_any_permission(["tools.read", "tools.execute"], "tools") 

751 >>> callable(decorator) 

752 True 

753 

754 Execute when any permission granted: 

755 >>> import asyncio 

756 >>> class DummyPS: 

757 ... def __init__(self, db): 

758 ... pass 

759 ... async def check_permission(self, **kwargs): 

760 ... return True 

761 >>> @require_any_permission(["tools.read", "tools.execute"], "tools") 

762 ... async def demo(user=None): 

763 ... return "any-ok" 

764 >>> from unittest.mock import patch 

765 >>> with patch('mcpgateway.middleware.rbac.PermissionService', DummyPS): 

766 ... asyncio.run(demo(user={"email": "u", "db": object()})) 

767 'any-ok' 

768 """ 

769 

770 def decorator(func: Callable) -> Callable: 

771 """Decorator function that wraps the original function with any-permission checking. 

772 

773 Args: 

774 func: The function to be decorated 

775 

776 Returns: 

777 Callable: The wrapped function with any-permission checking 

778 """ 

779 

780 @wraps(func) 

781 async def wrapper(*args, **kwargs): 

782 """Async wrapper function that performs any-permission check before calling original function. 

783 

784 Args: 

785 *args: Positional arguments passed to the wrapped function 

786 **kwargs: Keyword arguments passed to the wrapped function 

787 

788 Returns: 

789 Any: Result from the wrapped function if any-permission check passes 

790 

791 Raises: 

792 HTTPException: If user authentication or any-permission check fails 

793 """ 

794 # Extract user context from named kwargs only (security: avoid picking up request body dicts) 

795 user_context = kwargs.get("user") or kwargs.get("_user") or kwargs.get("current_user") or kwargs.get("current_user_ctx") 

796 if not user_context or not isinstance(user_context, dict) or "email" not in user_context: 

797 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User authentication required") 

798 

799 # Extract team_id from path parameters if available 

800 team_id = kwargs.get("team_id") 

801 

802 # If team_id is None or blank in kwargs then check 

803 if not team_id: 

804 # check if user_context has team_id 

805 team_id = user_context.get("team_id", None) 

806 

807 # For multi-team session tokens (team_id is None), derive team from context 

808 check_any_team = False 

809 if not team_id and user_context.get("token_use") == "session": 

810 db_session = kwargs.get("db") or user_context.get("db") 

811 if db_session: 

812 # Tier 1: Try to derive team from existing resource 

813 team_id = _derive_team_from_resource(kwargs, db_session) 

814 # Tier 3: Try to derive team from create payload / form 

815 if team_id is None: 

816 team_id = await _derive_team_from_payload(kwargs) 

817 # If still no team_id: check if any permission is read-only 

818 if not team_id: 

819 # If ALL permissions are mutating, fail closed (team_id=None, global+personal only) 

820 # If ANY permission is non-mutating, use check_any_team for broader access 

821 if any(not _is_mutate_permission(p) for p in permissions): 

822 check_any_team = True 

823 

824 # Get db session: prefer endpoint's db param, then user_context["db"], then create fresh 

825 db_session = kwargs.get("db") or user_context.get("db") 

826 if db_session: 

827 # Use existing session from endpoint or user_context 

828 permission_service = PermissionService(db_session) 

829 # Check if user has any of the required permissions 

830 granted = False 

831 for permission in permissions: 

832 if await permission_service.check_permission( 

833 user_email=user_context["email"], 

834 permission=permission, 

835 resource_type=resource_type, 

836 team_id=team_id, 

837 ip_address=user_context.get("ip_address"), 

838 user_agent=user_context.get("user_agent"), 

839 allow_admin_bypass=allow_admin_bypass, 

840 check_any_team=check_any_team, 

841 ): 

842 granted = True 

843 break 

844 else: 

845 # Create fresh db session for permission check 

846 with fresh_db_session() as db: 

847 permission_service = PermissionService(db) 

848 # Check if user has any of the required permissions 

849 granted = False 

850 for permission in permissions: 

851 if await permission_service.check_permission( 

852 user_email=user_context["email"], 

853 permission=permission, 

854 resource_type=resource_type, 

855 team_id=team_id, 

856 ip_address=user_context.get("ip_address"), 

857 user_agent=user_context.get("user_agent"), 

858 allow_admin_bypass=allow_admin_bypass, 

859 check_any_team=check_any_team, 

860 ): 

861 granted = True 

862 break 

863 

864 if not granted: 

865 logger.warning(f"Permission denied: user={user_context['email']}, permissions={permissions}, resource_type={resource_type}") 

866 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions. Required one of: {', '.join(permissions)}") 

867 

868 # Permission granted, execute the original function 

869 return await func(*args, **kwargs) 

870 

871 return wrapper 

872 

873 return decorator 

874 

875 

876class PermissionChecker: 

877 """Context manager for manual permission checking. 

878 

879 Useful for complex permission logic that can't be handled by decorators. 

880 

881 Examples: 

882 >>> from unittest.mock import Mock 

883 >>> checker = PermissionChecker({"email": "user@example.com", "db": Mock()}) 

884 >>> hasattr(checker, 'has_permission') and hasattr(checker, 'has_admin_permission') 

885 True 

886 """ 

887 

888 def __init__(self, user_context: dict): 

889 """Initialize permission checker with user context. 

890 

891 Args: 

892 user_context: User context from get_current_user_with_permissions 

893 """ 

894 self.user_context = user_context 

895 self.db_session = user_context.get("db") 

896 

897 async def has_permission(self, permission: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None, team_id: Optional[str] = None) -> bool: 

898 """Check if user has specific permission. 

899 

900 Args: 

901 permission: Permission to check 

902 resource_type: Optional resource type 

903 resource_id: Optional resource ID 

904 team_id: Optional team context 

905 

906 Returns: 

907 bool: True if user has permission 

908 """ 

909 if self.db_session: 

910 # Use existing session 

911 permission_service = PermissionService(self.db_session) 

912 return await permission_service.check_permission( 

913 user_email=self.user_context["email"], 

914 permission=permission, 

915 resource_type=resource_type, 

916 resource_id=resource_id, 

917 team_id=team_id, 

918 ip_address=self.user_context.get("ip_address"), 

919 user_agent=self.user_context.get("user_agent"), 

920 ) 

921 # Create fresh db session 

922 with fresh_db_session() as db: 

923 permission_service = PermissionService(db) 

924 return await permission_service.check_permission( 

925 user_email=self.user_context["email"], 

926 permission=permission, 

927 resource_type=resource_type, 

928 resource_id=resource_id, 

929 team_id=team_id, 

930 ip_address=self.user_context.get("ip_address"), 

931 user_agent=self.user_context.get("user_agent"), 

932 ) 

933 

934 async def has_admin_permission(self) -> bool: 

935 """Check if user has admin permissions. 

936 

937 Returns: 

938 bool: True if user has admin permissions 

939 """ 

940 if self.db_session: 

941 # Use existing session 

942 permission_service = PermissionService(self.db_session) 

943 return await permission_service.check_admin_permission(self.user_context["email"]) 

944 # Create fresh db session 

945 with fresh_db_session() as db: 

946 permission_service = PermissionService(db) 

947 return await permission_service.check_admin_permission(self.user_context["email"]) 

948 

949 async def has_any_permission(self, permissions: List[str], resource_type: Optional[str] = None, team_id: Optional[str] = None) -> bool: 

950 """Check if user has any of the specified permissions. 

951 

952 Args: 

953 permissions: List of permissions to check 

954 resource_type: Optional resource type 

955 team_id: Optional team context 

956 

957 Returns: 

958 bool: True if user has at least one permission 

959 """ 

960 if self.db_session: 

961 # Use existing session for all checks 

962 permission_service = PermissionService(self.db_session) 

963 for permission in permissions: 

964 if await permission_service.check_permission( 

965 user_email=self.user_context["email"], 

966 permission=permission, 

967 resource_type=resource_type, 

968 team_id=team_id, 

969 ip_address=self.user_context.get("ip_address"), 

970 user_agent=self.user_context.get("user_agent"), 

971 ): 

972 return True 

973 return False 

974 # Create single fresh session for all checks (avoid N sessions for N permissions) 

975 with fresh_db_session() as db: 

976 permission_service = PermissionService(db) 

977 for permission in permissions: 

978 if await permission_service.check_permission( 

979 user_email=self.user_context["email"], 

980 permission=permission, 

981 resource_type=resource_type, 

982 team_id=team_id, 

983 ip_address=self.user_context.get("ip_address"), 

984 user_agent=self.user_context.get("user_agent"), 

985 ): 

986 return True 

987 return False 

988 

989 async def require_permission(self, permission: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None, team_id: Optional[str] = None) -> None: 

990 """Require specific permission, raise HTTPException if not granted. 

991 

992 Args: 

993 permission: Required permission 

994 resource_type: Optional resource type 

995 resource_id: Optional resource ID 

996 team_id: Optional team context 

997 

998 Raises: 

999 HTTPException: If permission is not granted 

1000 """ 

1001 if not await self.has_permission(permission, resource_type, resource_id, team_id): 

1002 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions. Required: {permission}")