Coverage for mcpgateway / middleware / token_scoping.py: 99%
382 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/middleware/token_scoping.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Token Scoping Middleware.
8This middleware enforces token scoping restrictions at the API level,
9including server_id restrictions, IP restrictions, permission checks,
10and time-based restrictions.
11"""
13# Standard
14from datetime import datetime, timezone
15import ipaddress
16import re
17from typing import List, Optional, Pattern, Tuple
19# Third-Party
20from fastapi import HTTPException, Request, status
21from fastapi.security import HTTPBearer
23# First-Party
24from mcpgateway.auth import normalize_token_teams
25from mcpgateway.db import Permissions
26from mcpgateway.services.logging_service import LoggingService
27from mcpgateway.utils.orjson_response import ORJSONResponse
28from mcpgateway.utils.verify_credentials import verify_jwt_token_cached
# Security scheme
# HTTPBearer is constructed with auto_error=False.
# NOTE(review): bearer_scheme is not referenced in the visible part of this
# module; the middleware reads the Authorization header directly.
bearer_scheme = HTTPBearer(auto_error=False)

# Initialize logging service first
# `logger` is the module-wide logger used by every check in this file.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
37# ============================================================================
38# Precompiled regex patterns (compiled once at module load for performance)
39# ============================================================================
# Server path extraction patterns.
# Each pattern is anchored at the start of the path and captures the server ID
# as group 1; the trailing group requires the ID to end at a segment boundary
# ("/" for /servers, "?" for /sse and /ws) or at the end of the string.
_SERVER_PATH_PATTERNS: List[Pattern[str]] = [
    re.compile(expression)
    for expression in (
        r"^/servers/([^/]+)(?:$|/)",
        r"^/sse/([^/?]+)(?:$|\?)",
        r"^/ws/([^/?]+)(?:$|\?)",
    )
]
# Resource ID extraction patterns: (compiled pattern, resource type label).
# Group 1 captures a run of lowercase hex digits and dashes following the
# collection segment. The patterns are unanchored and the slash after the
# segment is optional, so they are meant to be used with ``search``.
# NOTE(review): because the slash is optional and the match is unanchored,
# non-ID paths whose next character is in [a-f0-9-] also match - confirm intended.
_RESOURCE_PATTERNS: List[Tuple[Pattern[str], str]] = [
    (re.compile(rf"/{segment}/?([a-f0-9\-]+)"), kind)
    for segment, kind in (
        ("servers", "server"),
        ("tools", "tool"),
        ("resources", "resource"),
        ("prompts", "prompt"),
        ("gateways", "gateway"),
    )
]
# Permission map with precompiled patterns
# Maps (HTTP method, path pattern) to required permission.
#
# ORDER MATTERS: _check_permission_restrictions walks this list top to bottom
# and decides on the FIRST row whose method equals the request method and whose
# pattern matches the path. More specific rows (e.g. /servers/{id}/tools) must
# therefore stay above the broader rows they overlap with (e.g. /servers).
# Requests that match no row are allowed by the caller.
_PERMISSION_PATTERNS: List[Tuple[str, Pattern[str], str]] = [
    # Tools permissions
    ("GET", re.compile(r"^/tools(?:$|/)"), Permissions.TOOLS_READ),
    ("POST", re.compile(r"^/tools(?:$|/)"), Permissions.TOOLS_CREATE),
    ("PUT", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_UPDATE),
    ("DELETE", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_DELETE),
    ("GET", re.compile(r"^/servers/[^/]+/tools(?:$|/)"), Permissions.TOOLS_READ),
    ("POST", re.compile(r"^/servers/[^/]+/tools/[^/]+/call(?:$|/)"), Permissions.TOOLS_EXECUTE),
    # Resources permissions
    ("GET", re.compile(r"^/resources(?:$|/)"), Permissions.RESOURCES_READ),
    ("POST", re.compile(r"^/resources(?:$|/)"), Permissions.RESOURCES_CREATE),
    ("PUT", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_UPDATE),
    ("DELETE", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_DELETE),
    ("GET", re.compile(r"^/servers/[^/]+/resources(?:$|/)"), Permissions.RESOURCES_READ),
    # Prompts permissions
    ("GET", re.compile(r"^/prompts(?:$|/)"), Permissions.PROMPTS_READ),
    ("POST", re.compile(r"^/prompts(?:$|/)"), Permissions.PROMPTS_CREATE),
    ("PUT", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_UPDATE),
    ("DELETE", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_DELETE),
    # Server management permissions
    ("GET", re.compile(r"^/servers(?:$|/)"), Permissions.SERVERS_READ),
    ("POST", re.compile(r"^/servers(?:$|/)"), Permissions.SERVERS_CREATE),
    ("PUT", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_UPDATE),
    ("DELETE", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_DELETE),
    # Gateway permissions
    ("GET", re.compile(r"^/gateways(?:$|/)"), Permissions.GATEWAYS_READ),
    ("POST", re.compile(r"^/gateways/?$"), Permissions.GATEWAYS_CREATE),  # Only exact /gateways or /gateways/
    ("POST", re.compile(r"^/gateways/[^/]+/"), Permissions.GATEWAYS_UPDATE),  # POST to sub-resources (state, toggle, refresh)
    ("PUT", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_UPDATE),
    ("DELETE", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_DELETE),
    # Admin permissions
    ("GET", re.compile(r"^/admin(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
    ("POST", re.compile(r"^/admin/[^/]+(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
    ("PUT", re.compile(r"^/admin/[^/]+(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
    ("DELETE", re.compile(r"^/admin/[^/]+(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT),
]
97class TokenScopingMiddleware:
98 """Middleware to enforce token scoping restrictions.
100 Examples:
101 >>> middleware = TokenScopingMiddleware()
102 >>> isinstance(middleware, TokenScopingMiddleware)
103 True
104 """
    def __init__(self):
        """Initialize token scoping middleware.

        The constructor takes no arguments and assigns no instance attributes;
        all checks are performed by the methods on each call.

        Examples:
            >>> middleware = TokenScopingMiddleware()
            >>> hasattr(middleware, '_extract_token_scopes')
            True
        """
115 def _normalize_teams(self, teams) -> list:
116 """Normalize teams from token payload to list of team IDs.
118 Handles various team formats:
119 - None -> []
120 - List of strings -> as-is
121 - List of dicts with 'id' key -> extract IDs
123 Args:
124 teams: Raw teams value from JWT payload
126 Returns:
127 List of team ID strings
128 """
129 if not teams:
130 return []
131 normalized = []
132 for team in teams:
133 if isinstance(team, dict):
134 team_id = team.get("id")
135 if team_id:
136 normalized.append(team_id)
137 elif isinstance(team, str):
138 normalized.append(team)
139 return normalized
141 async def _extract_token_scopes(self, request: Request) -> Optional[dict]:
142 """Extract token scopes from JWT in request.
144 Args:
145 request: FastAPI request object
147 Returns:
148 Dict containing token scopes or None if no valid token
149 """
150 # Get authorization header
151 auth_header = request.headers.get("Authorization")
152 if not auth_header or not auth_header.startswith("Bearer "):
153 return None
155 token = auth_header.split(" ", 1)[1]
157 try:
158 # Use the centralized verify_jwt_token_cached function for consistent JWT validation
159 payload = await verify_jwt_token_cached(token, request)
160 return payload
161 except HTTPException:
162 # Token validation failed (expired, invalid, etc.)
163 return None
164 except Exception:
165 # Any other error in token validation
166 return None
168 def _get_client_ip(self, request: Request) -> str:
169 """Extract client IP address from request.
171 Args:
172 request: FastAPI request object
174 Returns:
175 str: Client IP address
176 """
177 # Check for X-Forwarded-For header (proxy/load balancer)
178 forwarded_for = request.headers.get("X-Forwarded-For")
179 if forwarded_for:
180 return forwarded_for.split(",")[0].strip()
182 # Check for X-Real-IP header
183 real_ip = request.headers.get("X-Real-IP")
184 if real_ip:
185 return real_ip
187 # Fall back to direct client IP
188 return request.client.host if request.client else "unknown"
190 def _check_ip_restrictions(self, client_ip: str, ip_restrictions: list) -> bool:
191 """Check if client IP is allowed by restrictions.
193 Args:
194 client_ip: Client's IP address
195 ip_restrictions: List of allowed IP addresses/CIDR ranges
197 Returns:
198 bool: True if IP is allowed, False otherwise
200 Examples:
201 Allow specific IP:
202 >>> m = TokenScopingMiddleware()
203 >>> m._check_ip_restrictions('192.168.1.10', ['192.168.1.10'])
204 True
206 Allow CIDR range:
207 >>> m._check_ip_restrictions('10.0.0.5', ['10.0.0.0/24'])
208 True
210 Deny when not in list:
211 >>> m._check_ip_restrictions('10.0.1.5', ['10.0.0.0/24'])
212 False
214 Empty restrictions allow all:
215 >>> m._check_ip_restrictions('203.0.113.1', [])
216 True
217 """
218 if not ip_restrictions:
219 return True # No restrictions
221 try:
222 client_ip_obj = ipaddress.ip_address(client_ip)
224 for restriction in ip_restrictions:
225 try:
226 # Check if it's a CIDR range
227 if "/" in restriction:
228 network = ipaddress.ip_network(restriction, strict=False)
229 if client_ip_obj in network:
230 return True
231 else:
232 # Single IP address
233 if client_ip_obj == ipaddress.ip_address(restriction): 233 ↛ 224line 233 didn't jump to line 224 because the condition on line 233 was always true
234 return True
235 except (ValueError, ipaddress.AddressValueError):
236 continue
238 except (ValueError, ipaddress.AddressValueError):
239 return False
241 return False
243 def _check_time_restrictions(self, time_restrictions: dict) -> bool:
244 """Check if current time is allowed by restrictions.
246 Args:
247 time_restrictions: Dict containing time-based restrictions
249 Returns:
250 bool: True if current time is allowed, False otherwise
252 Examples:
253 No restrictions allow access:
254 >>> m = TokenScopingMiddleware()
255 >>> m._check_time_restrictions({})
256 True
258 Weekdays only: result depends on current weekday (always bool):
259 >>> isinstance(m._check_time_restrictions({'weekdays_only': True}), bool)
260 True
262 Business hours only: result depends on current hour (always bool):
263 >>> isinstance(m._check_time_restrictions({'business_hours_only': True}), bool)
264 True
265 """
266 if not time_restrictions:
267 return True # No restrictions
269 now = datetime.now(tz=timezone.utc)
271 # Check business hours restriction
272 if time_restrictions.get("business_hours_only"):
273 # Assume business hours are 9 AM to 5 PM UTC
274 # This could be made configurable
275 if not 9 <= now.hour < 17:
276 return False
278 # Check day of week restrictions
279 weekdays_only = time_restrictions.get("weekdays_only")
280 if weekdays_only and now.weekday() >= 5: # Saturday=5, Sunday=6
281 return False
283 return True
285 def _check_server_restriction(self, request_path: str, server_id: Optional[str]) -> bool:
286 """Check if request path matches server restriction.
288 Args:
289 request_path: The request path/URL
290 server_id: Required server ID (None means no restriction)
292 Returns:
293 bool: True if request is allowed, False otherwise
295 Examples:
296 Match server paths:
297 >>> m = TokenScopingMiddleware()
298 >>> m._check_server_restriction('/servers/abc/tools', 'abc')
299 True
300 >>> m._check_server_restriction('/sse/xyz', 'xyz')
301 True
302 >>> m._check_server_restriction('/ws/xyz?x=1', 'xyz')
303 True
305 Mismatch denies:
306 >>> m._check_server_restriction('/servers/def', 'abc')
307 False
309 General endpoints allowed:
310 >>> m._check_server_restriction('/health', 'abc')
311 True
312 >>> m._check_server_restriction('/', 'abc')
313 True
314 """
315 if not server_id:
316 return True # No server restriction
318 # Extract server ID from path patterns (uses precompiled regex)
319 # /servers/{server_id}/...
320 # /sse/{server_id}
321 # /ws/{server_id}
322 for pattern in _SERVER_PATH_PATTERNS:
323 match = pattern.search(request_path)
324 if match:
325 path_server_id = match.group(1)
326 return path_server_id == server_id
328 # If no server ID found in path, allow general endpoints
329 general_endpoints = ["/health", "/metrics", "/openapi.json", "/docs", "/redoc", "/rpc"]
331 # Check exact root path separately
332 if request_path == "/":
333 return True
335 for endpoint in general_endpoints:
336 if request_path.startswith(endpoint):
337 return True
339 # Default deny for unmatched paths with server restrictions
340 return False
342 def _check_permission_restrictions(self, request_path: str, request_method: str, permissions: list) -> bool:
343 """Check if request is allowed by permission restrictions.
345 Args:
346 request_path: The request path/URL
347 request_method: HTTP method (GET, POST, etc.)
348 permissions: List of allowed permissions
350 Returns:
351 bool: True if request is allowed, False otherwise
353 Examples:
354 Wildcard allows all:
355 >>> m = TokenScopingMiddleware()
356 >>> m._check_permission_restrictions('/tools', 'GET', ['*'])
357 True
359 Requires specific permission:
360 >>> m._check_permission_restrictions('/tools', 'POST', ['tools.create'])
361 True
362 >>> m._check_permission_restrictions('/tools/xyz', 'PUT', ['tools.update'])
363 True
364 >>> m._check_permission_restrictions('/resources', 'GET', ['resources.read'])
365 True
366 >>> m._check_permission_restrictions('/servers/s1/tools/abc/call', 'POST', ['tools.execute'])
367 True
369 Missing permission denies:
370 >>> m._check_permission_restrictions('/tools', 'POST', ['tools.read'])
371 False
372 """
373 if not permissions or "*" in permissions:
374 return True # No restrictions or full access
376 # Check each permission mapping (uses precompiled regex patterns)
377 for method, path_pattern, required_permission in _PERMISSION_PATTERNS:
378 if request_method == method and path_pattern.match(request_path):
379 return required_permission in permissions
381 # Default allow for unmatched paths
382 return True
384 def _check_team_membership(self, payload: dict, db=None) -> bool:
385 """
386 Check if user still belongs to teams in the token.
388 For public-only tokens (no teams), always returns True.
389 For team-scoped tokens, validates membership with caching.
391 Uses in-memory cache (per gateway instance, 60s TTL) to avoid repeated
392 email_team_members queries for the same user+teams combination.
393 Note: Sync path uses in-memory only for performance; Redis is not
394 consulted to avoid async overhead in the hot path.
396 Args:
397 payload: Decoded JWT payload containing teams
398 db: Optional database session. If provided, caller manages lifecycle.
399 If None, creates and manages its own session.
401 Returns:
402 bool: True if team membership is valid, False otherwise
403 """
404 teams = payload.get("teams", [])
405 user_email = payload.get("sub")
407 # PUBLIC-ONLY TOKEN: No team validation needed
408 if not teams or len(teams) == 0:
409 logger.debug(f"Public-only token for user {user_email} - no team validation required")
410 return True
412 # TEAM-SCOPED TOKEN: Validate membership
413 if not user_email:
414 logger.warning("Token missing user email")
415 return False
417 # Extract team IDs from token (handles both dict and string formats)
418 team_ids = [team["id"] if isinstance(team, dict) else team for team in teams]
420 # First-Party
421 from mcpgateway.cache.auth_cache import get_auth_cache # pylint: disable=import-outside-toplevel
423 # Check cache first (synchronous in-memory lookup)
424 auth_cache = get_auth_cache()
425 cached_result = auth_cache.get_team_membership_valid_sync(user_email, team_ids)
426 if cached_result is not None:
427 if not cached_result: 427 ↛ 429line 427 didn't jump to line 429 because the condition on line 427 was always true
428 logger.warning(f"Token invalid (cached): User {user_email} no longer member of teams")
429 return cached_result
431 # Cache miss - query database
432 # Third-Party
433 from sqlalchemy import select # pylint: disable=import-outside-toplevel
435 # First-Party
436 from mcpgateway.db import EmailTeamMember, get_db # pylint: disable=import-outside-toplevel
438 # Track if we own the session (and thus must clean it up)
439 owns_session = db is None
440 if owns_session: 440 ↛ 443line 440 didn't jump to line 443 because the condition on line 440 was always true
441 db = next(get_db())
443 try:
444 # Single query for all teams (fixes N+1 pattern)
445 memberships = (
446 db.execute(
447 select(EmailTeamMember.team_id).where(
448 EmailTeamMember.team_id.in_(team_ids),
449 EmailTeamMember.user_email == user_email,
450 EmailTeamMember.is_active.is_(True),
451 )
452 )
453 .scalars()
454 .all()
455 )
457 # Check if user is member of ALL teams in token
458 valid_team_ids = set(memberships)
459 missing_teams = set(team_ids) - valid_team_ids
461 if missing_teams:
462 logger.warning(f"Token invalid: User {user_email} no longer member of teams: {missing_teams}")
463 # Cache negative result
464 auth_cache.set_team_membership_valid_sync(user_email, team_ids, False)
465 return False
467 # Cache positive result
468 auth_cache.set_team_membership_valid_sync(user_email, team_ids, True)
469 return True
470 finally:
471 # Only commit/close if we created the session
472 if owns_session:
473 try:
474 db.commit() # Commit read-only transaction to avoid implicit rollback
475 finally:
476 db.close()
478 def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None, _user_email: str = None) -> bool: # pylint: disable=too-many-return-statements
479 """
480 Check if the requested resource is accessible by the token.
482 Implements Three-Tier Resource Visibility (Public/Team/Private):
483 - PUBLIC: Accessible by all tokens (public-only and team-scoped)
484 - TEAM: Accessible only by tokens scoped to that specific team
485 - PRIVATE: Accessible only by tokens scoped to that specific team
487 Token Access Rules:
488 - Public-only tokens (empty token_teams): Can access public resources + their own resources
489 - Team-scoped tokens: Can access their team's resources + public resources
491 Handles URLs like:
492 - /servers/{id}/mcp
493 - /servers/{id}/sse
494 - /servers/{id}
495 - /tools/{id}/execute
496 - /tools/{id}
497 - /resources/{id}
498 - /prompts/{id}
500 Args:
501 request_path: The request path/URL
502 token_teams: List of team IDs from the token (empty list = public-only token)
503 db: Optional database session. If provided, caller manages lifecycle.
504 If None, creates and manages its own session.
506 Returns:
507 bool: True if resource access is allowed, False otherwise
508 """
509 # Normalize token_teams: extract team IDs from dict objects (backward compatibility)
510 token_team_ids = []
511 for team in token_teams:
512 if isinstance(team, dict):
513 token_team_ids.append(team["id"])
514 else:
515 token_team_ids.append(team)
517 # Determine token type
518 is_public_token = not token_team_ids or len(token_team_ids) == 0
520 if is_public_token:
521 logger.debug("Processing request with PUBLIC-ONLY token")
522 else:
523 logger.debug(f"Processing request with TEAM-SCOPED token (teams: {token_teams})")
525 # Extract resource type and ID from path (uses precompiled regex patterns)
526 # IDs are UUID hex strings (32 chars) or UUID with dashes (36 chars)
527 resource_id = None
528 resource_type = None
530 for pattern, rtype in _RESOURCE_PATTERNS:
531 match = pattern.search(request_path)
532 if match:
533 resource_id = match.group(1)
534 resource_type = rtype
535 logger.debug(f"Extracted {rtype} ID: {resource_id} from path: {request_path}")
536 break
538 # If no resource ID in path, allow (general endpoints like /health, /tokens, /metrics)
539 if not resource_id or not resource_type:
540 logger.debug(f"No resource ID found in path {request_path}, allowing access")
541 return True
543 # Import database models
544 # Third-Party
545 from sqlalchemy import select # pylint: disable=import-outside-toplevel
547 # First-Party
548 from mcpgateway.db import Gateway, get_db, Prompt, Resource, Server, Tool # pylint: disable=import-outside-toplevel
550 # Track if we own the session (and thus must clean it up)
551 owns_session = db is None
552 if owns_session:
553 db = next(get_db())
555 try:
556 # Check Virtual Servers
557 if resource_type == "server":
558 server = db.execute(select(Server).where(Server.id == resource_id)).scalar_one_or_none()
560 if not server:
561 logger.warning(f"Server {resource_id} not found in database")
562 return True
564 # Get server visibility (default to 'team' if field doesn't exist)
565 server_visibility = getattr(server, "visibility", "team")
567 # PUBLIC SERVERS: Accessible by everyone (including public-only tokens)
568 if server_visibility == "public":
569 logger.debug(f"Access granted: Server {resource_id} is PUBLIC")
570 return True
572 # PUBLIC-ONLY TOKEN: Can ONLY access public servers (strict public-only policy)
573 # No owner access - if user needs own resources, use a personal team-scoped token
574 if is_public_token:
575 logger.warning(f"Access denied: Public-only token cannot access {server_visibility} server {resource_id}")
576 return False
578 # TEAM-SCOPED SERVERS: Check if server belongs to token's teams
579 if server_visibility == "team":
580 if server.team_id in token_team_ids:
581 logger.debug(f"Access granted: Team server {resource_id} belongs to token's team {server.team_id}")
582 return True
584 logger.warning(f"Access denied: Server {resource_id} is team-scoped to '{server.team_id}', token is scoped to teams {token_team_ids}")
585 return False
587 # PRIVATE SERVERS: Owner-only access (per RBAC doc)
588 if server_visibility == "private":
589 server_owner = getattr(server, "owner_email", None)
590 if server_owner and server_owner == _user_email:
591 logger.debug(f"Access granted: Private server {resource_id} owned by {_user_email}")
592 return True
594 logger.warning(f"Access denied: Server {resource_id} is private, owner is '{server_owner}', requester is '{_user_email}'")
595 return False
597 # Unknown visibility - deny by default
598 logger.warning(f"Access denied: Server {resource_id} has unknown visibility: {server_visibility}")
599 return False
601 # CHECK TOOLS
602 if resource_type == "tool":
603 tool = db.execute(select(Tool).where(Tool.id == resource_id)).scalar_one_or_none()
605 if not tool:
606 logger.warning(f"Tool {resource_id} not found in database")
607 return True
609 # Get tool visibility (default to 'team' if field doesn't exist)
610 tool_visibility = getattr(tool, "visibility", "team")
612 # PUBLIC TOOLS: Accessible by everyone (including public-only tokens)
613 if tool_visibility == "public":
614 logger.debug(f"Access granted: Tool {resource_id} is PUBLIC")
615 return True
617 # PUBLIC-ONLY TOKEN: Can ONLY access public tools (strict public-only policy)
618 # No owner access - if user needs own resources, use a personal team-scoped token
619 if is_public_token:
620 logger.warning(f"Access denied: Public-only token cannot access {tool_visibility} tool {resource_id}")
621 return False
623 # TEAM TOOLS: Check if tool's team matches token's teams
624 if tool_visibility == "team":
625 tool_team_id = getattr(tool, "team_id", None)
626 if tool_team_id and tool_team_id in token_team_ids:
627 logger.debug(f"Access granted: Team tool {resource_id} belongs to token's team {tool_team_id}")
628 return True
630 logger.warning(f"Access denied: Tool {resource_id} is team-scoped to '{tool_team_id}', token is scoped to teams {token_team_ids}")
631 return False
633 # PRIVATE TOOLS: Owner-only access (per RBAC doc)
634 if tool_visibility in ["private", "user"]:
635 tool_owner = getattr(tool, "owner_email", None)
636 if tool_owner and tool_owner == _user_email:
637 logger.debug(f"Access granted: Private tool {resource_id} owned by {_user_email}")
638 return True
640 logger.warning(f"Access denied: Tool {resource_id} is {tool_visibility}, owner is '{tool_owner}', requester is '{_user_email}'")
641 return False
643 # Unknown visibility - deny by default
644 logger.warning(f"Access denied: Tool {resource_id} has unknown visibility: {tool_visibility}")
645 return False
647 # CHECK RESOURCES
648 if resource_type == "resource":
649 resource = db.execute(select(Resource).where(Resource.id == resource_id)).scalar_one_or_none()
651 if not resource:
652 logger.warning(f"Resource {resource_id} not found in database")
653 return True
655 # Get resource visibility (default to 'team' if field doesn't exist)
656 resource_visibility = getattr(resource, "visibility", "team")
658 # PUBLIC RESOURCES: Accessible by everyone (including public-only tokens)
659 if resource_visibility == "public":
660 logger.debug(f"Access granted: Resource {resource_id} is PUBLIC")
661 return True
663 # PUBLIC-ONLY TOKEN: Can ONLY access public resources (strict public-only policy)
664 # No owner access - if user needs own resources, use a personal team-scoped token
665 if is_public_token:
666 logger.warning(f"Access denied: Public-only token cannot access {resource_visibility} resource {resource_id}")
667 return False
669 # TEAM RESOURCES: Check if resource's team matches token's teams
670 if resource_visibility == "team":
671 resource_team_id = getattr(resource, "team_id", None)
672 if resource_team_id and resource_team_id in token_team_ids:
673 logger.debug(f"Access granted: Team resource {resource_id} belongs to token's team {resource_team_id}")
674 return True
676 logger.warning(f"Access denied: Resource {resource_id} is team-scoped to '{resource_team_id}', token is scoped to teams {token_team_ids}")
677 return False
679 # PRIVATE RESOURCES: Owner-only access (per RBAC doc)
680 if resource_visibility in ["private", "user"]:
681 resource_owner = getattr(resource, "owner_email", None)
682 if resource_owner and resource_owner == _user_email:
683 logger.debug(f"Access granted: Private resource {resource_id} owned by {_user_email}")
684 return True
686 logger.warning(f"Access denied: Resource {resource_id} is {resource_visibility}, owner is '{resource_owner}', requester is '{_user_email}'")
687 return False
689 # Unknown visibility - deny by default
690 logger.warning(f"Access denied: Resource {resource_id} has unknown visibility: {resource_visibility}")
691 return False
693 # CHECK PROMPTS
694 if resource_type == "prompt":
695 prompt = db.execute(select(Prompt).where(Prompt.id == resource_id)).scalar_one_or_none()
697 if not prompt:
698 logger.warning(f"Prompt {resource_id} not found in database")
699 return True
701 # Get prompt visibility (default to 'team' if field doesn't exist)
702 prompt_visibility = getattr(prompt, "visibility", "team")
704 # PUBLIC PROMPTS: Accessible by everyone (including public-only tokens)
705 if prompt_visibility == "public":
706 logger.debug(f"Access granted: Prompt {resource_id} is PUBLIC")
707 return True
709 # PUBLIC-ONLY TOKEN: Can ONLY access public prompts (strict public-only policy)
710 # No owner access - if user needs own resources, use a personal team-scoped token
711 if is_public_token:
712 logger.warning(f"Access denied: Public-only token cannot access {prompt_visibility} prompt {resource_id}")
713 return False
715 # TEAM PROMPTS: Check if prompt's team matches token's teams
716 if prompt_visibility == "team":
717 prompt_team_id = getattr(prompt, "team_id", None)
718 if prompt_team_id and prompt_team_id in token_team_ids:
719 logger.debug(f"Access granted: Team prompt {resource_id} belongs to token's team {prompt_team_id}")
720 return True
722 logger.warning(f"Access denied: Prompt {resource_id} is team-scoped to '{prompt_team_id}', token is scoped to teams {token_team_ids}")
723 return False
725 # PRIVATE PROMPTS: Owner-only access (per RBAC doc)
726 if prompt_visibility in ["private", "user"]:
727 prompt_owner = getattr(prompt, "owner_email", None)
728 if prompt_owner and prompt_owner == _user_email:
729 logger.debug(f"Access granted: Private prompt {resource_id} owned by {_user_email}")
730 return True
732 logger.warning(f"Access denied: Prompt {resource_id} is {prompt_visibility}, owner is '{prompt_owner}', requester is '{_user_email}'")
733 return False
735 # Unknown visibility - deny by default
736 logger.warning(f"Access denied: Prompt {resource_id} has unknown visibility: {prompt_visibility}")
737 return False
739 # CHECK GATEWAYS
740 if resource_type == "gateway":
741 gateway = db.execute(select(Gateway).where(Gateway.id == resource_id)).scalar_one_or_none()
743 if not gateway:
744 logger.warning(f"Gateway {resource_id} not found in database")
745 return True
747 # Get gateway visibility (default to 'team' if field doesn't exist)
748 gateway_visibility = getattr(gateway, "visibility", "team")
750 # PUBLIC GATEWAYS: Accessible by everyone (including public-only tokens)
751 if gateway_visibility == "public":
752 logger.debug(f"Access granted: Gateway {resource_id} is PUBLIC")
753 return True
755 # PUBLIC-ONLY TOKEN: Can ONLY access public gateways (strict public-only policy)
756 # No owner access - if user needs own resources, use a personal team-scoped token
757 if is_public_token:
758 logger.warning(f"Access denied: Public-only token cannot access {gateway_visibility} gateway {resource_id}")
759 return False
761 # TEAM GATEWAYS: Check if gateway's team matches token's teams
762 if gateway_visibility == "team":
763 gateway_team_id = getattr(gateway, "team_id", None)
764 if gateway_team_id and gateway_team_id in token_team_ids:
765 logger.debug(f"Access granted: Team gateway {resource_id} belongs to token's team {gateway_team_id}")
766 return True
768 logger.warning(f"Access denied: Gateway {resource_id} is team-scoped to '{gateway_team_id}', token is scoped to teams {token_team_ids}")
769 return False
771 # PRIVATE GATEWAYS: Owner-only access (per RBAC doc)
772 if gateway_visibility in ["private", "user"]:
773 gateway_owner = getattr(gateway, "owner_email", None)
774 if gateway_owner and gateway_owner == _user_email:
775 logger.debug(f"Access granted: Private gateway {resource_id} owned by {_user_email}")
776 return True
778 logger.warning(f"Access denied: Gateway {resource_id} is {gateway_visibility}, owner is '{gateway_owner}', requester is '{_user_email}'")
779 return False
781 # Unknown visibility - deny by default
782 logger.warning(f"Access denied: Gateway {resource_id} has unknown visibility: {gateway_visibility}")
783 return False
785 # UNKNOWN RESOURCE TYPE
786 logger.warning(f"Unknown resource type '{resource_type}' for path: {request_path}")
787 return False
789 except Exception as e:
790 logger.error(f"Error checking resource team ownership for {request_path}: {e}", exc_info=True)
791 # Fail securely - deny access on error
792 return False
793 finally:
794 # Only commit/close if we created the session
795 if owns_session:
796 try:
797 db.commit() # Commit read-only transaction to avoid implicit rollback
798 finally:
799 db.close()
async def __call__(self, request: Request, call_next):
    """Middleware function to check token scoping including team-level validation.

    Checks run in a fixed order and the first failure ends the request:
    team membership and resource team ownership, server ID restriction,
    IP restrictions, time restrictions, then permission restrictions.
    Requests without a token payload, requests to the skip-listed paths,
    and requests already marked as scoped are passed straight through.

    Args:
        request: FastAPI request object
        call_next: Next middleware/handler in chain

    Returns:
        Response from next handler, or an ORJSONResponse carrying the status
        code and detail of an HTTPException raised inside this method.

    Raises:
        HTTPException: Raised internally when a token scoping restriction is
            violated; it is caught at the end of this method and converted
            into a JSON response rather than propagated.
    """
    try:
        # Skip if already scoped (prevents double-scoping for /mcp requests)
        # MCPPathRewriteMiddleware runs scoping via dispatch, then routes through
        # middleware stack which hits BaseHTTPMiddleware's scoping again.
        # Use request.state flag which persists across middleware invocations.
        if getattr(request.state, "_token_scoping_done", False):
            return await call_next(request)

        # Mark as scoped before doing any work
        request.state._token_scoping_done = True

        path = request.url.path

        # Skip scoping for certain paths (truly public endpoints only).
        # Kept as a tuple so a single str.startswith() call tests every prefix.
        skip_paths = (
            "/health",
            "/metrics",
            "/openapi.json",
            "/docs",
            "/redoc",
            "/auth/email/login",
            "/auth/email/register",
            "/.well-known/",
        )

        # The root path must match exactly; the others are prefix matches.
        if path == "/" or path.startswith(skip_paths):
            return await call_next(request)

        # Skip server-specific well-known endpoints (RFC 9728)
        if re.match(r"^/servers/[^/]+/\.well-known/", path):
            return await call_next(request)

        # Extract full token payload (not just scopes)
        payload = await self._extract_token_scopes(request)

        # If no payload, continue (regular auth will handle this)
        if not payload:
            return await call_next(request)

        # TEAM VALIDATION: Use single DB session for both team checks
        # This reduces connection pool overhead from 2 sessions to 1 for resource endpoints
        user_email = payload.get("sub") or payload.get("email")  # Extract user email for ownership check

        # Resolve teams based on token_use claim
        token_use = payload.get("token_use")
        if token_use == "session":  # nosec B105 - Not a password; token_use is a JWT claim type
            # Session token: resolve teams from DB/cache directly
            # Cannot rely on request.state.token_teams — AuthContextMiddleware
            # is gated by security_logging_enabled (defaults to False)
            # First-Party
            from mcpgateway.auth import _resolve_teams_from_db  # pylint: disable=import-outside-toplevel

            # The "user" claim may be missing or explicitly null; treat both
            # as "no nested user info" instead of failing on None.get().
            user_claims = payload.get("user") or {}
            is_admin = payload.get("is_admin", False) or user_claims.get("is_admin", False)
            user_info = {"is_admin": is_admin}
            token_teams = await _resolve_teams_from_db(user_email, user_info)
        else:
            # API token or legacy: use embedded teams with normalize_token_teams
            token_teams = normalize_token_teams(payload)

        # Check if admin bypass is active (token_teams is None means admin with explicit null teams)
        is_admin_bypass = token_teams is None

        # Admin with explicit null teams bypasses team validation entirely
        if is_admin_bypass:
            logger.debug(f"Admin bypass: skipping team validation for {user_email}")
            # Skip to other checks (server_id, IP, etc.)
        elif token_teams:
            # First-Party
            from mcpgateway.db import get_db  # pylint: disable=import-outside-toplevel

            # NOTE(review): only the first value produced by get_db() is taken,
            # so the session is committed and closed explicitly below.
            db = next(get_db())
            try:
                # Check team membership with shared session
                if not self._check_team_membership(payload, db=db):
                    logger.warning("Token rejected: User no longer member of associated team(s)")
                    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team")

                # Check resource team ownership with shared session
                if not self._check_resource_team_ownership(path, token_teams, db=db, _user_email=user_email):
                    logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}")
                    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You do not have permission to access this resource using the current token")
            finally:
                # Ensure session cleanup even if checks raise exceptions
                try:
                    db.commit()
                finally:
                    db.close()
        else:
            # Public-only token (empty team list): the same two checks run,
            # but without a shared DB session being passed in.
            if not self._check_team_membership(payload):
                logger.warning("Token rejected: User no longer member of associated team(s)")
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team")

            if not self._check_resource_team_ownership(path, token_teams, _user_email=user_email):
                logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}")
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You do not have permission to access this resource using the current token")

        # Extract scopes from payload. A missing or explicitly null "scopes"
        # claim both mean "no scope restrictions recorded"; without the
        # fallback a null claim would raise AttributeError on scopes.get().
        scopes = payload.get("scopes") or {}

        # Check server ID restriction
        server_id = scopes.get("server_id")
        if not self._check_server_restriction(path, server_id):
            logger.warning(f"Token not authorized for this server. Required: {server_id}")
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Token not authorized for this server. Required: {server_id}")

        # Check IP restrictions (only when the token declares any)
        ip_restrictions = scopes.get("ip_restrictions", [])
        if ip_restrictions:
            client_ip = self._get_client_ip(request)
            if not self._check_ip_restrictions(client_ip, ip_restrictions):
                logger.warning(f"Request from IP {client_ip} not allowed by token restrictions")
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Request from IP {client_ip} not allowed by token restrictions")

        # Check time restrictions
        time_restrictions = scopes.get("time_restrictions", {})
        if not self._check_time_restrictions(time_restrictions):
            logger.warning("Request not allowed at this time by token restrictions")
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Request not allowed at this time by token restrictions")

        # Check permission restrictions
        permissions = scopes.get("permissions", [])
        if not self._check_permission_restrictions(path, request.method, permissions):
            logger.warning("Insufficient permissions for this operation")
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions for this operation")

        # All scoping checks passed, continue
        return await call_next(request)

    except HTTPException as exc:
        # Return clean JSON response instead of traceback
        return ORJSONResponse(
            status_code=exc.status_code,
            content={"detail": exc.detail},
        )
# Create middleware instance: a single TokenScopingMiddleware object built once
# when this module is imported.
# NOTE(review): presumably this instance is what the application registers as
# its HTTP middleware — confirm against the app setup code.
token_scoping_middleware = TokenScopingMiddleware()