Coverage for mcpgateway / middleware / token_scoping.py: 99%

382 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/token_scoping.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Token Scoping Middleware. 

8This middleware enforces token scoping restrictions at the API level, 

9including server_id restrictions, IP restrictions, permission checks, 

10and time-based restrictions. 

11""" 

12 

13# Standard 

14from datetime import datetime, timezone 

15import ipaddress 

16import re 

17from typing import List, Optional, Pattern, Tuple 

18 

19# Third-Party 

20from fastapi import HTTPException, Request, status 

21from fastapi.security import HTTPBearer 

22 

23# First-Party 

24from mcpgateway.auth import normalize_token_teams 

25from mcpgateway.db import Permissions 

26from mcpgateway.services.logging_service import LoggingService 

27from mcpgateway.utils.orjson_response import ORJSONResponse 

28from mcpgateway.utils.verify_credentials import verify_jwt_token_cached 

29 

30# Security scheme 

31bearer_scheme = HTTPBearer(auto_error=False) 

32 

33# Initialize logging service first 

34logging_service = LoggingService() 

35logger = logging_service.get_logger(__name__) 

36 

37# ============================================================================ 

38# Precompiled regex patterns (compiled once at module load for performance) 

39# ============================================================================ 

40 

41# Server path extraction patterns 

42_SERVER_PATH_PATTERNS: List[Pattern[str]] = [ 

43 re.compile(r"^/servers/([^/]+)(?:$|/)"), 

44 re.compile(r"^/sse/([^/?]+)(?:$|\?)"), 

45 re.compile(r"^/ws/([^/?]+)(?:$|\?)"), 

46] 

47 

48# Resource ID extraction patterns (IDs are UUID hex strings) 

49_RESOURCE_PATTERNS: List[Tuple[Pattern[str], str]] = [ 

50 (re.compile(r"/servers/?([a-f0-9\-]+)"), "server"), 

51 (re.compile(r"/tools/?([a-f0-9\-]+)"), "tool"), 

52 (re.compile(r"/resources/?([a-f0-9\-]+)"), "resource"), 

53 (re.compile(r"/prompts/?([a-f0-9\-]+)"), "prompt"), 

54 (re.compile(r"/gateways/?([a-f0-9\-]+)"), "gateway"), 

55] 

56 

57# Permission map with precompiled patterns 

58# Maps (HTTP method, path pattern) to required permission 

59_PERMISSION_PATTERNS: List[Tuple[str, Pattern[str], str]] = [ 

60 # Tools permissions 

61 ("GET", re.compile(r"^/tools(?:$|/)"), Permissions.TOOLS_READ), 

62 ("POST", re.compile(r"^/tools(?:$|/)"), Permissions.TOOLS_CREATE), 

63 ("PUT", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_UPDATE), 

64 ("DELETE", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_DELETE), 

65 ("GET", re.compile(r"^/servers/[^/]+/tools(?:$|/)"), Permissions.TOOLS_READ), 

66 ("POST", re.compile(r"^/servers/[^/]+/tools/[^/]+/call(?:$|/)"), Permissions.TOOLS_EXECUTE), 

67 # Resources permissions 

68 ("GET", re.compile(r"^/resources(?:$|/)"), Permissions.RESOURCES_READ), 

69 ("POST", re.compile(r"^/resources(?:$|/)"), Permissions.RESOURCES_CREATE), 

70 ("PUT", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_UPDATE), 

71 ("DELETE", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_DELETE), 

72 ("GET", re.compile(r"^/servers/[^/]+/resources(?:$|/)"), Permissions.RESOURCES_READ), 

73 # Prompts permissions 

74 ("GET", re.compile(r"^/prompts(?:$|/)"), Permissions.PROMPTS_READ), 

75 ("POST", re.compile(r"^/prompts(?:$|/)"), Permissions.PROMPTS_CREATE), 

76 ("PUT", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_UPDATE), 

77 ("DELETE", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_DELETE), 

78 # Server management permissions 

79 ("GET", re.compile(r"^/servers(?:$|/)"), Permissions.SERVERS_READ), 

80 ("POST", re.compile(r"^/servers(?:$|/)"), Permissions.SERVERS_CREATE), 

81 ("PUT", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_UPDATE), 

82 ("DELETE", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_DELETE), 

83 # Gateway permissions 

84 ("GET", re.compile(r"^/gateways(?:$|/)"), Permissions.GATEWAYS_READ), 

85 ("POST", re.compile(r"^/gateways/?$"), Permissions.GATEWAYS_CREATE), # Only exact /gateways or /gateways/ 

86 ("POST", re.compile(r"^/gateways/[^/]+/"), Permissions.GATEWAYS_UPDATE), # POST to sub-resources (state, toggle, refresh) 

87 ("PUT", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_UPDATE), 

88 ("DELETE", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_DELETE), 

89 # Admin permissions 

90 ("GET", re.compile(r"^/admin(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT), 

91 ("POST", re.compile(r"^/admin/[^/]+(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT), 

92 ("PUT", re.compile(r"^/admin/[^/]+(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT), 

93 ("DELETE", re.compile(r"^/admin/[^/]+(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT), 

94] 

95 

96 

97class TokenScopingMiddleware: 

98 """Middleware to enforce token scoping restrictions. 

99 

100 Examples: 

101 >>> middleware = TokenScopingMiddleware() 

102 >>> isinstance(middleware, TokenScopingMiddleware) 

103 True 

104 """ 

105 

106 def __init__(self): 

107 """Initialize token scoping middleware. 

108 

109 Examples: 

110 >>> middleware = TokenScopingMiddleware() 

111 >>> hasattr(middleware, '_extract_token_scopes') 

112 True 

113 """ 

114 

115 def _normalize_teams(self, teams) -> list: 

116 """Normalize teams from token payload to list of team IDs. 

117 

118 Handles various team formats: 

119 - None -> [] 

120 - List of strings -> as-is 

121 - List of dicts with 'id' key -> extract IDs 

122 

123 Args: 

124 teams: Raw teams value from JWT payload 

125 

126 Returns: 

127 List of team ID strings 

128 """ 

129 if not teams: 

130 return [] 

131 normalized = [] 

132 for team in teams: 

133 if isinstance(team, dict): 

134 team_id = team.get("id") 

135 if team_id: 

136 normalized.append(team_id) 

137 elif isinstance(team, str): 

138 normalized.append(team) 

139 return normalized 

140 

141 async def _extract_token_scopes(self, request: Request) -> Optional[dict]: 

142 """Extract token scopes from JWT in request. 

143 

144 Args: 

145 request: FastAPI request object 

146 

147 Returns: 

148 Dict containing token scopes or None if no valid token 

149 """ 

150 # Get authorization header 

151 auth_header = request.headers.get("Authorization") 

152 if not auth_header or not auth_header.startswith("Bearer "): 

153 return None 

154 

155 token = auth_header.split(" ", 1)[1] 

156 

157 try: 

158 # Use the centralized verify_jwt_token_cached function for consistent JWT validation 

159 payload = await verify_jwt_token_cached(token, request) 

160 return payload 

161 except HTTPException: 

162 # Token validation failed (expired, invalid, etc.) 

163 return None 

164 except Exception: 

165 # Any other error in token validation 

166 return None 

167 

168 def _get_client_ip(self, request: Request) -> str: 

169 """Extract client IP address from request. 

170 

171 Args: 

172 request: FastAPI request object 

173 

174 Returns: 

175 str: Client IP address 

176 """ 

177 # Check for X-Forwarded-For header (proxy/load balancer) 

178 forwarded_for = request.headers.get("X-Forwarded-For") 

179 if forwarded_for: 

180 return forwarded_for.split(",")[0].strip() 

181 

182 # Check for X-Real-IP header 

183 real_ip = request.headers.get("X-Real-IP") 

184 if real_ip: 

185 return real_ip 

186 

187 # Fall back to direct client IP 

188 return request.client.host if request.client else "unknown" 

189 

190 def _check_ip_restrictions(self, client_ip: str, ip_restrictions: list) -> bool: 

191 """Check if client IP is allowed by restrictions. 

192 

193 Args: 

194 client_ip: Client's IP address 

195 ip_restrictions: List of allowed IP addresses/CIDR ranges 

196 

197 Returns: 

198 bool: True if IP is allowed, False otherwise 

199 

200 Examples: 

201 Allow specific IP: 

202 >>> m = TokenScopingMiddleware() 

203 >>> m._check_ip_restrictions('192.168.1.10', ['192.168.1.10']) 

204 True 

205 

206 Allow CIDR range: 

207 >>> m._check_ip_restrictions('10.0.0.5', ['10.0.0.0/24']) 

208 True 

209 

210 Deny when not in list: 

211 >>> m._check_ip_restrictions('10.0.1.5', ['10.0.0.0/24']) 

212 False 

213 

214 Empty restrictions allow all: 

215 >>> m._check_ip_restrictions('203.0.113.1', []) 

216 True 

217 """ 

218 if not ip_restrictions: 

219 return True # No restrictions 

220 

221 try: 

222 client_ip_obj = ipaddress.ip_address(client_ip) 

223 

224 for restriction in ip_restrictions: 

225 try: 

226 # Check if it's a CIDR range 

227 if "/" in restriction: 

228 network = ipaddress.ip_network(restriction, strict=False) 

229 if client_ip_obj in network: 

230 return True 

231 else: 

232 # Single IP address 

233 if client_ip_obj == ipaddress.ip_address(restriction): 233 ↛ 224line 233 didn't jump to line 224 because the condition on line 233 was always true

234 return True 

235 except (ValueError, ipaddress.AddressValueError): 

236 continue 

237 

238 except (ValueError, ipaddress.AddressValueError): 

239 return False 

240 

241 return False 

242 

243 def _check_time_restrictions(self, time_restrictions: dict) -> bool: 

244 """Check if current time is allowed by restrictions. 

245 

246 Args: 

247 time_restrictions: Dict containing time-based restrictions 

248 

249 Returns: 

250 bool: True if current time is allowed, False otherwise 

251 

252 Examples: 

253 No restrictions allow access: 

254 >>> m = TokenScopingMiddleware() 

255 >>> m._check_time_restrictions({}) 

256 True 

257 

258 Weekdays only: result depends on current weekday (always bool): 

259 >>> isinstance(m._check_time_restrictions({'weekdays_only': True}), bool) 

260 True 

261 

262 Business hours only: result depends on current hour (always bool): 

263 >>> isinstance(m._check_time_restrictions({'business_hours_only': True}), bool) 

264 True 

265 """ 

266 if not time_restrictions: 

267 return True # No restrictions 

268 

269 now = datetime.now(tz=timezone.utc) 

270 

271 # Check business hours restriction 

272 if time_restrictions.get("business_hours_only"): 

273 # Assume business hours are 9 AM to 5 PM UTC 

274 # This could be made configurable 

275 if not 9 <= now.hour < 17: 

276 return False 

277 

278 # Check day of week restrictions 

279 weekdays_only = time_restrictions.get("weekdays_only") 

280 if weekdays_only and now.weekday() >= 5: # Saturday=5, Sunday=6 

281 return False 

282 

283 return True 

284 

285 def _check_server_restriction(self, request_path: str, server_id: Optional[str]) -> bool: 

286 """Check if request path matches server restriction. 

287 

288 Args: 

289 request_path: The request path/URL 

290 server_id: Required server ID (None means no restriction) 

291 

292 Returns: 

293 bool: True if request is allowed, False otherwise 

294 

295 Examples: 

296 Match server paths: 

297 >>> m = TokenScopingMiddleware() 

298 >>> m._check_server_restriction('/servers/abc/tools', 'abc') 

299 True 

300 >>> m._check_server_restriction('/sse/xyz', 'xyz') 

301 True 

302 >>> m._check_server_restriction('/ws/xyz?x=1', 'xyz') 

303 True 

304 

305 Mismatch denies: 

306 >>> m._check_server_restriction('/servers/def', 'abc') 

307 False 

308 

309 General endpoints allowed: 

310 >>> m._check_server_restriction('/health', 'abc') 

311 True 

312 >>> m._check_server_restriction('/', 'abc') 

313 True 

314 """ 

315 if not server_id: 

316 return True # No server restriction 

317 

318 # Extract server ID from path patterns (uses precompiled regex) 

319 # /servers/{server_id}/... 

320 # /sse/{server_id} 

321 # /ws/{server_id} 

322 for pattern in _SERVER_PATH_PATTERNS: 

323 match = pattern.search(request_path) 

324 if match: 

325 path_server_id = match.group(1) 

326 return path_server_id == server_id 

327 

328 # If no server ID found in path, allow general endpoints 

329 general_endpoints = ["/health", "/metrics", "/openapi.json", "/docs", "/redoc", "/rpc"] 

330 

331 # Check exact root path separately 

332 if request_path == "/": 

333 return True 

334 

335 for endpoint in general_endpoints: 

336 if request_path.startswith(endpoint): 

337 return True 

338 

339 # Default deny for unmatched paths with server restrictions 

340 return False 

341 

342 def _check_permission_restrictions(self, request_path: str, request_method: str, permissions: list) -> bool: 

343 """Check if request is allowed by permission restrictions. 

344 

345 Args: 

346 request_path: The request path/URL 

347 request_method: HTTP method (GET, POST, etc.) 

348 permissions: List of allowed permissions 

349 

350 Returns: 

351 bool: True if request is allowed, False otherwise 

352 

353 Examples: 

354 Wildcard allows all: 

355 >>> m = TokenScopingMiddleware() 

356 >>> m._check_permission_restrictions('/tools', 'GET', ['*']) 

357 True 

358 

359 Requires specific permission: 

360 >>> m._check_permission_restrictions('/tools', 'POST', ['tools.create']) 

361 True 

362 >>> m._check_permission_restrictions('/tools/xyz', 'PUT', ['tools.update']) 

363 True 

364 >>> m._check_permission_restrictions('/resources', 'GET', ['resources.read']) 

365 True 

366 >>> m._check_permission_restrictions('/servers/s1/tools/abc/call', 'POST', ['tools.execute']) 

367 True 

368 

369 Missing permission denies: 

370 >>> m._check_permission_restrictions('/tools', 'POST', ['tools.read']) 

371 False 

372 """ 

373 if not permissions or "*" in permissions: 

374 return True # No restrictions or full access 

375 

376 # Check each permission mapping (uses precompiled regex patterns) 

377 for method, path_pattern, required_permission in _PERMISSION_PATTERNS: 

378 if request_method == method and path_pattern.match(request_path): 

379 return required_permission in permissions 

380 

381 # Default allow for unmatched paths 

382 return True 

383 

384 def _check_team_membership(self, payload: dict, db=None) -> bool: 

385 """ 

386 Check if user still belongs to teams in the token. 

387 

388 For public-only tokens (no teams), always returns True. 

389 For team-scoped tokens, validates membership with caching. 

390 

391 Uses in-memory cache (per gateway instance, 60s TTL) to avoid repeated 

392 email_team_members queries for the same user+teams combination. 

393 Note: Sync path uses in-memory only for performance; Redis is not 

394 consulted to avoid async overhead in the hot path. 

395 

396 Args: 

397 payload: Decoded JWT payload containing teams 

398 db: Optional database session. If provided, caller manages lifecycle. 

399 If None, creates and manages its own session. 

400 

401 Returns: 

402 bool: True if team membership is valid, False otherwise 

403 """ 

404 teams = payload.get("teams", []) 

405 user_email = payload.get("sub") 

406 

407 # PUBLIC-ONLY TOKEN: No team validation needed 

408 if not teams or len(teams) == 0: 

409 logger.debug(f"Public-only token for user {user_email} - no team validation required") 

410 return True 

411 

412 # TEAM-SCOPED TOKEN: Validate membership 

413 if not user_email: 

414 logger.warning("Token missing user email") 

415 return False 

416 

417 # Extract team IDs from token (handles both dict and string formats) 

418 team_ids = [team["id"] if isinstance(team, dict) else team for team in teams] 

419 

420 # First-Party 

421 from mcpgateway.cache.auth_cache import get_auth_cache # pylint: disable=import-outside-toplevel 

422 

423 # Check cache first (synchronous in-memory lookup) 

424 auth_cache = get_auth_cache() 

425 cached_result = auth_cache.get_team_membership_valid_sync(user_email, team_ids) 

426 if cached_result is not None: 

427 if not cached_result: 427 ↛ 429line 427 didn't jump to line 429 because the condition on line 427 was always true

428 logger.warning(f"Token invalid (cached): User {user_email} no longer member of teams") 

429 return cached_result 

430 

431 # Cache miss - query database 

432 # Third-Party 

433 from sqlalchemy import select # pylint: disable=import-outside-toplevel 

434 

435 # First-Party 

436 from mcpgateway.db import EmailTeamMember, get_db # pylint: disable=import-outside-toplevel 

437 

438 # Track if we own the session (and thus must clean it up) 

439 owns_session = db is None 

440 if owns_session: 440 ↛ 443line 440 didn't jump to line 443 because the condition on line 440 was always true

441 db = next(get_db()) 

442 

443 try: 

444 # Single query for all teams (fixes N+1 pattern) 

445 memberships = ( 

446 db.execute( 

447 select(EmailTeamMember.team_id).where( 

448 EmailTeamMember.team_id.in_(team_ids), 

449 EmailTeamMember.user_email == user_email, 

450 EmailTeamMember.is_active.is_(True), 

451 ) 

452 ) 

453 .scalars() 

454 .all() 

455 ) 

456 

457 # Check if user is member of ALL teams in token 

458 valid_team_ids = set(memberships) 

459 missing_teams = set(team_ids) - valid_team_ids 

460 

461 if missing_teams: 

462 logger.warning(f"Token invalid: User {user_email} no longer member of teams: {missing_teams}") 

463 # Cache negative result 

464 auth_cache.set_team_membership_valid_sync(user_email, team_ids, False) 

465 return False 

466 

467 # Cache positive result 

468 auth_cache.set_team_membership_valid_sync(user_email, team_ids, True) 

469 return True 

470 finally: 

471 # Only commit/close if we created the session 

472 if owns_session: 

473 try: 

474 db.commit() # Commit read-only transaction to avoid implicit rollback 

475 finally: 

476 db.close() 

477 

478 def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None, _user_email: str = None) -> bool: # pylint: disable=too-many-return-statements 

479 """ 

480 Check if the requested resource is accessible by the token. 

481 

482 Implements Three-Tier Resource Visibility (Public/Team/Private): 

483 - PUBLIC: Accessible by all tokens (public-only and team-scoped) 

484 - TEAM: Accessible only by tokens scoped to that specific team 

485 - PRIVATE: Accessible only by tokens scoped to that specific team 

486 

487 Token Access Rules: 

488 - Public-only tokens (empty token_teams): Can access public resources + their own resources 

489 - Team-scoped tokens: Can access their team's resources + public resources 

490 

491 Handles URLs like: 

492 - /servers/{id}/mcp 

493 - /servers/{id}/sse 

494 - /servers/{id} 

495 - /tools/{id}/execute 

496 - /tools/{id} 

497 - /resources/{id} 

498 - /prompts/{id} 

499 

500 Args: 

501 request_path: The request path/URL 

502 token_teams: List of team IDs from the token (empty list = public-only token) 

503 db: Optional database session. If provided, caller manages lifecycle. 

504 If None, creates and manages its own session. 

505 

506 Returns: 

507 bool: True if resource access is allowed, False otherwise 

508 """ 

509 # Normalize token_teams: extract team IDs from dict objects (backward compatibility) 

510 token_team_ids = [] 

511 for team in token_teams: 

512 if isinstance(team, dict): 

513 token_team_ids.append(team["id"]) 

514 else: 

515 token_team_ids.append(team) 

516 

517 # Determine token type 

518 is_public_token = not token_team_ids or len(token_team_ids) == 0 

519 

520 if is_public_token: 

521 logger.debug("Processing request with PUBLIC-ONLY token") 

522 else: 

523 logger.debug(f"Processing request with TEAM-SCOPED token (teams: {token_teams})") 

524 

525 # Extract resource type and ID from path (uses precompiled regex patterns) 

526 # IDs are UUID hex strings (32 chars) or UUID with dashes (36 chars) 

527 resource_id = None 

528 resource_type = None 

529 

530 for pattern, rtype in _RESOURCE_PATTERNS: 

531 match = pattern.search(request_path) 

532 if match: 

533 resource_id = match.group(1) 

534 resource_type = rtype 

535 logger.debug(f"Extracted {rtype} ID: {resource_id} from path: {request_path}") 

536 break 

537 

538 # If no resource ID in path, allow (general endpoints like /health, /tokens, /metrics) 

539 if not resource_id or not resource_type: 

540 logger.debug(f"No resource ID found in path {request_path}, allowing access") 

541 return True 

542 

543 # Import database models 

544 # Third-Party 

545 from sqlalchemy import select # pylint: disable=import-outside-toplevel 

546 

547 # First-Party 

548 from mcpgateway.db import Gateway, get_db, Prompt, Resource, Server, Tool # pylint: disable=import-outside-toplevel 

549 

550 # Track if we own the session (and thus must clean it up) 

551 owns_session = db is None 

552 if owns_session: 

553 db = next(get_db()) 

554 

555 try: 

556 # Check Virtual Servers 

557 if resource_type == "server": 

558 server = db.execute(select(Server).where(Server.id == resource_id)).scalar_one_or_none() 

559 

560 if not server: 

561 logger.warning(f"Server {resource_id} not found in database") 

562 return True 

563 

564 # Get server visibility (default to 'team' if field doesn't exist) 

565 server_visibility = getattr(server, "visibility", "team") 

566 

567 # PUBLIC SERVERS: Accessible by everyone (including public-only tokens) 

568 if server_visibility == "public": 

569 logger.debug(f"Access granted: Server {resource_id} is PUBLIC") 

570 return True 

571 

572 # PUBLIC-ONLY TOKEN: Can ONLY access public servers (strict public-only policy) 

573 # No owner access - if user needs own resources, use a personal team-scoped token 

574 if is_public_token: 

575 logger.warning(f"Access denied: Public-only token cannot access {server_visibility} server {resource_id}") 

576 return False 

577 

578 # TEAM-SCOPED SERVERS: Check if server belongs to token's teams 

579 if server_visibility == "team": 

580 if server.team_id in token_team_ids: 

581 logger.debug(f"Access granted: Team server {resource_id} belongs to token's team {server.team_id}") 

582 return True 

583 

584 logger.warning(f"Access denied: Server {resource_id} is team-scoped to '{server.team_id}', token is scoped to teams {token_team_ids}") 

585 return False 

586 

587 # PRIVATE SERVERS: Owner-only access (per RBAC doc) 

588 if server_visibility == "private": 

589 server_owner = getattr(server, "owner_email", None) 

590 if server_owner and server_owner == _user_email: 

591 logger.debug(f"Access granted: Private server {resource_id} owned by {_user_email}") 

592 return True 

593 

594 logger.warning(f"Access denied: Server {resource_id} is private, owner is '{server_owner}', requester is '{_user_email}'") 

595 return False 

596 

597 # Unknown visibility - deny by default 

598 logger.warning(f"Access denied: Server {resource_id} has unknown visibility: {server_visibility}") 

599 return False 

600 

601 # CHECK TOOLS 

602 if resource_type == "tool": 

603 tool = db.execute(select(Tool).where(Tool.id == resource_id)).scalar_one_or_none() 

604 

605 if not tool: 

606 logger.warning(f"Tool {resource_id} not found in database") 

607 return True 

608 

609 # Get tool visibility (default to 'team' if field doesn't exist) 

610 tool_visibility = getattr(tool, "visibility", "team") 

611 

612 # PUBLIC TOOLS: Accessible by everyone (including public-only tokens) 

613 if tool_visibility == "public": 

614 logger.debug(f"Access granted: Tool {resource_id} is PUBLIC") 

615 return True 

616 

617 # PUBLIC-ONLY TOKEN: Can ONLY access public tools (strict public-only policy) 

618 # No owner access - if user needs own resources, use a personal team-scoped token 

619 if is_public_token: 

620 logger.warning(f"Access denied: Public-only token cannot access {tool_visibility} tool {resource_id}") 

621 return False 

622 

623 # TEAM TOOLS: Check if tool's team matches token's teams 

624 if tool_visibility == "team": 

625 tool_team_id = getattr(tool, "team_id", None) 

626 if tool_team_id and tool_team_id in token_team_ids: 

627 logger.debug(f"Access granted: Team tool {resource_id} belongs to token's team {tool_team_id}") 

628 return True 

629 

630 logger.warning(f"Access denied: Tool {resource_id} is team-scoped to '{tool_team_id}', token is scoped to teams {token_team_ids}") 

631 return False 

632 

633 # PRIVATE TOOLS: Owner-only access (per RBAC doc) 

634 if tool_visibility in ["private", "user"]: 

635 tool_owner = getattr(tool, "owner_email", None) 

636 if tool_owner and tool_owner == _user_email: 

637 logger.debug(f"Access granted: Private tool {resource_id} owned by {_user_email}") 

638 return True 

639 

640 logger.warning(f"Access denied: Tool {resource_id} is {tool_visibility}, owner is '{tool_owner}', requester is '{_user_email}'") 

641 return False 

642 

643 # Unknown visibility - deny by default 

644 logger.warning(f"Access denied: Tool {resource_id} has unknown visibility: {tool_visibility}") 

645 return False 

646 

647 # CHECK RESOURCES 

648 if resource_type == "resource": 

649 resource = db.execute(select(Resource).where(Resource.id == resource_id)).scalar_one_or_none() 

650 

651 if not resource: 

652 logger.warning(f"Resource {resource_id} not found in database") 

653 return True 

654 

655 # Get resource visibility (default to 'team' if field doesn't exist) 

656 resource_visibility = getattr(resource, "visibility", "team") 

657 

658 # PUBLIC RESOURCES: Accessible by everyone (including public-only tokens) 

659 if resource_visibility == "public": 

660 logger.debug(f"Access granted: Resource {resource_id} is PUBLIC") 

661 return True 

662 

663 # PUBLIC-ONLY TOKEN: Can ONLY access public resources (strict public-only policy) 

664 # No owner access - if user needs own resources, use a personal team-scoped token 

665 if is_public_token: 

666 logger.warning(f"Access denied: Public-only token cannot access {resource_visibility} resource {resource_id}") 

667 return False 

668 

669 # TEAM RESOURCES: Check if resource's team matches token's teams 

670 if resource_visibility == "team": 

671 resource_team_id = getattr(resource, "team_id", None) 

672 if resource_team_id and resource_team_id in token_team_ids: 

673 logger.debug(f"Access granted: Team resource {resource_id} belongs to token's team {resource_team_id}") 

674 return True 

675 

676 logger.warning(f"Access denied: Resource {resource_id} is team-scoped to '{resource_team_id}', token is scoped to teams {token_team_ids}") 

677 return False 

678 

679 # PRIVATE RESOURCES: Owner-only access (per RBAC doc) 

680 if resource_visibility in ["private", "user"]: 

681 resource_owner = getattr(resource, "owner_email", None) 

682 if resource_owner and resource_owner == _user_email: 

683 logger.debug(f"Access granted: Private resource {resource_id} owned by {_user_email}") 

684 return True 

685 

686 logger.warning(f"Access denied: Resource {resource_id} is {resource_visibility}, owner is '{resource_owner}', requester is '{_user_email}'") 

687 return False 

688 

689 # Unknown visibility - deny by default 

690 logger.warning(f"Access denied: Resource {resource_id} has unknown visibility: {resource_visibility}") 

691 return False 

692 

693 # CHECK PROMPTS 

694 if resource_type == "prompt": 

695 prompt = db.execute(select(Prompt).where(Prompt.id == resource_id)).scalar_one_or_none() 

696 

697 if not prompt: 

698 logger.warning(f"Prompt {resource_id} not found in database") 

699 return True 

700 

701 # Get prompt visibility (default to 'team' if field doesn't exist) 

702 prompt_visibility = getattr(prompt, "visibility", "team") 

703 

704 # PUBLIC PROMPTS: Accessible by everyone (including public-only tokens) 

705 if prompt_visibility == "public": 

706 logger.debug(f"Access granted: Prompt {resource_id} is PUBLIC") 

707 return True 

708 

709 # PUBLIC-ONLY TOKEN: Can ONLY access public prompts (strict public-only policy) 

710 # No owner access - if user needs own resources, use a personal team-scoped token 

711 if is_public_token: 

712 logger.warning(f"Access denied: Public-only token cannot access {prompt_visibility} prompt {resource_id}") 

713 return False 

714 

715 # TEAM PROMPTS: Check if prompt's team matches token's teams 

716 if prompt_visibility == "team": 

717 prompt_team_id = getattr(prompt, "team_id", None) 

718 if prompt_team_id and prompt_team_id in token_team_ids: 

719 logger.debug(f"Access granted: Team prompt {resource_id} belongs to token's team {prompt_team_id}") 

720 return True 

721 

722 logger.warning(f"Access denied: Prompt {resource_id} is team-scoped to '{prompt_team_id}', token is scoped to teams {token_team_ids}") 

723 return False 

724 

725 # PRIVATE PROMPTS: Owner-only access (per RBAC doc) 

726 if prompt_visibility in ["private", "user"]: 

727 prompt_owner = getattr(prompt, "owner_email", None) 

728 if prompt_owner and prompt_owner == _user_email: 

729 logger.debug(f"Access granted: Private prompt {resource_id} owned by {_user_email}") 

730 return True 

731 

732 logger.warning(f"Access denied: Prompt {resource_id} is {prompt_visibility}, owner is '{prompt_owner}', requester is '{_user_email}'") 

733 return False 

734 

735 # Unknown visibility - deny by default 

736 logger.warning(f"Access denied: Prompt {resource_id} has unknown visibility: {prompt_visibility}") 

737 return False 

738 

739 # CHECK GATEWAYS 

740 if resource_type == "gateway": 

741 gateway = db.execute(select(Gateway).where(Gateway.id == resource_id)).scalar_one_or_none() 

742 

743 if not gateway: 

744 logger.warning(f"Gateway {resource_id} not found in database") 

745 return True 

746 

747 # Get gateway visibility (default to 'team' if field doesn't exist) 

748 gateway_visibility = getattr(gateway, "visibility", "team") 

749 

750 # PUBLIC GATEWAYS: Accessible by everyone (including public-only tokens) 

751 if gateway_visibility == "public": 

752 logger.debug(f"Access granted: Gateway {resource_id} is PUBLIC") 

753 return True 

754 

755 # PUBLIC-ONLY TOKEN: Can ONLY access public gateways (strict public-only policy) 

756 # No owner access - if user needs own resources, use a personal team-scoped token 

757 if is_public_token: 

758 logger.warning(f"Access denied: Public-only token cannot access {gateway_visibility} gateway {resource_id}") 

759 return False 

760 

761 # TEAM GATEWAYS: Check if gateway's team matches token's teams 

762 if gateway_visibility == "team": 

763 gateway_team_id = getattr(gateway, "team_id", None) 

764 if gateway_team_id and gateway_team_id in token_team_ids: 

765 logger.debug(f"Access granted: Team gateway {resource_id} belongs to token's team {gateway_team_id}") 

766 return True 

767 

768 logger.warning(f"Access denied: Gateway {resource_id} is team-scoped to '{gateway_team_id}', token is scoped to teams {token_team_ids}") 

769 return False 

770 

771 # PRIVATE GATEWAYS: Owner-only access (per RBAC doc) 

772 if gateway_visibility in ["private", "user"]: 

773 gateway_owner = getattr(gateway, "owner_email", None) 

774 if gateway_owner and gateway_owner == _user_email: 

775 logger.debug(f"Access granted: Private gateway {resource_id} owned by {_user_email}") 

776 return True 

777 

778 logger.warning(f"Access denied: Gateway {resource_id} is {gateway_visibility}, owner is '{gateway_owner}', requester is '{_user_email}'") 

779 return False 

780 

781 # Unknown visibility - deny by default 

782 logger.warning(f"Access denied: Gateway {resource_id} has unknown visibility: {gateway_visibility}") 

783 return False 

784 

785 # UNKNOWN RESOURCE TYPE 

786 logger.warning(f"Unknown resource type '{resource_type}' for path: {request_path}") 

787 return False 

788 

789 except Exception as e: 

790 logger.error(f"Error checking resource team ownership for {request_path}: {e}", exc_info=True) 

791 # Fail securely - deny access on error 

792 return False 

793 finally: 

794 # Only commit/close if we created the session 

795 if owns_session: 

796 try: 

797 db.commit() # Commit read-only transaction to avoid implicit rollback 

798 finally: 

799 db.close() 

800 

801 async def __call__(self, request: Request, call_next): 

802 """Middleware function to check token scoping including team-level validation. 

803 

804 Args: 

805 request: FastAPI request object 

806 call_next: Next middleware/handler in chain 

807 

808 Returns: 

809 Response from next handler or HTTPException 

810 

811 Raises: 

812 HTTPException: If token scoping restrictions are violated 

813 """ 

814 try: 

815 # Skip if already scoped (prevents double-scoping for /mcp requests) 

816 # MCPPathRewriteMiddleware runs scoping via dispatch, then routes through 

817 # middleware stack which hits BaseHTTPMiddleware's scoping again. 

818 # Use request.state flag which persists across middleware invocations. 

819 if getattr(request.state, "_token_scoping_done", False): 

820 return await call_next(request) 

821 

822 # Mark as scoped before doing any work 

823 request.state._token_scoping_done = True 

824 

825 # Skip scoping for certain paths (truly public endpoints only) 

826 skip_paths = [ 

827 "/health", 

828 "/metrics", 

829 "/openapi.json", 

830 "/docs", 

831 "/redoc", 

832 "/auth/email/login", 

833 "/auth/email/register", 

834 "/.well-known/", 

835 ] 

836 

837 # Check exact root path separately 

838 if request.url.path == "/": 

839 return await call_next(request) 

840 

841 if any(request.url.path.startswith(path) for path in skip_paths): 

842 return await call_next(request) 

843 

844 # Skip server-specific well-known endpoints (RFC 9728) 

845 if re.match(r"^/servers/[^/]+/\.well-known/", request.url.path): 

846 return await call_next(request) 

847 

848 # Extract full token payload (not just scopes) 

849 payload = await self._extract_token_scopes(request) 

850 

851 # If no payload, continue (regular auth will handle this) 

852 if not payload: 

853 return await call_next(request) 

854 

855 # TEAM VALIDATION: Use single DB session for both team checks 

856 # This reduces connection pool overhead from 2 sessions to 1 for resource endpoints 

857 user_email = payload.get("sub") or payload.get("email") # Extract user email for ownership check 

858 

859 # Resolve teams based on token_use claim 

860 token_use = payload.get("token_use") 

861 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type 

862 # Session token: resolve teams from DB/cache directly 

863 # Cannot rely on request.state.token_teams — AuthContextMiddleware 

864 # is gated by security_logging_enabled (defaults to False) 

865 # First-Party 

866 from mcpgateway.auth import _resolve_teams_from_db # pylint: disable=import-outside-toplevel 

867 

868 is_admin = payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False) 

869 user_info = {"is_admin": is_admin} 

870 token_teams = await _resolve_teams_from_db(user_email, user_info) 

871 else: 

872 # API token or legacy: use embedded teams with normalize_token_teams 

873 token_teams = normalize_token_teams(payload) 

874 

875 # Check if admin bypass is active (token_teams is None means admin with explicit null teams) 

876 is_admin_bypass = token_teams is None 

877 

878 # Admin with explicit null teams bypasses team validation entirely 

879 if is_admin_bypass: 

880 logger.debug(f"Admin bypass: skipping team validation for {user_email}") 

881 # Skip to other checks (server_id, IP, etc.) 

882 elif token_teams: 

883 # First-Party 

884 from mcpgateway.db import get_db # pylint: disable=import-outside-toplevel 

885 

886 db = next(get_db()) 

887 try: 

888 # Check team membership with shared session 

889 if not self._check_team_membership(payload, db=db): 

890 logger.warning("Token rejected: User no longer member of associated team(s)") 

891 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team") 

892 

893 # Check resource team ownership with shared session 

894 if not self._check_resource_team_ownership(request.url.path, token_teams, db=db, _user_email=user_email): 

895 logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}") 

896 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You do not have permission to access this resource using the current token") 

897 finally: 

898 # Ensure session cleanup even if checks raise exceptions 

899 try: 

900 db.commit() 

901 finally: 

902 db.close() 

903 else: 

904 # Public-only token: no team membership check needed, but still check resource ownership 

905 if not self._check_team_membership(payload): 

906 logger.warning("Token rejected: User no longer member of associated team(s)") 

907 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team") 

908 

909 if not self._check_resource_team_ownership(request.url.path, token_teams, _user_email=user_email): 

910 logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}") 

911 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You do not have permission to access this resource using the current token") 

912 

913 # Extract scopes from payload 

914 scopes = payload.get("scopes", {}) 

915 

916 # Check server ID restriction 

917 server_id = scopes.get("server_id") 

918 if not self._check_server_restriction(request.url.path, server_id): 

919 logger.warning(f"Token not authorized for this server. Required: {server_id}") 

920 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Token not authorized for this server. Required: {server_id}") 

921 

922 # Check IP restrictions 

923 ip_restrictions = scopes.get("ip_restrictions", []) 

924 if ip_restrictions: 

925 client_ip = self._get_client_ip(request) 

926 if not self._check_ip_restrictions(client_ip, ip_restrictions): 926 ↛ 931line 926 didn't jump to line 931 because the condition on line 926 was always true

927 logger.warning(f"Request from IP {client_ip} not allowed by token restrictions") 

928 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Request from IP {client_ip} not allowed by token restrictions") 

929 

930 # Check time restrictions 

931 time_restrictions = scopes.get("time_restrictions", {}) 

932 if not self._check_time_restrictions(time_restrictions): 

933 logger.warning("Request not allowed at this time by token restrictions") 

934 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Request not allowed at this time by token restrictions") 

935 

936 # Check permission restrictions 

937 permissions = scopes.get("permissions", []) 

938 if not self._check_permission_restrictions(request.url.path, request.method, permissions): 

939 logger.warning("Insufficient permissions for this operation") 

940 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions for this operation") 

941 

942 # All scoping checks passed, continue 

943 return await call_next(request) 

944 

945 except HTTPException as exc: 

946 # Return clean JSON response instead of traceback 

947 return ORJSONResponse( 

948 status_code=exc.status_code, 

949 content={"detail": exc.detail}, 

950 ) 

951 

952 

953# Create middleware instance 

954token_scoping_middleware = TokenScopingMiddleware()