Coverage for mcpgateway / middleware / token_scoping.py: 99%
524 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/middleware/token_scoping.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Token Scoping Middleware.
8This middleware enforces token scoping restrictions at the API level,
9including server_id restrictions, IP restrictions, permission checks,
10and time-based restrictions.
11"""
13# Standard
14from datetime import datetime, timedelta, timezone
15from functools import lru_cache
16import hashlib
17import hmac
18import ipaddress
19import re
20from typing import List, Optional, Pattern, Tuple
22# Third-Party
23from fastapi import HTTPException, Request, status
24from fastapi.security import HTTPBearer
25from sqlalchemy import and_, func, select
27# First-Party
28from mcpgateway.auth import normalize_token_teams, resolve_session_teams
29from mcpgateway.common.validators import SecurityValidator
30from mcpgateway.config import settings
31from mcpgateway.db import Permissions
32from mcpgateway.middleware.rbac import _ACCESS_DENIED_MSG
33from mcpgateway.services.logging_service import LoggingService
34from mcpgateway.utils.orjson_response import ORJSONResponse
35from mcpgateway.utils.verify_credentials import verify_jwt_token_cached
37# Security scheme
38bearer_scheme = HTTPBearer(auto_error=False)
40# Initialize logging service first
41logging_service = LoggingService()
42logger = logging_service.get_logger(__name__)
44# ============================================================================
45# Precompiled regex patterns (compiled once at module load for performance)
46# ============================================================================
48# Server path extraction patterns
49_SERVER_PATH_PATTERNS: List[Pattern[str]] = [
50 re.compile(r"^/servers/([^/]+)(?:$|/)"),
51 re.compile(r"^/sse/([^/?]+)(?:$|\?)"),
52 re.compile(r"^/ws/([^/?]+)(?:$|\?)"),
53]
55# Resource ID extraction patterns (IDs are UUID hex strings)
56_RESOURCE_PATTERNS: List[Tuple[Pattern[str], str]] = [
57 (re.compile(r"/servers/?([a-f0-9\-]+)"), "server"),
58 (re.compile(r"/tools/?([a-f0-9\-]+)"), "tool"),
59 (re.compile(r"/resources/?([a-f0-9\-]+)"), "resource"),
60 (re.compile(r"/prompts/?([a-f0-9\-]+)"), "prompt"),
61 (re.compile(r"/gateways/?([a-f0-9\-]+)"), "gateway"),
62]
63_AUTH_COOKIE_NAMES = ("jwt_token", "access_token")
64_INTERNAL_MCP_PATH_PREFIX = "/_internal/mcp"
65_INTERNAL_MCP_RUNTIME_HEADER = "x-contextforge-mcp-runtime"
66_INTERNAL_MCP_AUTH_CONTEXT_HEADER = "x-contextforge-auth-context"
67_INTERNAL_MCP_RUNTIME_AUTH_HEADER = "x-contextforge-mcp-runtime-auth"
68_INTERNAL_MCP_RUNTIME_AUTH_CONTEXT = "contextforge-internal-mcp-runtime-v1"
70# Permission map with precompiled patterns
71# Maps (HTTP method, path pattern) to required permission
72_PERMISSION_PATTERNS: List[Tuple[str, Pattern[str], str]] = [
73 # Tools permissions
74 ("GET", re.compile(r"^/tools(?:$|/)"), Permissions.TOOLS_READ),
75 ("POST", re.compile(r"^/tools/?$"), Permissions.TOOLS_CREATE), # Only exact /tools or /tools/
76 ("POST", re.compile(r"^/tools/[^/]+/"), Permissions.TOOLS_UPDATE), # POST to sub-resources (state, toggle)
77 ("PUT", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_UPDATE),
78 ("DELETE", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_DELETE),
79 ("GET", re.compile(r"^/servers/[^/]+/tools(?:$|/)"), Permissions.TOOLS_READ),
80 ("POST", re.compile(r"^/servers/[^/]+/tools/[^/]+/call(?:$|/)"), Permissions.TOOLS_EXECUTE),
81 # JSON-RPC endpoint — multiplexes tools/call, resources/list, initialize, etc.
82 # Fine-grained per-method RBAC is enforced downstream by _ensure_rpc_permission();
83 # the middleware only gates transport-level access via servers.use.
84 ("POST", re.compile(r"^/rpc(?:$|/)"), Permissions.SERVERS_USE),
85 # SSE transport — like /rpc and /mcp, this is a transport-level endpoint;
86 # the handler's own @require_permission enforces fine-grained RBAC.
87 ("GET", re.compile(r"^/sse(?:$|/)"), Permissions.SERVERS_USE),
88 # Streamable HTTP MCP transport (POST=send, GET=SSE stream, DELETE=session termination)
89 ("POST", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE),
90 ("GET", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE),
91 ("DELETE", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE),
92 # Resources permissions
93 ("GET", re.compile(r"^/resources(?:$|/)"), Permissions.RESOURCES_READ),
94 ("POST", re.compile(r"^/resources/?$"), Permissions.RESOURCES_CREATE), # Only exact /resources or /resources/
95 ("POST", re.compile(r"^/resources/subscribe(?:$|/)"), Permissions.RESOURCES_READ), # SSE subscription
96 ("POST", re.compile(r"^/resources/[^/]+/"), Permissions.RESOURCES_UPDATE), # POST to sub-resources (state, toggle)
97 ("PUT", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_UPDATE),
98 ("DELETE", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_DELETE),
99 ("GET", re.compile(r"^/servers/[^/]+/resources(?:$|/)"), Permissions.RESOURCES_READ),
100 # Prompts permissions
101 ("GET", re.compile(r"^/prompts(?:$|/)"), Permissions.PROMPTS_READ),
102 ("POST", re.compile(r"^/prompts/?$"), Permissions.PROMPTS_CREATE), # Only exact /prompts or /prompts/
103 ("POST", re.compile(r"^/prompts/[^/]+/"), Permissions.PROMPTS_UPDATE), # POST to sub-resources (state, toggle)
104 ("POST", re.compile(r"^/prompts/[^/]+$"), Permissions.PROMPTS_READ), # MCP spec prompt retrieval (POST /prompts/{id})
105 ("PUT", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_UPDATE),
106 ("DELETE", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_DELETE),
107 # Server management permissions
108 ("GET", re.compile(r"^/servers/[^/]+/sse(?:$|/)"), Permissions.SERVERS_USE), # Server SSE access endpoint
109 ("GET", re.compile(r"^/servers(?:$|/)"), Permissions.SERVERS_READ),
110 ("POST", re.compile(r"^/servers/?$"), Permissions.SERVERS_CREATE), # Only exact /servers or /servers/
111 ("POST", re.compile(r"^/servers/[^/]+/(?:state|toggle)(?:$|/)"), Permissions.SERVERS_UPDATE), # Server management sub-resources
112 ("POST", re.compile(r"^/servers/[^/]+/message(?:$|/)"), Permissions.SERVERS_USE), # Server message access endpoint
113 ("POST", re.compile(r"^/servers/[^/]+/mcp(?:$|/)"), Permissions.SERVERS_USE), # Server MCP access endpoint
114 ("PUT", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_UPDATE),
115 ("DELETE", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_DELETE),
116 # Gateway permissions
117 ("GET", re.compile(r"^/gateways(?:$|/)"), Permissions.GATEWAYS_READ),
118 ("POST", re.compile(r"^/gateways/?$"), Permissions.GATEWAYS_CREATE), # Only exact /gateways or /gateways/
119 ("POST", re.compile(r"^/gateways/[^/]+/"), Permissions.GATEWAYS_UPDATE), # POST to sub-resources (state, toggle, refresh)
120 ("PUT", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_UPDATE),
121 ("DELETE", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_DELETE),
122 # Metrics permissions
123 ("GET", re.compile(r"^/metrics(?:$|/)"), Permissions.ADMIN_METRICS),
124 ("POST", re.compile(r"^/metrics/reset(?:$|/)"), Permissions.ADMIN_METRICS),
125 # Token permissions
126 ("GET", re.compile(r"^/tokens(?:$|/)"), Permissions.TOKENS_READ),
127 ("POST", re.compile(r"^/tokens/?$"), Permissions.TOKENS_CREATE), # Only exact /tokens or /tokens/
128 ("POST", re.compile(r"^/tokens/teams/[^/]+(?:$|/)"), Permissions.TOKENS_CREATE),
129 ("PUT", re.compile(r"^/tokens/[^/]+(?:$|/)"), Permissions.TOKENS_UPDATE),
130 ("DELETE", re.compile(r"^/tokens/[^/]+(?:$|/)"), Permissions.TOKENS_REVOKE),
131]
133# Admin route permission map (granular by route group).
134# IMPORTANT: Unmatched /admin/* paths are denied by default (fail-secure).
135_ADMIN_PERMISSION_PATTERNS: List[Tuple[str, Pattern[str], str]] = [
136 # Dashboard/overview surfaces
137 ("GET", re.compile(r"^/admin/?$"), Permissions.ADMIN_DASHBOARD),
138 ("GET", re.compile(r"^/admin/search(?:$|/)"), Permissions.ADMIN_DASHBOARD),
139 ("GET", re.compile(r"^/admin/overview(?:$|/)"), Permissions.ADMIN_OVERVIEW),
140 # User management
141 ("GET", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
142 ("POST", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
143 ("DELETE", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
144 # Team management
145 ("POST", re.compile(r"^/admin/teams/?$"), Permissions.TEAMS_CREATE),
146 ("DELETE", re.compile(r"^/admin/teams/[^/]+/join-request/[^/]+(?:$|/)"), Permissions.TEAMS_JOIN),
147 ("DELETE", re.compile(r"^/admin/teams/[^/]+(?:$|/)"), Permissions.TEAMS_DELETE),
148 ("GET", re.compile(r"^/admin/teams/[^/]+/edit(?:$|/)"), Permissions.TEAMS_UPDATE),
149 ("POST", re.compile(r"^/admin/teams/[^/]+/update(?:$|/)"), Permissions.TEAMS_UPDATE),
150 ("GET", re.compile(r"^/admin/teams/[^/]+/(?:members/add|members/partial|non-members/partial|join-requests)(?:$|/)"), Permissions.TEAMS_MANAGE_MEMBERS),
151 ("POST", re.compile(r"^/admin/teams/[^/]+/(?:add-member|update-member-role|remove-member|join-requests/[^/]+/(?:approve|reject))(?:$|/)"), Permissions.TEAMS_MANAGE_MEMBERS),
152 ("POST", re.compile(r"^/admin/teams/[^/]+/(?:leave|join-request(?:/[^/]+)?)(?:$|/)"), Permissions.TEAMS_JOIN),
153 ("GET", re.compile(r"^/admin/teams(?:$|/)"), Permissions.TEAMS_READ),
154 # Tool management
155 ("POST", re.compile(r"^/admin/tools/?$"), Permissions.TOOLS_CREATE),
156 ("POST", re.compile(r"^/admin/tools/import(?:$|/)"), Permissions.TOOLS_CREATE),
157 ("POST", re.compile(r"^/admin/tools/[^/]+/delete(?:$|/)"), Permissions.TOOLS_DELETE),
158 ("POST", re.compile(r"^/admin/tools/[^/]+/(?:edit|state)(?:$|/)"), Permissions.TOOLS_UPDATE),
159 ("GET", re.compile(r"^/admin/tools(?:$|/)"), Permissions.TOOLS_READ),
160 # Resource management
161 ("POST", re.compile(r"^/admin/resources/?$"), Permissions.RESOURCES_CREATE),
162 ("POST", re.compile(r"^/admin/resources/[^/]+/delete(?:$|/)"), Permissions.RESOURCES_DELETE),
163 ("POST", re.compile(r"^/admin/resources/[^/]+/(?:edit|state)(?:$|/)"), Permissions.RESOURCES_UPDATE),
164 ("GET", re.compile(r"^/admin/resources(?:$|/)"), Permissions.RESOURCES_READ),
165 # Prompt management
166 ("POST", re.compile(r"^/admin/prompts/?$"), Permissions.PROMPTS_CREATE),
167 ("POST", re.compile(r"^/admin/prompts/[^/]+/delete(?:$|/)"), Permissions.PROMPTS_DELETE),
168 ("POST", re.compile(r"^/admin/prompts/[^/]+/(?:edit|state)(?:$|/)"), Permissions.PROMPTS_UPDATE),
169 ("GET", re.compile(r"^/admin/prompts(?:$|/)"), Permissions.PROMPTS_READ),
170 # Gateway management
171 ("POST", re.compile(r"^/admin/gateways/test(?:$|/)"), Permissions.GATEWAYS_READ),
172 ("POST", re.compile(r"^/admin/gateways/?$"), Permissions.GATEWAYS_CREATE),
173 ("POST", re.compile(r"^/admin/gateways/[^/]+/delete(?:$|/)"), Permissions.GATEWAYS_DELETE),
174 ("POST", re.compile(r"^/admin/gateways/[^/]+/(?:edit|state)(?:$|/)"), Permissions.GATEWAYS_UPDATE),
175 ("GET", re.compile(r"^/admin/gateways(?:$|/)"), Permissions.GATEWAYS_READ),
176 # Server management
177 ("POST", re.compile(r"^/admin/servers/?$"), Permissions.SERVERS_CREATE),
178 ("POST", re.compile(r"^/admin/servers/[^/]+/delete(?:$|/)"), Permissions.SERVERS_DELETE),
179 ("POST", re.compile(r"^/admin/servers/[^/]+/(?:edit|state)(?:$|/)"), Permissions.SERVERS_UPDATE),
180 ("GET", re.compile(r"^/admin/servers(?:$|/)"), Permissions.SERVERS_READ),
181 # Token/tag read surfaces
182 ("GET", re.compile(r"^/admin/tokens(?:$|/)"), Permissions.TOKENS_READ),
183 ("GET", re.compile(r"^/admin/tags(?:$|/)"), Permissions.TAGS_READ),
184 # A2A management
185 ("POST", re.compile(r"^/admin/a2a/?$"), Permissions.A2A_CREATE),
186 ("POST", re.compile(r"^/admin/a2a/[^/]+/delete(?:$|/)"), Permissions.A2A_DELETE),
187 ("POST", re.compile(r"^/admin/a2a/[^/]+/(?:edit|state)(?:$|/)"), Permissions.A2A_UPDATE),
188 ("POST", re.compile(r"^/admin/a2a/[^/]+/test(?:$|/)"), Permissions.A2A_INVOKE),
189 ("GET", re.compile(r"^/admin/a2a(?:$|/)"), Permissions.A2A_READ),
190 # Section partials
191 ("GET", re.compile(r"^/admin/sections/resources(?:$|/)"), Permissions.RESOURCES_READ),
192 ("GET", re.compile(r"^/admin/sections/prompts(?:$|/)"), Permissions.PROMPTS_READ),
193 ("GET", re.compile(r"^/admin/sections/servers(?:$|/)"), Permissions.SERVERS_READ),
194 ("GET", re.compile(r"^/admin/sections/gateways(?:$|/)"), Permissions.GATEWAYS_READ),
195 # Specialized admin domains
196 ("GET", re.compile(r"^/admin/events(?:$|/)"), Permissions.ADMIN_EVENTS),
197 ("GET", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC),
198 ("POST", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC),
199 ("PUT", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC),
200 ("GET", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS),
201 ("POST", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS),
202 ("PUT", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS),
203 ("DELETE", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS),
204 # System configuration/admin operations
205 (
206 "GET",
207 re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"),
208 Permissions.ADMIN_SYSTEM_CONFIG,
209 ),
210 (
211 "POST",
212 re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"),
213 Permissions.ADMIN_SYSTEM_CONFIG,
214 ),
215 (
216 "PUT",
217 re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"),
218 Permissions.ADMIN_SYSTEM_CONFIG,
219 ),
220 (
221 "DELETE",
222 re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"),
223 Permissions.ADMIN_SYSTEM_CONFIG,
224 ),
225]
228def _normalize_llm_api_prefix(prefix: Optional[str]) -> str:
229 """Normalize llm_api_prefix to a canonical path prefix.
231 Args:
232 prefix: Raw LLM API prefix setting value.
234 Returns:
235 str: Normalized path prefix, or empty string when prefix is empty or "/".
236 """
237 if not prefix:
238 return ""
239 normalized = "/" + str(prefix).strip().strip("/")
240 return "" if normalized == "/" else normalized
243def _normalize_scope_path(scope_path: str, root_path: str) -> str:
244 """Strip ``root_path`` from ``scope_path`` when the incoming path includes it.
246 Args:
247 scope_path: Request path observed by middleware.
248 root_path: Application root path prefix, if configured.
250 Returns:
251 Path value normalized for permission and scope pattern matching.
252 """
253 if root_path and len(root_path) > 1:
254 root_path = root_path.rstrip("/")
255 if root_path and len(root_path) > 1 and scope_path.startswith(root_path):
256 rest = scope_path[len(root_path) :]
257 # root_path="/app" must not strip from "/application/..."
258 if rest == "" or rest.startswith("/"):
259 return rest or "/"
260 return scope_path
263@lru_cache(maxsize=16)
264def _get_llm_permission_patterns(prefix: str) -> Tuple[Tuple[str, Pattern[str], str], ...]:
265 """Build precompiled permission patterns for LLM proxy endpoints.
267 Args:
268 prefix: LLM API prefix used to mount proxy routes.
270 Returns:
271 Tuple[Tuple[str, Pattern[str], str], ...]: Method/path regex to required permission mappings.
272 """
273 normalized_prefix = _normalize_llm_api_prefix(prefix)
274 escaped_prefix = re.escape(normalized_prefix)
275 return (
276 # LLM proxy routes are exact endpoints (optionally with a trailing slash),
277 # unlike many REST resources that intentionally include sub-resources.
278 ("POST", re.compile(rf"^{escaped_prefix}/chat/completions/?$"), Permissions.LLM_INVOKE),
279 ("GET", re.compile(rf"^{escaped_prefix}/models/?$"), Permissions.LLM_READ),
280 )
283class TokenScopingMiddleware:
284 """Middleware to enforce token scoping restrictions.
286 Examples:
287 >>> middleware = TokenScopingMiddleware()
288 >>> isinstance(middleware, TokenScopingMiddleware)
289 True
290 """
292 def __init__(self):
293 """Initialize token scoping middleware.
295 Examples:
296 >>> middleware = TokenScopingMiddleware()
297 >>> hasattr(middleware, '_extract_token_scopes')
298 True
299 """
301 def _normalize_teams(self, teams) -> list:
302 """Normalize teams from token payload to list of team IDs.
304 Handles various team formats:
305 - None -> []
306 - List of strings -> as-is
307 - List of dicts with 'id' key -> extract IDs
309 Args:
310 teams: Raw teams value from JWT payload
312 Returns:
313 List of team ID strings
314 """
315 if not teams:
316 return []
317 normalized = []
318 for team in teams:
319 if isinstance(team, dict):
320 team_id = team.get("id")
321 if team_id:
322 normalized.append(team_id)
323 elif isinstance(team, str):
324 normalized.append(team)
325 return normalized
327 def _normalize_path_for_matching(self, request_path: str) -> str:
328 """Normalize a path for team scoping and permission matching.
330 Args:
331 request_path: Raw request path.
333 Returns:
334 Normalized absolute path suitable for route matching.
335 """
336 normalized = _normalize_scope_path(request_path or "/", settings.app_root_path or "")
337 if not normalized.startswith("/"):
338 return f"/{normalized}"
339 return normalized
341 def _get_normalized_request_path(self, request: Request) -> str:
342 """Resolve request path with APP_ROOT_PATH-aware normalization.
344 Args:
345 request: Request object containing scope and URL data.
347 Returns:
348 Normalized request path suitable for permission checks.
349 """
350 scope = getattr(request, "scope", {}) or {}
351 if not isinstance(scope, dict):
352 scope = {}
353 scope_path = request.url.path or scope.get("path") or "/"
354 root_path = scope.get("root_path") or settings.app_root_path or ""
355 normalized = _normalize_scope_path(scope_path, root_path)
356 if not normalized.startswith("/"):
357 return f"/{normalized}"
358 return normalized
360 def _extract_jwt_token_from_request(self, request: Request) -> Optional[str]:
361 """Extract JWT token from supported cookie names or Bearer auth header.
363 Args:
364 request: Request object carrying cookies and headers.
366 Returns:
367 JWT string when present and validly formatted; otherwise ``None``.
368 """
369 cookies = getattr(request, "cookies", None)
370 if cookies and hasattr(cookies, "get"):
371 for cookie_name in _AUTH_COOKIE_NAMES:
372 cookie_token = cookies.get(cookie_name)
373 if isinstance(cookie_token, str) and cookie_token.strip():
374 return cookie_token.strip()
376 # Get authorization header and parse bearer scheme case-insensitively.
377 auth_header = request.headers.get("Authorization")
378 if not auth_header:
379 return None
381 parts = auth_header.split(" ", 1)
382 if len(parts) != 2 or parts[0].lower() != "bearer":
383 return None
385 token = parts[1].strip()
386 return token or None
388 async def _extract_token_scopes(self, request: Request) -> Optional[dict]:
389 """Extract token scopes from JWT in request.
391 Args:
392 request: FastAPI request object
394 Returns:
395 Dict containing token scopes or None if no valid token
396 """
397 token = self._extract_jwt_token_from_request(request)
398 if not token:
399 return None
401 try:
402 # Use the centralized verify_jwt_token_cached function for consistent JWT validation
403 payload = await verify_jwt_token_cached(token, request)
404 return payload
405 except HTTPException:
406 # Token validation failed (expired, invalid, etc.)
407 return None
408 except Exception:
409 # Any other error in token validation
410 return None
412 def _get_client_ip(self, request: Request) -> str:
413 """Extract client IP address from request.
415 Only trusts X-Forwarded-For / X-Real-IP headers when a trusted proxy
416 configuration is in place (ProxyHeadersMiddleware with specific hosts).
417 Otherwise, uses the direct connection IP to prevent header spoofing.
419 Args:
420 request: FastAPI request object
422 Returns:
423 str: Client IP address
424 """
425 # Use direct client IP as the secure default.
426 # Proxy headers are only trustworthy when Uvicorn/Starlette's
427 # ProxyHeadersMiddleware has already rewritten request.client from a
428 # trusted upstream. That middleware replaces request.client.host with
429 # the real client IP, so we can rely on it directly.
430 return request.client.host if request.client else "unknown"
432 def _check_ip_restrictions(self, client_ip: str, ip_restrictions: list) -> bool:
433 """Check if client IP is allowed by restrictions.
435 Args:
436 client_ip: Client's IP address
437 ip_restrictions: List of allowed IP addresses/CIDR ranges
439 Returns:
440 bool: True if IP is allowed, False otherwise
442 Examples:
443 Allow specific IP:
444 >>> m = TokenScopingMiddleware()
445 >>> m._check_ip_restrictions('192.168.1.10', ['192.168.1.10'])
446 True
448 Allow CIDR range:
449 >>> m._check_ip_restrictions('10.0.0.5', ['10.0.0.0/24'])
450 True
452 Deny when not in list:
453 >>> m._check_ip_restrictions('10.0.1.5', ['10.0.0.0/24'])
454 False
456 Empty restrictions allow all:
457 >>> m._check_ip_restrictions('203.0.113.1', [])
458 True
459 """
460 if not ip_restrictions:
461 return True # No restrictions
463 try:
464 client_ip_obj = ipaddress.ip_address(client_ip)
466 for restriction in ip_restrictions:
467 try:
468 # Check if it's a CIDR range
469 if "/" in restriction:
470 network = ipaddress.ip_network(restriction, strict=False)
471 if client_ip_obj in network:
472 return True
473 else:
474 # Single IP address
475 if client_ip_obj == ipaddress.ip_address(restriction):
476 return True
477 except (ValueError, ipaddress.AddressValueError):
478 continue
480 except (ValueError, ipaddress.AddressValueError):
481 return False
483 return False
485 def _check_time_restrictions(self, time_restrictions: dict) -> bool:
486 """Check if current time is allowed by restrictions.
488 Args:
489 time_restrictions: Dict containing time-based restrictions
491 Returns:
492 bool: True if current time is allowed, False otherwise
494 Examples:
495 No restrictions allow access:
496 >>> m = TokenScopingMiddleware()
497 >>> m._check_time_restrictions({})
498 True
500 Weekdays only: result depends on current weekday (always bool):
501 >>> isinstance(m._check_time_restrictions({'weekdays_only': True}), bool)
502 True
504 Business hours only: result depends on current hour (always bool):
505 >>> isinstance(m._check_time_restrictions({'business_hours_only': True}), bool)
506 True
507 """
508 if not time_restrictions:
509 return True # No restrictions
511 now = datetime.now(tz=timezone.utc)
513 # Check business hours restriction
514 if time_restrictions.get("business_hours_only"):
515 # Assume business hours are 9 AM to 5 PM UTC
516 # This could be made configurable
517 if not 9 <= now.hour < 17:
518 return False
520 # Check day of week restrictions
521 weekdays_only = time_restrictions.get("weekdays_only")
522 if weekdays_only and now.weekday() >= 5: # Saturday=5, Sunday=6
523 return False
525 return True
527 @staticmethod
528 def _parse_positive_limit(value: object) -> Optional[int]:
529 """Parse usage-limit values as positive integers.
531 Args:
532 value: Candidate limit value from token scope configuration.
534 Returns:
535 Parsed positive integer limit, or ``None`` when invalid/non-positive.
536 """
537 try:
538 parsed = int(value)
539 except (TypeError, ValueError):
540 return None
541 return parsed if parsed > 0 else None
543 def _check_usage_limits(self, jti: Optional[str], usage_limits: dict) -> Tuple[bool, Optional[str]]:
544 """Check token usage limits against recorded usage logs.
546 Args:
547 jti: Token JTI identifier.
548 usage_limits: Usage limits from token scope.
550 Returns:
551 Tuple[bool, Optional[str]]: (allowed, denial_reason)
552 """
553 if not isinstance(usage_limits, dict) or not usage_limits or not jti:
554 return True, None
556 requests_per_hour = self._parse_positive_limit(usage_limits.get("requests_per_hour"))
557 requests_per_day = self._parse_positive_limit(usage_limits.get("requests_per_day"))
559 if not requests_per_hour and not requests_per_day:
560 return True, None
562 # First-Party
563 from mcpgateway.db import get_db, TokenUsageLog # pylint: disable=import-outside-toplevel
565 db = next(get_db())
566 try:
567 now = datetime.now(timezone.utc)
569 if requests_per_hour:
570 hour_window_start = now - timedelta(hours=1)
571 hourly_count = db.execute(
572 # Pylint false-positive: SQLAlchemy func namespace is callable at runtime.
573 # pylint: disable=not-callable
574 select(func.count(TokenUsageLog.id)).where(and_(TokenUsageLog.token_jti == jti, TokenUsageLog.timestamp >= hour_window_start))
575 ).scalar()
576 if int(hourly_count or 0) >= requests_per_hour:
577 return False, "Hourly request limit exceeded"
579 if requests_per_day:
580 day_window_start = now - timedelta(days=1)
581 daily_count = db.execute(
582 # Pylint false-positive: SQLAlchemy func namespace is callable at runtime.
583 # pylint: disable=not-callable
584 select(func.count(TokenUsageLog.id)).where(and_(TokenUsageLog.token_jti == jti, TokenUsageLog.timestamp >= day_window_start))
585 ).scalar()
586 if int(daily_count or 0) >= requests_per_day:
587 return False, "Daily request limit exceeded"
588 except Exception as exc:
589 logger.warning("Failed to evaluate token usage limits for jti %s: %s", jti, exc)
590 return True, None
591 finally:
592 try:
593 db.rollback()
594 finally:
595 db.close()
597 return True, None
599 def _check_server_restriction(self, request_path: str, server_id: Optional[str]) -> bool:
600 """Check if request path matches server restriction.
602 Args:
603 request_path: The request path/URL
604 server_id: Required server ID (None means no restriction)
606 Returns:
607 bool: True if request is allowed, False otherwise
609 Examples:
610 Match server paths:
611 >>> m = TokenScopingMiddleware()
612 >>> m._check_server_restriction('/servers/abc/tools', 'abc')
613 True
614 >>> m._check_server_restriction('/sse/xyz', 'xyz')
615 True
616 >>> m._check_server_restriction('/ws/xyz?x=1', 'xyz')
617 True
619 Mismatch denies:
620 >>> m._check_server_restriction('/servers/def', 'abc')
621 False
623 General endpoints allowed:
624 >>> m._check_server_restriction('/health', 'abc')
625 True
626 >>> m._check_server_restriction('/', 'abc')
627 True
628 """
629 request_path = self._normalize_path_for_matching(request_path)
631 if not server_id:
632 return True # No server restriction
634 # Extract server ID from path patterns (uses precompiled regex)
635 # /servers/{server_id}/...
636 # /sse/{server_id}
637 # /ws/{server_id}
638 for pattern in _SERVER_PATH_PATTERNS:
639 match = pattern.search(request_path)
640 if match:
641 path_server_id = match.group(1)
642 return path_server_id == server_id
644 # If no server ID found in path, allow general endpoints
645 general_endpoints = ["/health", "/metrics", "/openapi.json", "/docs", "/redoc", "/rpc", "/mcp", "/sse"]
647 # Check exact root path separately
648 if request_path == "/":
649 return True
651 for endpoint in general_endpoints:
652 if request_path.startswith(endpoint):
653 return True
655 # Default deny for unmatched paths with server restrictions
656 return False
658 def _check_permission_restrictions(self, request_path: str, request_method: str, permissions: list) -> bool:
659 """Check if request is allowed by permission restrictions.
661 Args:
662 request_path: The request path/URL
663 request_method: HTTP method (GET, POST, etc.)
664 permissions: List of allowed permissions
666 Returns:
667 bool: True if request is allowed, False otherwise
669 Examples:
670 Wildcard allows all:
671 >>> m = TokenScopingMiddleware()
672 >>> m._check_permission_restrictions('/tools', 'GET', ['*'])
673 True
675 Requires specific permission:
676 >>> m._check_permission_restrictions('/tools', 'POST', ['tools.create'])
677 True
678 >>> m._check_permission_restrictions('/tools/xyz', 'PUT', ['tools.update'])
679 True
680 >>> m._check_permission_restrictions('/resources', 'GET', ['resources.read'])
681 True
682 >>> m._check_permission_restrictions('/servers/s1/tools/abc/call', 'POST', ['tools.execute'])
683 True
685 Missing permission denies:
686 >>> m._check_permission_restrictions('/tools', 'POST', ['tools.read'])
687 False
688 """
689 request_path = self._normalize_path_for_matching(request_path)
691 if not permissions or "*" in permissions:
692 return True # No restrictions or full access
694 # Handle admin routes with granular route-group mapping.
695 # Unmapped /admin/* paths are denied by default (fail-secure).
696 if request_path.startswith("/admin"):
697 for method, path_pattern, required_permission in _ADMIN_PERMISSION_PATTERNS:
698 if request_method == method and path_pattern.match(request_path):
699 return required_permission in permissions
700 return False
702 # Check each permission mapping (uses precompiled regex patterns)
703 for method, path_pattern, required_permission in _PERMISSION_PATTERNS:
704 if request_method == method and path_pattern.match(request_path):
705 if required_permission in permissions:
706 return True
707 # Runtime compensation: tokens with MCP method permissions
708 # (tools.*, resources.*, prompts.*) implicitly have transport
709 # access (servers.use) — mirrors the generation-time injection
710 # in token_catalog_service._generate_token() for pre-existing tokens.
711 if required_permission == Permissions.SERVERS_USE:
712 if any(p.startswith(Permissions.MCP_METHOD_PREFIXES) for p in permissions):
713 logger.debug("Runtime servers.use compensation applied for token with MCP method permissions: %s", permissions)
714 return True
715 return False
716 return False
718 # LLM proxy permissions (respect configured llm_api_prefix).
719 for method, path_pattern, required_permission in _get_llm_permission_patterns(settings.llm_api_prefix):
720 if request_method == method and path_pattern.match(request_path):
721 return required_permission in permissions
723 # Default deny for unmatched paths (requires explicit permission mapping)
724 return False
726 def _check_team_membership(self, payload: dict, db=None) -> bool:
727 """
728 Check if user still belongs to teams in the token.
730 For public-only tokens (no teams), always returns True.
731 For team-scoped tokens, validates membership with caching.
733 Uses in-memory cache (per gateway instance, 60s TTL) to avoid repeated
734 email_team_members queries for the same user+teams combination.
735 Note: Sync path uses in-memory only for performance; Redis is not
736 consulted to avoid async overhead in the hot path.
738 Args:
739 payload: Decoded JWT payload containing teams
740 db: Optional database session. If provided, caller manages lifecycle.
741 If None, creates and manages its own session.
743 Returns:
744 bool: True if team membership is valid, False otherwise
745 """
746 teams = payload.get("teams", [])
747 user_email = payload.get("sub")
749 # PUBLIC-ONLY TOKEN: No team validation needed
750 if not teams or len(teams) == 0:
751 logger.debug(f"Public-only token for user {SecurityValidator.sanitize_log_message(user_email)} - no team validation required")
752 return True
754 # TEAM-SCOPED TOKEN: Validate membership
755 if not user_email:
756 logger.warning("Token missing user email")
757 return False
759 # Extract team IDs from token (handles both dict and string formats)
760 team_ids = [team["id"] if isinstance(team, dict) else team for team in teams]
762 # First-Party
763 from mcpgateway.cache.auth_cache import get_auth_cache # pylint: disable=import-outside-toplevel
765 # Check cache first (synchronous in-memory lookup)
766 auth_cache = get_auth_cache()
767 cached_result = auth_cache.get_team_membership_valid_sync(user_email, team_ids)
768 if cached_result is not None:
769 if not cached_result:
770 logger.warning(f"Token invalid (cached): User {SecurityValidator.sanitize_log_message(user_email)} no longer member of teams")
771 return cached_result
773 # Cache miss - query database
774 # First-Party
775 from mcpgateway.db import EmailTeamMember, get_db # pylint: disable=import-outside-toplevel
777 # Track if we own the session (and thus must clean it up)
778 owns_session = db is None
779 if owns_session:
780 db = next(get_db())
782 try:
783 # Single query for all teams (fixes N+1 pattern)
784 memberships = (
785 db.execute(
786 select(EmailTeamMember.team_id).where(
787 EmailTeamMember.team_id.in_(team_ids),
788 EmailTeamMember.user_email == user_email,
789 EmailTeamMember.is_active.is_(True),
790 )
791 )
792 .scalars()
793 .all()
794 )
796 # Check if user is member of ALL teams in token
797 valid_team_ids = set(memberships)
798 missing_teams = set(team_ids) - valid_team_ids
800 if missing_teams:
801 logger.warning(f"Token invalid: User {SecurityValidator.sanitize_log_message(user_email)} no longer member of teams: {SecurityValidator.sanitize_log_message(str(missing_teams))}")
802 # Cache negative result
803 auth_cache.set_team_membership_valid_sync(user_email, team_ids, False)
804 return False
806 # Cache positive result
807 auth_cache.set_team_membership_valid_sync(user_email, team_ids, True)
808 return True
809 finally:
810 # Only commit/close if we created the session
811 if owns_session:
812 try:
813 db.commit() # Commit read-only transaction to avoid implicit rollback
814 finally:
815 db.close()
817 def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None, _user_email: str = None) -> bool: # noqa: PLR0911 # pylint: disable=too-many-return-statements
818 """
819 Check if the requested resource is accessible by the token.
821 Implements Three-Tier Resource Visibility (Public/Team/Private):
822 - PUBLIC: Accessible by all tokens (public-only and team-scoped)
823 - TEAM: Accessible only by tokens scoped to that specific team
824 - PRIVATE: Accessible only by tokens scoped to that specific team
826 Token Access Rules:
827 - Public-only tokens (empty token_teams): Can access public resources + their own resources
828 - Team-scoped tokens: Can access their team's resources + public resources
830 Handles URLs like:
831 - /servers/{id}/mcp
832 - /servers/{id}/sse
833 - /servers/{id}
834 - /tools/{id}/execute
835 - /tools/{id}
836 - /resources/{id}
837 - /prompts/{id}
839 Args:
840 request_path: The request path/URL
841 token_teams: List of team IDs from the token (empty list = public-only token)
842 db: Optional database session. If provided, caller manages lifecycle.
843 If None, creates and manages its own session.
845 Returns:
846 bool: True if resource access is allowed, False otherwise
847 """
848 request_path = self._normalize_path_for_matching(request_path)
850 # Normalize token_teams: extract team IDs from dict objects (backward compatibility)
851 token_team_ids = []
852 for team in token_teams:
853 if isinstance(team, dict):
854 token_team_ids.append(team["id"])
855 else:
856 token_team_ids.append(team)
858 # Determine token type
859 is_public_token = not token_team_ids or len(token_team_ids) == 0
861 if is_public_token:
862 logger.debug("Processing request with PUBLIC-ONLY token")
863 else:
864 logger.debug(f"Processing request with TEAM-SCOPED token (teams: {SecurityValidator.sanitize_log_message(str(token_teams))})")
866 # Extract resource type and ID from path (uses precompiled regex patterns)
867 # IDs are UUID hex strings (32 chars) or UUID with dashes (36 chars)
868 resource_id = None
869 resource_type = None
871 for pattern, rtype in _RESOURCE_PATTERNS:
872 match = pattern.search(request_path)
873 if match:
874 resource_id = match.group(1)
875 resource_type = rtype
876 logger.debug(f"Extracted {rtype} ID: {SecurityValidator.sanitize_log_message(resource_id)} from path: {SecurityValidator.sanitize_log_message(request_path)}")
877 break
879 # If no resource ID in path, allow (general endpoints like /health, /tokens, /metrics)
880 if not resource_id or not resource_type:
881 logger.debug(f"No resource ID found in path {request_path}, allowing access")
882 return True
884 # Import database models
885 # First-Party
886 from mcpgateway.db import Gateway, get_db, Prompt, Resource, Server, Tool # pylint: disable=import-outside-toplevel
888 # Track if we own the session (and thus must clean it up)
889 owns_session = db is None
890 if owns_session:
891 db = next(get_db())
893 try:
894 # Check Virtual Servers
895 if resource_type == "server":
896 server = db.execute(select(Server).where(Server.id == resource_id)).scalar_one_or_none()
898 if not server:
899 logger.warning(f"Server {SecurityValidator.sanitize_log_message(resource_id)} not found in database")
900 return False
902 # Get server visibility (default to 'team' if field doesn't exist)
903 server_visibility = getattr(server, "visibility", "team")
905 # PUBLIC SERVERS: Accessible by everyone (including public-only tokens)
906 if server_visibility == "public":
907 logger.debug(f"Access granted: Server {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC")
908 return True
910 # PUBLIC-ONLY TOKEN: Can ONLY access public servers (strict public-only policy)
911 # No owner access - if user needs own resources, use a personal team-scoped token
912 if is_public_token:
913 logger.warning(
914 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(server_visibility)} server {SecurityValidator.sanitize_log_message(resource_id)}"
915 )
916 return False
918 # TEAM-SCOPED SERVERS: Check if server belongs to token's teams
919 if server_visibility == "team":
920 if server.team_id in token_team_ids:
921 logger.debug(
922 f"Access granted: Team server {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(server.team_id))}"
923 )
924 return True
926 logger.warning(
927 f"Access denied: Server {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(server.team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}"
928 )
929 return False
931 # PRIVATE SERVERS: Owner-only access (per RBAC doc)
932 if server_visibility == "private":
933 server_owner = getattr(server, "owner_email", None)
934 if server_owner and server_owner == _user_email:
935 logger.debug(f"Access granted: Private server {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}")
936 return True
938 logger.warning(
939 f"Access denied: Server {SecurityValidator.sanitize_log_message(resource_id)} is private, owner is '{SecurityValidator.sanitize_log_message(str(server_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'"
940 )
941 return False
943 # Unknown visibility - deny by default
944 logger.warning(f"Access denied: Server {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(server_visibility)}")
945 return False
947 # CHECK TOOLS
948 if resource_type == "tool":
949 tool = db.execute(select(Tool).where(Tool.id == resource_id)).scalar_one_or_none()
951 if not tool:
952 logger.warning(f"Tool {SecurityValidator.sanitize_log_message(resource_id)} not found in database")
953 return False
955 # Get tool visibility (default to 'team' if field doesn't exist)
956 tool_visibility = getattr(tool, "visibility", "team")
958 # PUBLIC TOOLS: Accessible by everyone (including public-only tokens)
959 if tool_visibility == "public":
960 logger.debug(f"Access granted: Tool {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC")
961 return True
963 # PUBLIC-ONLY TOKEN: Can ONLY access public tools (strict public-only policy)
964 # No owner access - if user needs own resources, use a personal team-scoped token
965 if is_public_token:
966 logger.warning(
967 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(tool_visibility)} tool {SecurityValidator.sanitize_log_message(resource_id)}"
968 )
969 return False
971 # TEAM TOOLS: Check if tool's team matches token's teams
972 if tool_visibility == "team":
973 tool_team_id = getattr(tool, "team_id", None)
974 if tool_team_id and tool_team_id in token_team_ids:
975 logger.debug(
976 f"Access granted: Team tool {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(tool_team_id))}"
977 )
978 return True
980 logger.warning(
981 f"Access denied: Tool {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(tool_team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}"
982 )
983 return False
985 # PRIVATE TOOLS: Owner-only access (per RBAC doc)
986 if tool_visibility in ["private", "user"]:
987 tool_owner = getattr(tool, "owner_email", None)
988 if tool_owner and tool_owner == _user_email:
989 logger.debug(f"Access granted: Private tool {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}")
990 return True
992 logger.warning(
993 f"Access denied: Tool {SecurityValidator.sanitize_log_message(resource_id)} is {SecurityValidator.sanitize_log_message(tool_visibility)}, owner is '{SecurityValidator.sanitize_log_message(str(tool_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'"
994 )
995 return False
997 # Unknown visibility - deny by default
998 logger.warning(f"Access denied: Tool {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(tool_visibility)}")
999 return False
1001 # CHECK RESOURCES
1002 if resource_type == "resource":
1003 resource = db.execute(select(Resource).where(Resource.id == resource_id)).scalar_one_or_none()
1005 if not resource:
1006 logger.warning(f"Resource {SecurityValidator.sanitize_log_message(resource_id)} not found in database")
1007 return False
1009 # Get resource visibility (default to 'team' if field doesn't exist)
1010 resource_visibility = getattr(resource, "visibility", "team")
1012 # PUBLIC RESOURCES: Accessible by everyone (including public-only tokens)
1013 if resource_visibility == "public":
1014 logger.debug(f"Access granted: Resource {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC")
1015 return True
1017 # PUBLIC-ONLY TOKEN: Can ONLY access public resources (strict public-only policy)
1018 # No owner access - if user needs own resources, use a personal team-scoped token
1019 if is_public_token:
1020 logger.warning(
1021 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(resource_visibility)} resource {SecurityValidator.sanitize_log_message(resource_id)}"
1022 )
1023 return False
1025 # TEAM RESOURCES: Check if resource's team matches token's teams
1026 if resource_visibility == "team":
1027 resource_team_id = getattr(resource, "team_id", None)
1028 if resource_team_id and resource_team_id in token_team_ids:
1029 logger.debug(
1030 f"Access granted: Team resource {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(resource_team_id))}"
1031 )
1032 return True
1034 logger.warning(
1035 f"Access denied: Resource {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(resource_team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}"
1036 )
1037 return False
1039 # PRIVATE RESOURCES: Owner-only access (per RBAC doc)
1040 if resource_visibility in ["private", "user"]:
1041 resource_owner = getattr(resource, "owner_email", None)
1042 if resource_owner and resource_owner == _user_email:
1043 logger.debug(f"Access granted: Private resource {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}")
1044 return True
1046 logger.warning(
1047 f"Access denied: Resource {SecurityValidator.sanitize_log_message(resource_id)} is {SecurityValidator.sanitize_log_message(resource_visibility)}, owner is '{SecurityValidator.sanitize_log_message(str(resource_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'"
1048 )
1049 return False
1051 # Unknown visibility - deny by default
1052 logger.warning(f"Access denied: Resource {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(resource_visibility)}")
1053 return False
1055 # CHECK PROMPTS
1056 if resource_type == "prompt":
1057 prompt = db.execute(select(Prompt).where(Prompt.id == resource_id)).scalar_one_or_none()
1059 if not prompt:
1060 logger.warning(f"Prompt {SecurityValidator.sanitize_log_message(resource_id)} not found in database")
1061 return False
1063 # Get prompt visibility (default to 'team' if field doesn't exist)
1064 prompt_visibility = getattr(prompt, "visibility", "team")
1066 # PUBLIC PROMPTS: Accessible by everyone (including public-only tokens)
1067 if prompt_visibility == "public":
1068 logger.debug(f"Access granted: Prompt {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC")
1069 return True
1071 # PUBLIC-ONLY TOKEN: Can ONLY access public prompts (strict public-only policy)
1072 # No owner access - if user needs own resources, use a personal team-scoped token
1073 if is_public_token:
1074 logger.warning(
1075 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(prompt_visibility)} prompt {SecurityValidator.sanitize_log_message(resource_id)}"
1076 )
1077 return False
1079 # TEAM PROMPTS: Check if prompt's team matches token's teams
1080 if prompt_visibility == "team":
1081 prompt_team_id = getattr(prompt, "team_id", None)
1082 if prompt_team_id and prompt_team_id in token_team_ids:
1083 logger.debug(
1084 f"Access granted: Team prompt {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(prompt_team_id))}"
1085 )
1086 return True
1088 logger.warning(
1089 f"Access denied: Prompt {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(prompt_team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}"
1090 )
1091 return False
1093 # PRIVATE PROMPTS: Owner-only access (per RBAC doc)
1094 if prompt_visibility in ["private", "user"]:
1095 prompt_owner = getattr(prompt, "owner_email", None)
1096 if prompt_owner and prompt_owner == _user_email:
1097 logger.debug(f"Access granted: Private prompt {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}")
1098 return True
1100 logger.warning(
1101 f"Access denied: Prompt {SecurityValidator.sanitize_log_message(resource_id)} is {SecurityValidator.sanitize_log_message(prompt_visibility)}, owner is '{SecurityValidator.sanitize_log_message(str(prompt_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'"
1102 )
1103 return False
1105 # Unknown visibility - deny by default
1106 logger.warning(f"Access denied: Prompt {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(prompt_visibility)}")
1107 return False
1109 # CHECK GATEWAYS
1110 if resource_type == "gateway":
1111 gateway = db.execute(select(Gateway).where(Gateway.id == resource_id)).scalar_one_or_none()
1113 if not gateway:
1114 logger.warning(f"Gateway {SecurityValidator.sanitize_log_message(resource_id)} not found in database")
1115 return False
1117 # Get gateway visibility (default to 'team' if field doesn't exist)
1118 gateway_visibility = getattr(gateway, "visibility", "team")
1120 # PUBLIC GATEWAYS: Accessible by everyone (including public-only tokens)
1121 if gateway_visibility == "public":
1122 logger.debug(f"Access granted: Gateway {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC")
1123 return True
1125 # PUBLIC-ONLY TOKEN: Can ONLY access public gateways (strict public-only policy)
1126 # No owner access - if user needs own resources, use a personal team-scoped token
1127 if is_public_token:
1128 logger.warning(
1129 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(gateway_visibility)} gateway {SecurityValidator.sanitize_log_message(resource_id)}"
1130 )
1131 return False
1133 # TEAM GATEWAYS: Check if gateway's team matches token's teams
1134 if gateway_visibility == "team":
1135 gateway_team_id = getattr(gateway, "team_id", None)
1136 if gateway_team_id and gateway_team_id in token_team_ids:
1137 logger.debug(
1138 f"Access granted: Team gateway {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(gateway_team_id))}"
1139 )
1140 return True
1142 logger.warning(
1143 f"Access denied: Gateway {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(gateway_team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}"
1144 )
1145 return False
1147 # PRIVATE GATEWAYS: Owner-only access (per RBAC doc)
1148 if gateway_visibility in ["private", "user"]:
1149 gateway_owner = getattr(gateway, "owner_email", None)
1150 if gateway_owner and gateway_owner == _user_email:
1151 logger.debug(f"Access granted: Private gateway {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}")
1152 return True
1154 logger.warning(
1155 f"Access denied: Gateway {SecurityValidator.sanitize_log_message(resource_id)} is {SecurityValidator.sanitize_log_message(gateway_visibility)}, owner is '{SecurityValidator.sanitize_log_message(str(gateway_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'"
1156 )
1157 return False
1159 # Unknown visibility - deny by default
1160 logger.warning(f"Access denied: Gateway {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(gateway_visibility)}")
1161 return False
1163 # UNKNOWN RESOURCE TYPE
1164 logger.warning(f"Unknown resource type '{SecurityValidator.sanitize_log_message(str(resource_type))}' for path: {SecurityValidator.sanitize_log_message(request_path)}")
1165 return False
1167 except Exception as e:
1168 logger.error(f"Error checking resource team ownership for {request_path}: {e}", exc_info=True)
1169 # Fail securely - deny access on error
1170 return False
1171 finally:
1172 # Only commit/close if we created the session
1173 if owns_session:
1174 try:
1175 db.commit() # Commit read-only transaction to avoid implicit rollback
1176 finally:
1177 db.close()
1179 async def __call__(self, request: Request, call_next):
1180 """Middleware function to check token scoping including team-level validation.
1182 Args:
1183 request: FastAPI request object
1184 call_next: Next middleware/handler in chain
1186 Returns:
1187 Response from next handler or HTTPException
1189 Raises:
1190 HTTPException: If token scoping restrictions are violated
1191 """
1192 try:
1193 # Skip if already scoped (prevents double-scoping for /mcp requests)
1194 # MCPPathRewriteMiddleware runs scoping via dispatch, then routes through
1195 # middleware stack which hits BaseHTTPMiddleware's scoping again.
1196 # Use request.state flag which persists across middleware invocations.
1197 if getattr(request.state, "_token_scoping_done", False):
1198 return await call_next(request)
1200 # Mark as scoped before doing any work
1201 request.state._token_scoping_done = True
1203 normalized_path = self._get_normalized_request_path(request)
1205 # Skip scoping for certain paths (truly public endpoints only)
1206 skip_paths = [
1207 "/health",
1208 "/openapi.json",
1209 "/docs",
1210 "/redoc",
1211 "/auth/email/login",
1212 "/auth/email/register",
1213 "/.well-known/",
1214 ]
1216 # Check exact root path separately
1217 if normalized_path == "/":
1218 return await call_next(request)
1220 # Trusted internal Rust -> Python MCP dispatch already carries a
1221 # normalized auth context and is re-authorized by the internal MCP
1222 # handlers. Re-applying token-scoping path checks here would reject
1223 # the private /_internal/mcp/* hop for scoped tokens.
1224 if self._is_trusted_internal_mcp_runtime_request(request, normalized_path):
1225 return await call_next(request)
1227 if any(normalized_path.startswith(path) for path in skip_paths):
1228 return await call_next(request)
1230 # Skip server-specific well-known endpoints (RFC 9728)
1231 if re.match(r"^/servers/[^/]+/\.well-known/", normalized_path):
1232 return await call_next(request)
1234 # Extract full token payload (not just scopes)
1235 payload = await self._extract_token_scopes(request)
1237 # If no payload, continue (regular auth will handle this)
1238 if not payload:
1239 return await call_next(request)
1241 # TEAM VALIDATION: Use single DB session for both team checks
1242 # This reduces connection pool overhead from 2 sessions to 1 for resource endpoints
1243 user_email = payload.get("sub") or payload.get("email") # Extract user email for ownership check
1245 # Resolve teams based on token_use claim
1246 token_use = payload.get("token_use")
1247 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type
1248 # Session token: resolve teams from DB/cache directly
1249 # Cannot rely on request.state.token_teams — AuthContextMiddleware
1250 # is gated by security_logging_enabled (defaults to False)
1251 is_admin = payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False)
1252 user_info = {"is_admin": is_admin}
1253 token_teams = await resolve_session_teams(payload, user_email, user_info)
1254 else:
1255 # API token or legacy: use embedded teams with normalize_token_teams
1256 token_teams = normalize_token_teams(payload)
1258 # Check if admin bypass is active (token_teams is None means admin with explicit null teams)
1259 is_admin_bypass = token_teams is None
1261 # Admin with explicit null teams bypasses team validation entirely
1262 if is_admin_bypass:
1263 logger.debug(f"Admin bypass: skipping team validation for {SecurityValidator.sanitize_log_message(user_email)}")
1264 # Skip to other checks (server_id, IP, etc.)
1265 elif token_teams:
1266 # First-Party
1267 from mcpgateway.db import get_db # pylint: disable=import-outside-toplevel
1269 db = next(get_db())
1270 try:
1271 # Check team membership — only for API/legacy tokens whose teams
1272 # come from JWT claims and may be stale. Session tokens skip this
1273 # because resolve_session_teams() already resolved membership from
1274 # the DB; re-checking the raw JWT claim here would conflict with
1275 # the intersection semantics (stale JWT teams would cause a 403
1276 # even though the user has valid DB teams).
1277 # NOTE: session-token membership staleness is bounded by the
1278 # auth_cache TTL (see _resolve_teams_from_db).
1279 if token_use != "session" and not self._check_team_membership(payload, db=db): # nosec B105 - Not a password; token_use is a JWT claim type
1280 logger.warning("Token rejected: User no longer member of associated team(s)")
1281 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team")
1283 # Check resource team ownership with shared session
1284 if not self._check_resource_team_ownership(normalized_path, token_teams, db=db, _user_email=user_email):
1285 logger.warning(f"Access denied: Resource does not belong to token's teams {SecurityValidator.sanitize_log_message(str(token_teams))}")
1286 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)
1287 finally:
1288 # Ensure session cleanup even if checks raise exceptions
1289 try:
1290 db.commit()
1291 finally:
1292 db.close()
1293 else:
1294 # Public-only token (or session token with empty intersection):
1295 # skip _check_team_membership for session tokens — the empty
1296 # intersection already means no team-scoped access.
1297 # Membership staleness bounded by auth_cache TTL.
1298 if token_use != "session" and not self._check_team_membership(payload): # nosec B105 - Not a password; token_use is a JWT claim type
1299 logger.warning("Token rejected: User no longer member of associated team(s)")
1300 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team")
1302 if not self._check_resource_team_ownership(normalized_path, token_teams, _user_email=user_email):
1303 logger.warning(f"Access denied: Resource does not belong to token's teams {SecurityValidator.sanitize_log_message(str(token_teams))}")
1304 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)
1306 # Extract scopes from payload
1307 scopes = payload.get("scopes", {})
1309 # Check server ID restriction
1310 server_id = scopes.get("server_id")
1311 if not self._check_server_restriction(normalized_path, server_id):
1312 logger.warning(f"Token not authorized for this server. Required: {SecurityValidator.sanitize_log_message(str(server_id))}")
1313 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)
1315 # Check IP restrictions
1316 ip_restrictions = scopes.get("ip_restrictions", [])
1317 if ip_restrictions:
1318 client_ip = self._get_client_ip(request)
1319 if not self._check_ip_restrictions(client_ip, ip_restrictions):
1320 logger.warning(f"Request from IP {client_ip} not allowed by token restrictions")
1321 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)
1323 # Check time restrictions
1324 time_restrictions = scopes.get("time_restrictions", {})
1325 if not self._check_time_restrictions(time_restrictions):
1326 logger.warning("Request not allowed at this time by token restrictions")
1327 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Request not allowed at this time by token restrictions")
1329 # Check permission restrictions
1330 permissions = scopes.get("permissions", [])
1331 if not self._check_permission_restrictions(normalized_path, request.method, permissions):
1332 logger.warning("Insufficient permissions for this operation")
1333 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG)
1335 # Check optional token usage limits.
1336 usage_limits = scopes.get("usage_limits", {})
1337 usage_allowed, usage_reason = self._check_usage_limits(payload.get("jti"), usage_limits)
1338 if not usage_allowed:
1339 logger.warning("Token usage limit exceeded for jti %s: %s", payload.get("jti"), usage_reason)
1340 raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=usage_reason or "Token usage limit exceeded")
1342 # All scoping checks passed, continue
1343 return await call_next(request)
1345 except HTTPException as exc:
1346 # Return clean JSON response instead of traceback
1347 return ORJSONResponse(
1348 status_code=exc.status_code,
1349 content={"detail": exc.detail},
1350 )
1352 def _is_trusted_internal_mcp_runtime_request(self, request: Request, normalized_path: str) -> bool:
1353 """Return whether the request is a trusted loopback Rust MCP sidecar hop.
1355 Args:
1356 request: Incoming HTTP request.
1357 normalized_path: Canonicalized request path used for route matching.
1359 Returns:
1360 ``True`` when the request originated from the local Rust MCP runtime and
1361 includes the expected trusted headers.
1362 """
1363 if normalized_path != _INTERNAL_MCP_PATH_PREFIX and not normalized_path.startswith(f"{_INTERNAL_MCP_PATH_PREFIX}/"):
1364 return False
1366 if request.headers.get(_INTERNAL_MCP_RUNTIME_HEADER) != "rust":
1367 return False
1369 provided_auth = request.headers.get(_INTERNAL_MCP_RUNTIME_AUTH_HEADER)
1370 if not provided_auth:
1371 return False
1373 expected_auth = self._expected_internal_mcp_runtime_auth_header()
1374 if not hmac.compare_digest(provided_auth, expected_auth):
1375 return False
1377 if not request.headers.get(_INTERNAL_MCP_AUTH_CONTEXT_HEADER):
1378 return False
1380 client_host = getattr(getattr(request, "client", None), "host", None)
1381 return client_host in ("127.0.0.1", "::1")
1383 @staticmethod
1384 def _auth_encryption_secret_value() -> str:
1385 """Return the configured auth-encryption secret as a plain string.
1387 Returns:
1388 The auth-encryption secret, normalized to a regular string.
1389 """
1390 secret = settings.auth_encryption_secret
1391 if hasattr(secret, "get_secret_value"):
1392 return secret.get_secret_value()
1393 return str(secret)
1395 @staticmethod
1396 @lru_cache(maxsize=8)
1397 def _expected_internal_mcp_runtime_auth_header_for_secret(secret: str) -> str:
1398 """Return the expected shared internal-auth header for a specific secret.
1400 Args:
1401 secret: Auth-encryption secret to derive the trust header from.
1403 Returns:
1404 Hex-encoded SHA-256 digest derived from the provided auth secret.
1405 """
1406 material = f"{secret}:{_INTERNAL_MCP_RUNTIME_AUTH_CONTEXT}".encode("utf-8")
1407 return hashlib.sha256(material).hexdigest()
1409 @staticmethod
1410 def _expected_internal_mcp_runtime_auth_header() -> str:
1411 """Return the expected shared internal-auth header for Rust MCP hops.
1413 Returns:
1414 Shared secret-derived digest expected on trusted internal Rust MCP calls.
1415 """
1416 return TokenScopingMiddleware._expected_internal_mcp_runtime_auth_header_for_secret(TokenScopingMiddleware._auth_encryption_secret_value())
1419# Create middleware instance
1420token_scoping_middleware = TokenScopingMiddleware()