Coverage for mcpgateway / middleware / token_scoping.py: 99%
486 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/middleware/token_scoping.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Token Scoping Middleware.
8This middleware enforces token scoping restrictions at the API level,
9including server_id restrictions, IP restrictions, permission checks,
10and time-based restrictions.
11"""
13# Standard
14from datetime import datetime, timedelta, timezone
15from functools import lru_cache
16import ipaddress
17import re
18from typing import List, Optional, Pattern, Tuple
20# Third-Party
21from fastapi import HTTPException, Request, status
22from fastapi.security import HTTPBearer
23from sqlalchemy import and_, func, select
25# First-Party
26from mcpgateway.auth import normalize_token_teams
27from mcpgateway.config import settings
28from mcpgateway.db import Permissions
29from mcpgateway.middleware.rbac import _ACCESS_DENIED_MSG
30from mcpgateway.services.logging_service import LoggingService
31from mcpgateway.utils.orjson_response import ORJSONResponse
32from mcpgateway.utils.verify_credentials import verify_jwt_token_cached
# Security scheme
# HTTP Bearer scheme object, constructed with auto_error=False.
# NOTE(review): not referenced by any code visible in this part of the module;
# presumably used by importers or further down the file -- confirm before removing.
bearer_scheme = HTTPBearer(auto_error=False)

# Initialize logging service first
# Module-wide logger obtained from the project's LoggingService; used by the
# middleware methods below for debug/warning output.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
# ============================================================================
# Precompiled regex patterns (compiled once at module load for performance)
# ============================================================================

# Server path extraction patterns
# Group 1 of each pattern captures the server identifier segment.
# The /servers pattern requires the id to be followed by end-of-path or "/";
# the /sse and /ws patterns additionally stop the id at "?" so a trailing
# query string is tolerated.
_SERVER_PATH_PATTERNS: List[Pattern[str]] = [
    re.compile(r"^/servers/([^/]+)(?:$|/)"),
    re.compile(r"^/sse/([^/?]+)(?:$|\?)"),
    re.compile(r"^/ws/([^/?]+)(?:$|\?)"),
]

# Resource ID extraction patterns (IDs are UUID hex strings)
# Each entry pairs a pattern with a resource-type label. The patterns are
# unanchored, the "/" after the collection name is optional, and the captured
# id is limited to lowercase hex digits and "-".
# NOTE(review): consumers of this table are outside the visible part of the file.
_RESOURCE_PATTERNS: List[Tuple[Pattern[str], str]] = [
    (re.compile(r"/servers/?([a-f0-9\-]+)"), "server"),
    (re.compile(r"/tools/?([a-f0-9\-]+)"), "tool"),
    (re.compile(r"/resources/?([a-f0-9\-]+)"), "resource"),
    (re.compile(r"/prompts/?([a-f0-9\-]+)"), "prompt"),
    (re.compile(r"/gateways/?([a-f0-9\-]+)"), "gateway"),
]
# Cookie names probed, in this order, by
# TokenScopingMiddleware._extract_jwt_token_from_request before it falls back
# to the Authorization header.
_AUTH_COOKIE_NAMES: Tuple[str, ...] = ("jwt_token", "access_token")
# Permission map with precompiled patterns
# Maps (HTTP method, path pattern) to required permission
#
# ORDER MATTERS: TokenScopingMiddleware._check_permission_restrictions walks
# this list top to bottom and decides on the FIRST entry whose method equals
# the request method and whose pattern matches the normalized path. Keep more
# specific patterns above broader ones for the same method (for example the
# GET "/servers/{id}/tools" entry sits above the generic GET "/servers" entry).
# Paths that match no entry here (and no LLM proxy pattern) are denied.
_PERMISSION_PATTERNS: List[Tuple[str, Pattern[str], str]] = [
    # Tools permissions
    ("GET", re.compile(r"^/tools(?:$|/)"), Permissions.TOOLS_READ),
    ("POST", re.compile(r"^/tools/?$"), Permissions.TOOLS_CREATE),  # Only exact /tools or /tools/
    ("POST", re.compile(r"^/tools/[^/]+/"), Permissions.TOOLS_UPDATE),  # POST to sub-resources (state, toggle)
    ("PUT", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_UPDATE),
    ("DELETE", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_DELETE),
    ("GET", re.compile(r"^/servers/[^/]+/tools(?:$|/)"), Permissions.TOOLS_READ),
    ("POST", re.compile(r"^/servers/[^/]+/tools/[^/]+/call(?:$|/)"), Permissions.TOOLS_EXECUTE),
    # JSON-RPC endpoint — multiplexes tools/call, resources/list, initialize, etc.
    # Fine-grained per-method RBAC is enforced downstream by _ensure_rpc_permission();
    # the middleware only gates transport-level access via servers.use.
    ("POST", re.compile(r"^/rpc(?:$|/)"), Permissions.SERVERS_USE),
    # SSE transport — like /rpc and /mcp, this is a transport-level endpoint;
    # the handler's own @require_permission enforces fine-grained RBAC.
    ("GET", re.compile(r"^/sse(?:$|/)"), Permissions.SERVERS_USE),
    # Streamable HTTP MCP transport (POST=send, GET=SSE stream, DELETE=session termination)
    ("POST", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE),
    ("GET", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE),
    ("DELETE", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE),
    # Resources permissions
    ("GET", re.compile(r"^/resources(?:$|/)"), Permissions.RESOURCES_READ),
    ("POST", re.compile(r"^/resources/?$"), Permissions.RESOURCES_CREATE),  # Only exact /resources or /resources/
    ("POST", re.compile(r"^/resources/subscribe(?:$|/)"), Permissions.RESOURCES_READ),  # SSE subscription
    ("POST", re.compile(r"^/resources/[^/]+/"), Permissions.RESOURCES_UPDATE),  # POST to sub-resources (state, toggle)
    ("PUT", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_UPDATE),
    ("DELETE", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_DELETE),
    ("GET", re.compile(r"^/servers/[^/]+/resources(?:$|/)"), Permissions.RESOURCES_READ),
    # Prompts permissions
    ("GET", re.compile(r"^/prompts(?:$|/)"), Permissions.PROMPTS_READ),
    ("POST", re.compile(r"^/prompts/?$"), Permissions.PROMPTS_CREATE),  # Only exact /prompts or /prompts/
    ("POST", re.compile(r"^/prompts/[^/]+/"), Permissions.PROMPTS_UPDATE),  # POST to sub-resources (state, toggle)
    ("POST", re.compile(r"^/prompts/[^/]+$"), Permissions.PROMPTS_READ),  # MCP spec prompt retrieval (POST /prompts/{id})
    ("PUT", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_UPDATE),
    ("DELETE", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_DELETE),
    # Server management permissions
    ("GET", re.compile(r"^/servers/[^/]+/sse(?:$|/)"), Permissions.SERVERS_USE),  # Server SSE access endpoint
    ("GET", re.compile(r"^/servers(?:$|/)"), Permissions.SERVERS_READ),
    ("POST", re.compile(r"^/servers/?$"), Permissions.SERVERS_CREATE),  # Only exact /servers or /servers/
    ("POST", re.compile(r"^/servers/[^/]+/(?:state|toggle)(?:$|/)"), Permissions.SERVERS_UPDATE),  # Server management sub-resources
    ("POST", re.compile(r"^/servers/[^/]+/message(?:$|/)"), Permissions.SERVERS_USE),  # Server message access endpoint
    ("POST", re.compile(r"^/servers/[^/]+/mcp(?:$|/)"), Permissions.SERVERS_USE),  # Server MCP access endpoint
    ("PUT", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_UPDATE),
    ("DELETE", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_DELETE),
    # Gateway permissions
    ("GET", re.compile(r"^/gateways(?:$|/)"), Permissions.GATEWAYS_READ),
    ("POST", re.compile(r"^/gateways/?$"), Permissions.GATEWAYS_CREATE),  # Only exact /gateways or /gateways/
    ("POST", re.compile(r"^/gateways/[^/]+/"), Permissions.GATEWAYS_UPDATE),  # POST to sub-resources (state, toggle, refresh)
    ("PUT", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_UPDATE),
    ("DELETE", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_DELETE),
    # Metrics permissions
    ("GET", re.compile(r"^/metrics(?:$|/)"), Permissions.ADMIN_METRICS),
    ("POST", re.compile(r"^/metrics/reset(?:$|/)"), Permissions.ADMIN_METRICS),
    # Token permissions
    ("GET", re.compile(r"^/tokens(?:$|/)"), Permissions.TOKENS_READ),
    ("POST", re.compile(r"^/tokens/?$"), Permissions.TOKENS_CREATE),  # Only exact /tokens or /tokens/
    ("POST", re.compile(r"^/tokens/teams/[^/]+(?:$|/)"), Permissions.TOKENS_CREATE),
    ("PUT", re.compile(r"^/tokens/[^/]+(?:$|/)"), Permissions.TOKENS_UPDATE),
    ("DELETE", re.compile(r"^/tokens/[^/]+(?:$|/)"), Permissions.TOKENS_REVOKE),
]
# Admin route permission map (granular by route group).
# IMPORTANT: Unmatched /admin/* paths are denied by default (fail-secure).
#
# ORDER MATTERS: TokenScopingMiddleware._check_permission_restrictions uses the
# FIRST entry whose method and pattern both match, so specific sub-routes
# (e.g. "/admin/teams/{id}/edit") must stay above the catch-all entry for the
# same method and route group (e.g. GET "/admin/teams").
_ADMIN_PERMISSION_PATTERNS: List[Tuple[str, Pattern[str], str]] = [
    # Dashboard/overview surfaces
    ("GET", re.compile(r"^/admin/?$"), Permissions.ADMIN_DASHBOARD),
    ("GET", re.compile(r"^/admin/search(?:$|/)"), Permissions.ADMIN_DASHBOARD),
    ("GET", re.compile(r"^/admin/overview(?:$|/)"), Permissions.ADMIN_OVERVIEW),
    # User management
    ("GET", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
    ("POST", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
    ("DELETE", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
    # Team management
    ("POST", re.compile(r"^/admin/teams/?$"), Permissions.TEAMS_CREATE),
    ("DELETE", re.compile(r"^/admin/teams/[^/]+/join-request/[^/]+(?:$|/)"), Permissions.TEAMS_JOIN),
    ("DELETE", re.compile(r"^/admin/teams/[^/]+(?:$|/)"), Permissions.TEAMS_DELETE),
    ("GET", re.compile(r"^/admin/teams/[^/]+/edit(?:$|/)"), Permissions.TEAMS_UPDATE),
    ("POST", re.compile(r"^/admin/teams/[^/]+/update(?:$|/)"), Permissions.TEAMS_UPDATE),
    ("GET", re.compile(r"^/admin/teams/[^/]+/(?:members/add|members/partial|non-members/partial|join-requests)(?:$|/)"), Permissions.TEAMS_MANAGE_MEMBERS),
    ("POST", re.compile(r"^/admin/teams/[^/]+/(?:add-member|update-member-role|remove-member|join-requests/[^/]+/(?:approve|reject))(?:$|/)"), Permissions.TEAMS_MANAGE_MEMBERS),
    ("POST", re.compile(r"^/admin/teams/[^/]+/(?:leave|join-request(?:/[^/]+)?)(?:$|/)"), Permissions.TEAMS_JOIN),
    ("GET", re.compile(r"^/admin/teams(?:$|/)"), Permissions.TEAMS_READ),
    # Tool management
    ("POST", re.compile(r"^/admin/tools/?$"), Permissions.TOOLS_CREATE),
    ("POST", re.compile(r"^/admin/tools/import(?:$|/)"), Permissions.TOOLS_CREATE),
    ("POST", re.compile(r"^/admin/tools/[^/]+/delete(?:$|/)"), Permissions.TOOLS_DELETE),
    ("POST", re.compile(r"^/admin/tools/[^/]+/(?:edit|state)(?:$|/)"), Permissions.TOOLS_UPDATE),
    ("GET", re.compile(r"^/admin/tools(?:$|/)"), Permissions.TOOLS_READ),
    # Resource management
    ("POST", re.compile(r"^/admin/resources/?$"), Permissions.RESOURCES_CREATE),
    ("POST", re.compile(r"^/admin/resources/[^/]+/delete(?:$|/)"), Permissions.RESOURCES_DELETE),
    ("POST", re.compile(r"^/admin/resources/[^/]+/(?:edit|state)(?:$|/)"), Permissions.RESOURCES_UPDATE),
    ("GET", re.compile(r"^/admin/resources(?:$|/)"), Permissions.RESOURCES_READ),
    # Prompt management
    ("POST", re.compile(r"^/admin/prompts/?$"), Permissions.PROMPTS_CREATE),
    ("POST", re.compile(r"^/admin/prompts/[^/]+/delete(?:$|/)"), Permissions.PROMPTS_DELETE),
    ("POST", re.compile(r"^/admin/prompts/[^/]+/(?:edit|state)(?:$|/)"), Permissions.PROMPTS_UPDATE),
    ("GET", re.compile(r"^/admin/prompts(?:$|/)"), Permissions.PROMPTS_READ),
    # Gateway management
    ("POST", re.compile(r"^/admin/gateways/test(?:$|/)"), Permissions.GATEWAYS_READ),
    ("POST", re.compile(r"^/admin/gateways/?$"), Permissions.GATEWAYS_CREATE),
    ("POST", re.compile(r"^/admin/gateways/[^/]+/delete(?:$|/)"), Permissions.GATEWAYS_DELETE),
    ("POST", re.compile(r"^/admin/gateways/[^/]+/(?:edit|state)(?:$|/)"), Permissions.GATEWAYS_UPDATE),
    ("GET", re.compile(r"^/admin/gateways(?:$|/)"), Permissions.GATEWAYS_READ),
    # Server management
    ("POST", re.compile(r"^/admin/servers/?$"), Permissions.SERVERS_CREATE),
    ("POST", re.compile(r"^/admin/servers/[^/]+/delete(?:$|/)"), Permissions.SERVERS_DELETE),
    ("POST", re.compile(r"^/admin/servers/[^/]+/(?:edit|state)(?:$|/)"), Permissions.SERVERS_UPDATE),
    ("GET", re.compile(r"^/admin/servers(?:$|/)"), Permissions.SERVERS_READ),
    # Token/tag read surfaces
    ("GET", re.compile(r"^/admin/tokens(?:$|/)"), Permissions.TOKENS_READ),
    ("GET", re.compile(r"^/admin/tags(?:$|/)"), Permissions.TAGS_READ),
    # A2A management
    ("POST", re.compile(r"^/admin/a2a/?$"), Permissions.A2A_CREATE),
    ("POST", re.compile(r"^/admin/a2a/[^/]+/delete(?:$|/)"), Permissions.A2A_DELETE),
    ("POST", re.compile(r"^/admin/a2a/[^/]+/(?:edit|state)(?:$|/)"), Permissions.A2A_UPDATE),
    ("POST", re.compile(r"^/admin/a2a/[^/]+/test(?:$|/)"), Permissions.A2A_INVOKE),
    ("GET", re.compile(r"^/admin/a2a(?:$|/)"), Permissions.A2A_READ),
    # Section partials
    ("GET", re.compile(r"^/admin/sections/resources(?:$|/)"), Permissions.RESOURCES_READ),
    ("GET", re.compile(r"^/admin/sections/prompts(?:$|/)"), Permissions.PROMPTS_READ),
    ("GET", re.compile(r"^/admin/sections/servers(?:$|/)"), Permissions.SERVERS_READ),
    ("GET", re.compile(r"^/admin/sections/gateways(?:$|/)"), Permissions.GATEWAYS_READ),
    # Specialized admin domains
    ("GET", re.compile(r"^/admin/events(?:$|/)"), Permissions.ADMIN_EVENTS),
    ("GET", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC),
    ("POST", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC),
    ("PUT", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC),
    ("GET", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS),
    ("POST", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS),
    ("PUT", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS),
    ("DELETE", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS),
    # System configuration/admin operations
    # The same route-group pattern is registered once per HTTP method
    # (GET, POST, PUT, DELETE), all requiring ADMIN_SYSTEM_CONFIG.
    (
        "GET",
        re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"),
        Permissions.ADMIN_SYSTEM_CONFIG,
    ),
    (
        "POST",
        re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"),
        Permissions.ADMIN_SYSTEM_CONFIG,
    ),
    (
        "PUT",
        re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"),
        Permissions.ADMIN_SYSTEM_CONFIG,
    ),
    (
        "DELETE",
        re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"),
        Permissions.ADMIN_SYSTEM_CONFIG,
    ),
]
220def _normalize_llm_api_prefix(prefix: Optional[str]) -> str:
221 """Normalize llm_api_prefix to a canonical path prefix.
223 Args:
224 prefix: Raw LLM API prefix setting value.
226 Returns:
227 str: Normalized path prefix, or empty string when prefix is empty or "/".
228 """
229 if not prefix:
230 return ""
231 normalized = "/" + str(prefix).strip().strip("/")
232 return "" if normalized == "/" else normalized
235def _normalize_scope_path(scope_path: str, root_path: str) -> str:
236 """Strip ``root_path`` from ``scope_path`` when the incoming path includes it.
238 Args:
239 scope_path: Request path observed by middleware.
240 root_path: Application root path prefix, if configured.
242 Returns:
243 Path value normalized for permission and scope pattern matching.
244 """
245 if root_path and len(root_path) > 1:
246 root_path = root_path.rstrip("/")
247 if root_path and len(root_path) > 1 and scope_path.startswith(root_path):
248 rest = scope_path[len(root_path) :]
249 # root_path="/app" must not strip from "/application/..."
250 if rest == "" or rest.startswith("/"):
251 return rest or "/"
252 return scope_path
@lru_cache(maxsize=16)
def _get_llm_permission_patterns(prefix: str) -> Tuple[Tuple[str, Pattern[str], str], ...]:
    """Return the method/path/permission table for the LLM proxy endpoints.

    Results are memoized per ``prefix`` value (up to 16 distinct prefixes), so
    the regexes are compiled once per configured prefix.

    Args:
        prefix: LLM API prefix used to mount proxy routes.

    Returns:
        Tuple[Tuple[str, Pattern[str], str], ...]: Method/path regex to required permission mappings.
    """
    base = re.escape(_normalize_llm_api_prefix(prefix))
    # LLM proxy routes are exact endpoints (optionally with a trailing slash),
    # unlike many REST resources that intentionally include sub-resources.
    chat_route = re.compile(rf"^{base}/chat/completions/?$")
    models_route = re.compile(rf"^{base}/models/?$")
    return (
        ("POST", chat_route, Permissions.LLM_INVOKE),
        ("GET", models_route, Permissions.LLM_READ),
    )
275class TokenScopingMiddleware:
276 """Middleware to enforce token scoping restrictions.
278 Examples:
279 >>> middleware = TokenScopingMiddleware()
280 >>> isinstance(middleware, TokenScopingMiddleware)
281 True
282 """
def __init__(self) -> None:
    """Initialize token scoping middleware.

    The constructor takes no arguments and sets no instance attributes; the
    body consists of this docstring only.

    Examples:
        >>> middleware = TokenScopingMiddleware()
        >>> hasattr(middleware, '_extract_token_scopes')
        True
    """
293 def _normalize_teams(self, teams) -> list:
294 """Normalize teams from token payload to list of team IDs.
296 Handles various team formats:
297 - None -> []
298 - List of strings -> as-is
299 - List of dicts with 'id' key -> extract IDs
301 Args:
302 teams: Raw teams value from JWT payload
304 Returns:
305 List of team ID strings
306 """
307 if not teams:
308 return []
309 normalized = []
310 for team in teams:
311 if isinstance(team, dict):
312 team_id = team.get("id")
313 if team_id:
314 normalized.append(team_id)
315 elif isinstance(team, str):
316 normalized.append(team)
317 return normalized
def _normalize_path_for_matching(self, request_path: str) -> str:
    """Prepare a raw path for team-scoping and permission pattern matching.

    The configured ``settings.app_root_path`` prefix is stripped (an empty
    path is treated as ``"/"``) and the result is guaranteed to start with a
    slash.

    Args:
        request_path: Raw request path.

    Returns:
        Normalized absolute path suitable for route matching.
    """
    stripped = _normalize_scope_path(request_path or "/", settings.app_root_path or "")
    return stripped if stripped.startswith("/") else f"/{stripped}"
def _get_normalized_request_path(self, request: Request) -> str:
    """Compute the request path with the application root prefix removed.

    The path is taken from ``request.url.path``, falling back to the ASGI
    scope's ``path`` and finally ``"/"``. The root prefix comes from the
    scope's ``root_path`` when set, otherwise from ``settings.app_root_path``.

    Args:
        request: Request object containing scope and URL data.

    Returns:
        Normalized request path suitable for permission checks.
    """
    raw_scope = getattr(request, "scope", {}) or {}
    # Anything that is not a plain dict is treated as "no scope information".
    scope = raw_scope if isinstance(raw_scope, dict) else {}

    observed_path = request.url.path or scope.get("path") or "/"
    mount_prefix = scope.get("root_path") or settings.app_root_path or ""

    stripped = _normalize_scope_path(observed_path, mount_prefix)
    return stripped if stripped.startswith("/") else f"/{stripped}"
def _extract_jwt_token_from_request(self, request: Request) -> Optional[str]:
    """Locate the JWT carried by a request.

    Auth cookies (``_AUTH_COOKIE_NAMES``, in order) take precedence; the
    ``Authorization: Bearer <token>`` header is consulted only when no
    non-blank cookie value is found.

    Args:
        request: Request object carrying cookies and headers.

    Returns:
        The whitespace-stripped token string, or ``None`` when neither source
        provides one.
    """
    cookie_jar = getattr(request, "cookies", None)
    if cookie_jar and hasattr(cookie_jar, "get"):
        for name in _AUTH_COOKIE_NAMES:
            candidate = cookie_jar.get(name)
            if isinstance(candidate, str) and candidate.strip():
                return candidate.strip()

    header_value = request.headers.get("Authorization")
    if not header_value:
        return None

    # "<scheme> <credentials>": the scheme is compared case-insensitively and
    # a header without any space separator is rejected.
    scheme, separator, credentials = header_value.partition(" ")
    if not separator or scheme.lower() != "bearer":
        return None

    return credentials.strip() or None
async def _extract_token_scopes(self, request: Request) -> Optional[dict]:
    """Return the verified JWT payload for a request, if there is one.

    Args:
        request: FastAPI request object

    Returns:
        The payload returned by ``verify_jwt_token_cached``, or ``None`` when
        the request carries no token or verification raises for any reason.
    """
    token = self._extract_jwt_token_from_request(request)
    if token:
        try:
            # Centralized verification keeps JWT validation consistent.
            return await verify_jwt_token_cached(token, request)
        except Exception:
            # Covers HTTPException (expired, invalid, etc.) as well as any
            # other failure during validation: treat all as "no valid token".
            return None
    return None
def _get_client_ip(self, request: Request) -> str:
    """Return the IP address of the connecting client.

    Forwarding headers (X-Forwarded-For / X-Real-IP) are deliberately not
    read here, so a client cannot spoof its address through them. When a
    trusted proxy setup rewrites ``request.client`` upstream (e.g.
    ProxyHeadersMiddleware), that rewritten value is what gets returned.

    Args:
        request: FastAPI request object

    Returns:
        str: ``request.client.host``, or ``"unknown"`` when the request has no
        client information.
    """
    peer = request.client
    if peer:
        return peer.host
    return "unknown"
424 def _check_ip_restrictions(self, client_ip: str, ip_restrictions: list) -> bool:
425 """Check if client IP is allowed by restrictions.
427 Args:
428 client_ip: Client's IP address
429 ip_restrictions: List of allowed IP addresses/CIDR ranges
431 Returns:
432 bool: True if IP is allowed, False otherwise
434 Examples:
435 Allow specific IP:
436 >>> m = TokenScopingMiddleware()
437 >>> m._check_ip_restrictions('192.168.1.10', ['192.168.1.10'])
438 True
440 Allow CIDR range:
441 >>> m._check_ip_restrictions('10.0.0.5', ['10.0.0.0/24'])
442 True
444 Deny when not in list:
445 >>> m._check_ip_restrictions('10.0.1.5', ['10.0.0.0/24'])
446 False
448 Empty restrictions allow all:
449 >>> m._check_ip_restrictions('203.0.113.1', [])
450 True
451 """
452 if not ip_restrictions:
453 return True # No restrictions
455 try:
456 client_ip_obj = ipaddress.ip_address(client_ip)
458 for restriction in ip_restrictions:
459 try:
460 # Check if it's a CIDR range
461 if "/" in restriction:
462 network = ipaddress.ip_network(restriction, strict=False)
463 if client_ip_obj in network:
464 return True
465 else:
466 # Single IP address
467 if client_ip_obj == ipaddress.ip_address(restriction):
468 return True
469 except (ValueError, ipaddress.AddressValueError):
470 continue
472 except (ValueError, ipaddress.AddressValueError):
473 return False
475 return False
477 def _check_time_restrictions(self, time_restrictions: dict) -> bool:
478 """Check if current time is allowed by restrictions.
480 Args:
481 time_restrictions: Dict containing time-based restrictions
483 Returns:
484 bool: True if current time is allowed, False otherwise
486 Examples:
487 No restrictions allow access:
488 >>> m = TokenScopingMiddleware()
489 >>> m._check_time_restrictions({})
490 True
492 Weekdays only: result depends on current weekday (always bool):
493 >>> isinstance(m._check_time_restrictions({'weekdays_only': True}), bool)
494 True
496 Business hours only: result depends on current hour (always bool):
497 >>> isinstance(m._check_time_restrictions({'business_hours_only': True}), bool)
498 True
499 """
500 if not time_restrictions:
501 return True # No restrictions
503 now = datetime.now(tz=timezone.utc)
505 # Check business hours restriction
506 if time_restrictions.get("business_hours_only"):
507 # Assume business hours are 9 AM to 5 PM UTC
508 # This could be made configurable
509 if not 9 <= now.hour < 17:
510 return False
512 # Check day of week restrictions
513 weekdays_only = time_restrictions.get("weekdays_only")
514 if weekdays_only and now.weekday() >= 5: # Saturday=5, Sunday=6
515 return False
517 return True
519 @staticmethod
520 def _parse_positive_limit(value: object) -> Optional[int]:
521 """Parse usage-limit values as positive integers.
523 Args:
524 value: Candidate limit value from token scope configuration.
526 Returns:
527 Parsed positive integer limit, or ``None`` when invalid/non-positive.
528 """
529 try:
530 parsed = int(value)
531 except (TypeError, ValueError):
532 return None
533 return parsed if parsed > 0 else None
535 def _check_usage_limits(self, jti: Optional[str], usage_limits: dict) -> Tuple[bool, Optional[str]]:
536 """Check token usage limits against recorded usage logs.
538 Args:
539 jti: Token JTI identifier.
540 usage_limits: Usage limits from token scope.
542 Returns:
543 Tuple[bool, Optional[str]]: (allowed, denial_reason)
544 """
545 if not isinstance(usage_limits, dict) or not usage_limits or not jti:
546 return True, None
548 requests_per_hour = self._parse_positive_limit(usage_limits.get("requests_per_hour"))
549 requests_per_day = self._parse_positive_limit(usage_limits.get("requests_per_day"))
551 if not requests_per_hour and not requests_per_day:
552 return True, None
554 # First-Party
555 from mcpgateway.db import get_db, TokenUsageLog # pylint: disable=import-outside-toplevel
557 db = next(get_db())
558 try:
559 now = datetime.now(timezone.utc)
561 if requests_per_hour:
562 hour_window_start = now - timedelta(hours=1)
563 hourly_count = db.execute(
564 # Pylint false-positive: SQLAlchemy func namespace is callable at runtime.
565 # pylint: disable=not-callable
566 select(func.count(TokenUsageLog.id)).where(and_(TokenUsageLog.token_jti == jti, TokenUsageLog.timestamp >= hour_window_start))
567 ).scalar()
568 if int(hourly_count or 0) >= requests_per_hour:
569 return False, "Hourly request limit exceeded"
571 if requests_per_day:
572 day_window_start = now - timedelta(days=1)
573 daily_count = db.execute(
574 # Pylint false-positive: SQLAlchemy func namespace is callable at runtime.
575 # pylint: disable=not-callable
576 select(func.count(TokenUsageLog.id)).where(and_(TokenUsageLog.token_jti == jti, TokenUsageLog.timestamp >= day_window_start))
577 ).scalar()
578 if int(daily_count or 0) >= requests_per_day:
579 return False, "Daily request limit exceeded"
580 except Exception as exc:
581 logger.warning("Failed to evaluate token usage limits for jti %s: %s", jti, exc)
582 return True, None
583 finally:
584 try:
585 db.rollback()
586 finally:
587 db.close()
589 return True, None
def _check_server_restriction(self, request_path: str, server_id: Optional[str]) -> bool:
    """Decide whether a path is reachable with a server-scoped token.

    When the path names a server (``/servers/{id}/...``, ``/sse/{id}``,
    ``/ws/{id}``), that id must equal ``server_id``. Paths that name no server
    are allowed only if they are the root path or start with one of the
    general endpoint prefixes; everything else is denied.

    Args:
        request_path: The request path/URL
        server_id: Required server ID (None means no restriction)

    Returns:
        bool: True if request is allowed, False otherwise

    Examples:
        Match server paths:
        >>> m = TokenScopingMiddleware()
        >>> m._check_server_restriction('/servers/abc/tools', 'abc')
        True
        >>> m._check_server_restriction('/sse/xyz', 'xyz')
        True
        >>> m._check_server_restriction('/ws/xyz?x=1', 'xyz')
        True

        Mismatch denies:
        >>> m._check_server_restriction('/servers/def', 'abc')
        False

        General endpoints allowed:
        >>> m._check_server_restriction('/health', 'abc')
        True
        >>> m._check_server_restriction('/', 'abc')
        True
    """
    request_path = self._normalize_path_for_matching(request_path)

    if not server_id:
        return True  # No server restriction

    # The first precompiled pattern that matches decides the outcome.
    hit = next((found for found in (candidate.search(request_path) for candidate in _SERVER_PATH_PATTERNS) if found), None)
    if hit is not None:
        return hit.group(1) == server_id

    # No server ID in the path: the exact root path is always allowed.
    if request_path == "/":
        return True

    # General endpoints are matched by prefix; any other path is denied.
    open_prefixes = ("/health", "/metrics", "/openapi.json", "/docs", "/redoc", "/rpc", "/mcp", "/sse")
    return request_path.startswith(open_prefixes)
def _check_permission_restrictions(self, request_path: str, request_method: str, permissions: list) -> bool:
    """Decide whether a token's permission list allows a method/path pair.

    The first table entry whose method and pattern match the normalized path
    determines the required permission. ``/admin`` paths use the admin table
    only; other paths use the general table and then the LLM proxy table.
    A path that matches nothing is denied.

    Args:
        request_path: The request path/URL
        request_method: HTTP method (GET, POST, etc.)
        permissions: List of allowed permissions

    Returns:
        bool: True if request is allowed, False otherwise

    Examples:
        Wildcard allows all:
        >>> m = TokenScopingMiddleware()
        >>> m._check_permission_restrictions('/tools', 'GET', ['*'])
        True

        Requires specific permission:
        >>> m._check_permission_restrictions('/tools', 'POST', ['tools.create'])
        True
        >>> m._check_permission_restrictions('/tools/xyz', 'PUT', ['tools.update'])
        True
        >>> m._check_permission_restrictions('/resources', 'GET', ['resources.read'])
        True
        >>> m._check_permission_restrictions('/servers/s1/tools/abc/call', 'POST', ['tools.execute'])
        True

        Missing permission denies:
        >>> m._check_permission_restrictions('/tools', 'POST', ['tools.read'])
        False
    """
    request_path = self._normalize_path_for_matching(request_path)

    if not permissions or "*" in permissions:
        return True  # No restrictions or full access

    def _first_required(table):
        # Permission demanded by the first matching entry, or None if no
        # entry in the table matches this method/path.
        for method, path_pattern, required_permission in table:
            if request_method == method and path_pattern.match(request_path):
                return required_permission
        return None

    # Admin routes: granular route-group mapping, unmapped paths are denied
    # by default (fail-secure).
    if request_path.startswith("/admin"):
        needed = _first_required(_ADMIN_PERMISSION_PATTERNS)
        return needed is not None and needed in permissions

    needed = _first_required(_PERMISSION_PATTERNS)
    if needed is not None:
        if needed in permissions:
            return True
        # Runtime compensation: tokens with MCP method permissions
        # (tools.*, resources.*, prompts.*) implicitly have transport
        # access (servers.use) — mirrors the generation-time injection
        # in token_catalog_service._generate_token() for pre-existing tokens.
        if needed == Permissions.SERVERS_USE and any(p.startswith(Permissions.MCP_METHOD_PREFIXES) for p in permissions):
            logger.debug("Runtime servers.use compensation applied for token with MCP method permissions: %s", permissions)
            return True
        return False

    # LLM proxy permissions (respect configured llm_api_prefix); anything
    # still unmatched is denied because it has no explicit mapping.
    needed = _first_required(_get_llm_permission_patterns(settings.llm_api_prefix))
    return needed is not None and needed in permissions
def _check_team_membership(self, payload: dict, db=None) -> bool:
    """
    Check if user still belongs to teams in the token.

    For public-only tokens (no teams), always returns True.
    For team-scoped tokens, validates membership with caching.

    Uses in-memory cache (per gateway instance, 60s TTL) to avoid repeated
    email_team_members queries for the same user+teams combination.
    Note: Sync path uses in-memory only for performance; Redis is not
    consulted to avoid async overhead in the hot path.

    A team entry that carries no usable ID (a dict without a truthy ``"id"``,
    or an empty value) makes the token invalid instead of raising, so a
    malformed payload fails closed.

    Args:
        payload: Decoded JWT payload containing teams
        db: Optional database session. If provided, caller manages lifecycle.
            If None, creates and manages its own session.

    Returns:
        bool: True if team membership is valid, False otherwise
    """
    teams = payload.get("teams", [])
    user_email = payload.get("sub")

    # PUBLIC-ONLY TOKEN: No team validation needed
    if not teams:
        logger.debug(f"Public-only token for user {user_email} - no team validation required")
        return True

    # TEAM-SCOPED TOKEN: Validate membership
    if not user_email:
        logger.warning("Token missing user email")
        return False

    # Extract team IDs from token (handles both dict and string formats).
    # Use .get() so a dict entry lacking "id" is rejected rather than
    # surfacing as a KeyError.
    team_ids = []
    for team in teams:
        team_id = team.get("id") if isinstance(team, dict) else team
        if not team_id:
            logger.warning(f"Token invalid: malformed team entry without id for user {user_email}")
            return False
        team_ids.append(team_id)

    # First-Party
    from mcpgateway.cache.auth_cache import get_auth_cache  # pylint: disable=import-outside-toplevel

    # Check cache first (synchronous in-memory lookup)
    auth_cache = get_auth_cache()
    cached_result = auth_cache.get_team_membership_valid_sync(user_email, team_ids)
    if cached_result is not None:
        if not cached_result:
            logger.warning(f"Token invalid (cached): User {user_email} no longer member of teams")
        return cached_result

    # Cache miss - query database
    # First-Party
    from mcpgateway.db import EmailTeamMember, get_db  # pylint: disable=import-outside-toplevel

    # Track if we own the session (and thus must clean it up)
    owns_session = db is None
    if owns_session:
        db = next(get_db())

    try:
        # Single query for all teams (fixes N+1 pattern)
        memberships = (
            db.execute(
                select(EmailTeamMember.team_id).where(
                    EmailTeamMember.team_id.in_(team_ids),
                    EmailTeamMember.user_email == user_email,
                    EmailTeamMember.is_active.is_(True),
                )
            )
            .scalars()
            .all()
        )

        # Check if user is member of ALL teams in token
        valid_team_ids = set(memberships)
        missing_teams = set(team_ids) - valid_team_ids

        if missing_teams:
            logger.warning(f"Token invalid: User {user_email} no longer member of teams: {missing_teams}")
            # Cache negative result
            auth_cache.set_team_membership_valid_sync(user_email, team_ids, False)
            return False

        # Cache positive result
        auth_cache.set_team_membership_valid_sync(user_email, team_ids, True)
        return True
    finally:
        # Only commit/close if we created the session
        if owns_session:
            try:
                db.commit()  # Commit read-only transaction to avoid implicit rollback
            finally:
                db.close()
def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None, _user_email: Optional[str] = None) -> bool:  # noqa: PLR0911 # pylint: disable=too-many-return-statements
    """
    Check if the requested resource is accessible by the token.

    Implements Three-Tier Resource Visibility (Public/Team/Private):
    - PUBLIC: Accessible by all tokens (public-only and team-scoped)
    - TEAM: Accessible only by tokens scoped to the team that owns the resource
    - PRIVATE: Accessible only when the resource's ``owner_email`` equals
      ``_user_email`` (and the token is not public-only)

    Token Access Rules:
    - Public-only tokens (empty token_teams): Can access public resources only
      (strict public-only policy, no owner access)
    - Team-scoped tokens: Can access public resources, resources of their
      teams, and private resources owned by the requesting user

    Handles URLs like:
    - /servers/{id}/mcp
    - /servers/{id}/sse
    - /servers/{id}
    - /tools/{id}/execute
    - /tools/{id}
    - /resources/{id}
    - /prompts/{id}

    Args:
        request_path: The request path/URL
        token_teams: List of team IDs from the token (empty list = public-only token).
            Entries may be plain IDs or dicts carrying an ``"id"`` key.
        db: Optional database session. If provided, caller manages lifecycle.
            If None, creates and manages its own session.
        _user_email: Email of the requesting user, compared against the
            resource's ``owner_email`` for private resources.

    Returns:
        bool: True if resource access is allowed, False otherwise. Paths that
        contain no recognizable resource ID are allowed. Missing resources,
        unknown visibility values, unknown resource types and any exception
        raised during the lookup all result in False (fail closed).
    """
    request_path = self._normalize_path_for_matching(request_path)

    # Normalize token_teams: extract team IDs from dict objects (backward compatibility)
    token_team_ids = [team["id"] if isinstance(team, dict) else team for team in token_teams]

    # Determine token type
    is_public_token = not token_team_ids

    if is_public_token:
        logger.debug("Processing request with PUBLIC-ONLY token")
    else:
        logger.debug(f"Processing request with TEAM-SCOPED token (teams: {token_teams})")

    # Extract resource type and ID from path (uses precompiled regex patterns)
    resource_id = None
    resource_type = None

    for pattern, rtype in _RESOURCE_PATTERNS:
        match = pattern.search(request_path)
        if match:
            resource_id = match.group(1)
            resource_type = rtype
            logger.debug(f"Extracted {rtype} ID: {resource_id} from path: {request_path}")
            break

    # If no resource ID in path, allow (general endpoints like /health, /tokens, /metrics)
    if not resource_id or not resource_type:
        logger.debug(f"No resource ID found in path {request_path}, allowing access")
        return True

    # Import database models
    # First-Party
    from mcpgateway.db import Gateway, get_db, Prompt, Resource, Server, Tool  # pylint: disable=import-outside-toplevel

    # resource_type -> (ORM model, label used in log messages, visibility values treated as owner-only).
    # Servers only recognise "private"; a server marked "user" falls through to the
    # unknown-visibility denial below, exactly as before.
    resource_specs = {
        "server": (Server, "Server", ("private",)),
        "tool": (Tool, "Tool", ("private", "user")),
        "resource": (Resource, "Resource", ("private", "user")),
        "prompt": (Prompt, "Prompt", ("private", "user")),
        "gateway": (Gateway, "Gateway", ("private", "user")),
    }

    # Track if we own the session (and thus must clean it up)
    owns_session = db is None
    if owns_session:
        db = next(get_db())

    try:
        spec = resource_specs.get(resource_type)
        if spec is None:
            # UNKNOWN RESOURCE TYPE
            logger.warning(f"Unknown resource type '{resource_type}' for path: {request_path}")
            return False

        model, label, owner_only_visibilities = spec
        record = db.execute(select(model).where(model.id == resource_id)).scalar_one_or_none()

        if not record:
            logger.warning(f"{label} {resource_id} not found in database")
            return False

        # Default to 'team' if the visibility field doesn't exist
        visibility = getattr(record, "visibility", "team")

        # PUBLIC: Accessible by everyone (including public-only tokens)
        if visibility == "public":
            logger.debug(f"Access granted: {label} {resource_id} is PUBLIC")
            return True

        # PUBLIC-ONLY TOKEN: Can ONLY access public resources (strict public-only policy)
        # No owner access - if user needs own resources, use a personal team-scoped token
        if is_public_token:
            logger.warning(f"Access denied: Public-only token cannot access {visibility} {resource_type} {resource_id}")
            return False

        # TEAM: The resource's team must be one of the token's teams.
        # The truthiness guard keeps a resource without a team from ever matching.
        if visibility == "team":
            team_id = getattr(record, "team_id", None)
            if team_id and team_id in token_team_ids:
                logger.debug(f"Access granted: Team {resource_type} {resource_id} belongs to token's team {team_id}")
                return True

            logger.warning(f"Access denied: {label} {resource_id} is team-scoped to '{team_id}', token is scoped to teams {token_team_ids}")
            return False

        # PRIVATE: Owner-only access (per RBAC doc)
        if visibility in owner_only_visibilities:
            owner = getattr(record, "owner_email", None)
            if owner and owner == _user_email:
                logger.debug(f"Access granted: Private {resource_type} {resource_id} owned by {_user_email}")
                return True

            logger.warning(f"Access denied: {label} {resource_id} is {visibility}, owner is '{owner}', requester is '{_user_email}'")
            return False

        # Unknown visibility - deny by default
        logger.warning(f"Access denied: {label} {resource_id} has unknown visibility: {visibility}")
        return False

    except Exception as e:
        logger.error(f"Error checking resource team ownership for {request_path}: {e}", exc_info=True)
        # Fail securely - deny access on error
        return False
    finally:
        # Only commit/close if we created the session
        if owns_session:
            try:
                db.commit()  # Commit read-only transaction to avoid implicit rollback
            finally:
                db.close()
async def __call__(self, request: Request, call_next):
    """Enforce token scoping (teams, server, IP, time, permissions, usage) on a request.

    Args:
        request: FastAPI request object
        call_next: Next middleware/handler in chain

    Returns:
        The downstream response when scoping is skipped or every check passes;
        otherwise a JSON error response carrying the status code and detail of
        the restriction that was violated.

    Raises:
        HTTPException: Raised internally when a scoping restriction is violated.
            It is caught inside this method and turned into the JSON error response.
    """

    def forbidden(detail):
        # Build the 403 raised by every scoping failure except the usage limit.
        return HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=detail)

    try:
        # /mcp requests can reach this middleware twice (once via the path-rewrite
        # dispatch, once via the regular middleware stack); the flag stored on
        # request.state makes the second pass a no-op.
        if getattr(request.state, "_token_scoping_done", False):
            return await call_next(request)

        # Set the flag up front, before any scoping work happens
        request.state._token_scoping_done = True

        path = self._get_normalized_request_path(request)

        # Truly public endpoints are never scoped: the exact root path, a fixed
        # set of path prefixes, and server-specific well-known endpoints (RFC 9728).
        public_prefixes = (
            "/health",
            "/openapi.json",
            "/docs",
            "/redoc",
            "/auth/email/login",
            "/auth/email/register",
            "/.well-known/",
        )
        is_unscoped_path = path == "/" or path.startswith(public_prefixes) or re.match(r"^/servers/[^/]+/\.well-known/", path) is not None
        if is_unscoped_path:
            return await call_next(request)

        # Full token payload (not just the scopes); nothing to enforce without one,
        # regular auth deals with that case.
        claims = await self._extract_token_scopes(request)
        if not claims:
            return await call_next(request)

        # User email is needed for the ownership check on private resources
        user_email = claims.get("sub") or claims.get("email")

        # Team resolution depends on the token_use claim
        if claims.get("token_use") == "session":  # nosec B105 - Not a password; token_use is a JWT claim type
            # Session tokens get their teams from DB/cache directly;
            # request.state.token_teams cannot be relied on because
            # AuthContextMiddleware is gated by security_logging_enabled (defaults to False)
            # First-Party
            from mcpgateway.auth import _resolve_teams_from_db  # pylint: disable=import-outside-toplevel

            is_admin = claims.get("is_admin", False) or claims.get("user", {}).get("is_admin", False)
            token_teams = await _resolve_teams_from_db(user_email, {"is_admin": is_admin})
        else:
            # API token or legacy token: teams are embedded in the payload
            token_teams = normalize_token_teams(claims)

        if token_teams is None:
            # None means admin with explicit null teams: team validation is skipped
            # entirely, the remaining checks (server_id, IP, etc.) still run.
            logger.debug(f"Admin bypass: skipping team validation for {user_email}")
        else:
            # Team-scoped tokens run both checks on one shared session (one pooled
            # connection instead of two). Public-only tokens pass no session, so
            # each check manages its own if it needs one.
            share_session = bool(token_teams)
            session = None
            if share_session:
                # First-Party
                from mcpgateway.db import get_db  # pylint: disable=import-outside-toplevel

                session = next(get_db())
            try:
                if not self._check_team_membership(claims, db=session):
                    logger.warning("Token rejected: User no longer member of associated team(s)")
                    raise forbidden("Token is invalid: User is no longer a member of the associated team")

                if not self._check_resource_team_ownership(path, token_teams, db=session, _user_email=user_email):
                    logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}")
                    raise forbidden(_ACCESS_DENIED_MSG)
            finally:
                # The shared session is released even when a check raised
                if share_session:
                    try:
                        session.commit()
                    finally:
                        session.close()

        # Remaining restrictions all live under the "scopes" claim
        scopes = claims.get("scopes", {})

        # Server ID restriction
        server_id = scopes.get("server_id")
        if not self._check_server_restriction(path, server_id):
            logger.warning(f"Token not authorized for this server. Required: {server_id}")
            raise forbidden(_ACCESS_DENIED_MSG)

        # IP restrictions (client IP is only resolved when restrictions exist)
        ip_restrictions = scopes.get("ip_restrictions", [])
        if ip_restrictions:
            client_ip = self._get_client_ip(request)
            if not self._check_ip_restrictions(client_ip, ip_restrictions):
                logger.warning(f"Request from IP {client_ip} not allowed by token restrictions")
                raise forbidden(_ACCESS_DENIED_MSG)

        # Time restrictions
        if not self._check_time_restrictions(scopes.get("time_restrictions", {})):
            logger.warning("Request not allowed at this time by token restrictions")
            raise forbidden("Request not allowed at this time by token restrictions")

        # Permission restrictions
        if not self._check_permission_restrictions(path, request.method, scopes.get("permissions", [])):
            logger.warning("Insufficient permissions for this operation")
            raise forbidden(_ACCESS_DENIED_MSG)

        # Optional token usage limits (the only check answered with 429)
        jti = claims.get("jti")
        usage_allowed, usage_reason = self._check_usage_limits(jti, scopes.get("usage_limits", {}))
        if not usage_allowed:
            logger.warning("Token usage limit exceeded for jti %s: %s", jti, usage_reason)
            raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=usage_reason or "Token usage limit exceeded")

        # Every scoping check passed
        return await call_next(request)

    except HTTPException as exc:
        # Answer with a clean JSON body instead of a traceback
        return ORJSONResponse(status_code=exc.status_code, content={"detail": exc.detail})
1291# Shared TokenScopingMiddleware instance, constructed with no arguments
1292token_scoping_middleware = TokenScopingMiddleware()