Coverage for mcpgateway / middleware / token_scoping.py: 99%

524 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/token_scoping.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Token Scoping Middleware. 

8This middleware enforces token scoping restrictions at the API level, 

9including server_id restrictions, IP restrictions, permission checks, 

10and time-based restrictions. 

11""" 

12 

13# Standard 

14from datetime import datetime, timedelta, timezone 

15from functools import lru_cache 

16import hashlib 

17import hmac 

18import ipaddress 

19import re 

20from typing import List, Optional, Pattern, Tuple 

21 

22# Third-Party 

23from fastapi import HTTPException, Request, status 

24from fastapi.security import HTTPBearer 

25from sqlalchemy import and_, func, select 

26 

27# First-Party 

28from mcpgateway.auth import normalize_token_teams, resolve_session_teams 

29from mcpgateway.common.validators import SecurityValidator 

30from mcpgateway.config import settings 

31from mcpgateway.db import Permissions 

32from mcpgateway.middleware.rbac import _ACCESS_DENIED_MSG 

33from mcpgateway.services.logging_service import LoggingService 

34from mcpgateway.utils.orjson_response import ORJSONResponse 

35from mcpgateway.utils.verify_credentials import verify_jwt_token_cached 

36 

37# Security scheme 

38bearer_scheme = HTTPBearer(auto_error=False) 

39 

40# Initialize logging service first 

41logging_service = LoggingService() 

42logger = logging_service.get_logger(__name__) 

43 

44# ============================================================================ 

45# Precompiled regex patterns (compiled once at module load for performance) 

46# ============================================================================ 

47 

48# Server path extraction patterns 

49_SERVER_PATH_PATTERNS: List[Pattern[str]] = [ 

50 re.compile(r"^/servers/([^/]+)(?:$|/)"), 

51 re.compile(r"^/sse/([^/?]+)(?:$|\?)"), 

52 re.compile(r"^/ws/([^/?]+)(?:$|\?)"), 

53] 

54 

55# Resource ID extraction patterns (IDs are UUID hex strings) 

56_RESOURCE_PATTERNS: List[Tuple[Pattern[str], str]] = [ 

57 (re.compile(r"/servers/?([a-f0-9\-]+)"), "server"), 

58 (re.compile(r"/tools/?([a-f0-9\-]+)"), "tool"), 

59 (re.compile(r"/resources/?([a-f0-9\-]+)"), "resource"), 

60 (re.compile(r"/prompts/?([a-f0-9\-]+)"), "prompt"), 

61 (re.compile(r"/gateways/?([a-f0-9\-]+)"), "gateway"), 

62] 

63_AUTH_COOKIE_NAMES = ("jwt_token", "access_token") 

64_INTERNAL_MCP_PATH_PREFIX = "/_internal/mcp" 

65_INTERNAL_MCP_RUNTIME_HEADER = "x-contextforge-mcp-runtime" 

66_INTERNAL_MCP_AUTH_CONTEXT_HEADER = "x-contextforge-auth-context" 

67_INTERNAL_MCP_RUNTIME_AUTH_HEADER = "x-contextforge-mcp-runtime-auth" 

68_INTERNAL_MCP_RUNTIME_AUTH_CONTEXT = "contextforge-internal-mcp-runtime-v1" 

69 

70# Permission map with precompiled patterns 

71# Maps (HTTP method, path pattern) to required permission 

72_PERMISSION_PATTERNS: List[Tuple[str, Pattern[str], str]] = [ 

73 # Tools permissions 

74 ("GET", re.compile(r"^/tools(?:$|/)"), Permissions.TOOLS_READ), 

75 ("POST", re.compile(r"^/tools/?$"), Permissions.TOOLS_CREATE), # Only exact /tools or /tools/ 

76 ("POST", re.compile(r"^/tools/[^/]+/"), Permissions.TOOLS_UPDATE), # POST to sub-resources (state, toggle) 

77 ("PUT", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_UPDATE), 

78 ("DELETE", re.compile(r"^/tools/[^/]+(?:$|/)"), Permissions.TOOLS_DELETE), 

79 ("GET", re.compile(r"^/servers/[^/]+/tools(?:$|/)"), Permissions.TOOLS_READ), 

80 ("POST", re.compile(r"^/servers/[^/]+/tools/[^/]+/call(?:$|/)"), Permissions.TOOLS_EXECUTE), 

81 # JSON-RPC endpoint — multiplexes tools/call, resources/list, initialize, etc. 

82 # Fine-grained per-method RBAC is enforced downstream by _ensure_rpc_permission(); 

83 # the middleware only gates transport-level access via servers.use. 

84 ("POST", re.compile(r"^/rpc(?:$|/)"), Permissions.SERVERS_USE), 

85 # SSE transport — like /rpc and /mcp, this is a transport-level endpoint; 

86 # the handler's own @require_permission enforces fine-grained RBAC. 

87 ("GET", re.compile(r"^/sse(?:$|/)"), Permissions.SERVERS_USE), 

88 # Streamable HTTP MCP transport (POST=send, GET=SSE stream, DELETE=session termination) 

89 ("POST", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE), 

90 ("GET", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE), 

91 ("DELETE", re.compile(r"^/mcp(?:$|/)"), Permissions.SERVERS_USE), 

92 # Resources permissions 

93 ("GET", re.compile(r"^/resources(?:$|/)"), Permissions.RESOURCES_READ), 

94 ("POST", re.compile(r"^/resources/?$"), Permissions.RESOURCES_CREATE), # Only exact /resources or /resources/ 

95 ("POST", re.compile(r"^/resources/subscribe(?:$|/)"), Permissions.RESOURCES_READ), # SSE subscription 

96 ("POST", re.compile(r"^/resources/[^/]+/"), Permissions.RESOURCES_UPDATE), # POST to sub-resources (state, toggle) 

97 ("PUT", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_UPDATE), 

98 ("DELETE", re.compile(r"^/resources/[^/]+(?:$|/)"), Permissions.RESOURCES_DELETE), 

99 ("GET", re.compile(r"^/servers/[^/]+/resources(?:$|/)"), Permissions.RESOURCES_READ), 

100 # Prompts permissions 

101 ("GET", re.compile(r"^/prompts(?:$|/)"), Permissions.PROMPTS_READ), 

102 ("POST", re.compile(r"^/prompts/?$"), Permissions.PROMPTS_CREATE), # Only exact /prompts or /prompts/ 

103 ("POST", re.compile(r"^/prompts/[^/]+/"), Permissions.PROMPTS_UPDATE), # POST to sub-resources (state, toggle) 

104 ("POST", re.compile(r"^/prompts/[^/]+$"), Permissions.PROMPTS_READ), # MCP spec prompt retrieval (POST /prompts/{id}) 

105 ("PUT", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_UPDATE), 

106 ("DELETE", re.compile(r"^/prompts/[^/]+(?:$|/)"), Permissions.PROMPTS_DELETE), 

107 # Server management permissions 

108 ("GET", re.compile(r"^/servers/[^/]+/sse(?:$|/)"), Permissions.SERVERS_USE), # Server SSE access endpoint 

109 ("GET", re.compile(r"^/servers(?:$|/)"), Permissions.SERVERS_READ), 

110 ("POST", re.compile(r"^/servers/?$"), Permissions.SERVERS_CREATE), # Only exact /servers or /servers/ 

111 ("POST", re.compile(r"^/servers/[^/]+/(?:state|toggle)(?:$|/)"), Permissions.SERVERS_UPDATE), # Server management sub-resources 

112 ("POST", re.compile(r"^/servers/[^/]+/message(?:$|/)"), Permissions.SERVERS_USE), # Server message access endpoint 

113 ("POST", re.compile(r"^/servers/[^/]+/mcp(?:$|/)"), Permissions.SERVERS_USE), # Server MCP access endpoint 

114 ("PUT", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_UPDATE), 

115 ("DELETE", re.compile(r"^/servers/[^/]+(?:$|/)"), Permissions.SERVERS_DELETE), 

116 # Gateway permissions 

117 ("GET", re.compile(r"^/gateways(?:$|/)"), Permissions.GATEWAYS_READ), 

118 ("POST", re.compile(r"^/gateways/?$"), Permissions.GATEWAYS_CREATE), # Only exact /gateways or /gateways/ 

119 ("POST", re.compile(r"^/gateways/[^/]+/"), Permissions.GATEWAYS_UPDATE), # POST to sub-resources (state, toggle, refresh) 

120 ("PUT", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_UPDATE), 

121 ("DELETE", re.compile(r"^/gateways/[^/]+(?:$|/)"), Permissions.GATEWAYS_DELETE), 

122 # Metrics permissions 

123 ("GET", re.compile(r"^/metrics(?:$|/)"), Permissions.ADMIN_METRICS), 

124 ("POST", re.compile(r"^/metrics/reset(?:$|/)"), Permissions.ADMIN_METRICS), 

125 # Token permissions 

126 ("GET", re.compile(r"^/tokens(?:$|/)"), Permissions.TOKENS_READ), 

127 ("POST", re.compile(r"^/tokens/?$"), Permissions.TOKENS_CREATE), # Only exact /tokens or /tokens/ 

128 ("POST", re.compile(r"^/tokens/teams/[^/]+(?:$|/)"), Permissions.TOKENS_CREATE), 

129 ("PUT", re.compile(r"^/tokens/[^/]+(?:$|/)"), Permissions.TOKENS_UPDATE), 

130 ("DELETE", re.compile(r"^/tokens/[^/]+(?:$|/)"), Permissions.TOKENS_REVOKE), 

131] 

132 

133# Admin route permission map (granular by route group). 

134# IMPORTANT: Unmatched /admin/* paths are denied by default (fail-secure). 

135_ADMIN_PERMISSION_PATTERNS: List[Tuple[str, Pattern[str], str]] = [ 

136 # Dashboard/overview surfaces 

137 ("GET", re.compile(r"^/admin/?$"), Permissions.ADMIN_DASHBOARD), 

138 ("GET", re.compile(r"^/admin/search(?:$|/)"), Permissions.ADMIN_DASHBOARD), 

139 ("GET", re.compile(r"^/admin/overview(?:$|/)"), Permissions.ADMIN_OVERVIEW), 

140 # User management 

141 ("GET", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT), 

142 ("POST", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT), 

143 ("DELETE", re.compile(r"^/admin/users(?:$|/)"), Permissions.ADMIN_USER_MANAGEMENT), 

144 # Team management 

145 ("POST", re.compile(r"^/admin/teams/?$"), Permissions.TEAMS_CREATE), 

146 ("DELETE", re.compile(r"^/admin/teams/[^/]+/join-request/[^/]+(?:$|/)"), Permissions.TEAMS_JOIN), 

147 ("DELETE", re.compile(r"^/admin/teams/[^/]+(?:$|/)"), Permissions.TEAMS_DELETE), 

148 ("GET", re.compile(r"^/admin/teams/[^/]+/edit(?:$|/)"), Permissions.TEAMS_UPDATE), 

149 ("POST", re.compile(r"^/admin/teams/[^/]+/update(?:$|/)"), Permissions.TEAMS_UPDATE), 

150 ("GET", re.compile(r"^/admin/teams/[^/]+/(?:members/add|members/partial|non-members/partial|join-requests)(?:$|/)"), Permissions.TEAMS_MANAGE_MEMBERS), 

151 ("POST", re.compile(r"^/admin/teams/[^/]+/(?:add-member|update-member-role|remove-member|join-requests/[^/]+/(?:approve|reject))(?:$|/)"), Permissions.TEAMS_MANAGE_MEMBERS), 

152 ("POST", re.compile(r"^/admin/teams/[^/]+/(?:leave|join-request(?:/[^/]+)?)(?:$|/)"), Permissions.TEAMS_JOIN), 

153 ("GET", re.compile(r"^/admin/teams(?:$|/)"), Permissions.TEAMS_READ), 

154 # Tool management 

155 ("POST", re.compile(r"^/admin/tools/?$"), Permissions.TOOLS_CREATE), 

156 ("POST", re.compile(r"^/admin/tools/import(?:$|/)"), Permissions.TOOLS_CREATE), 

157 ("POST", re.compile(r"^/admin/tools/[^/]+/delete(?:$|/)"), Permissions.TOOLS_DELETE), 

158 ("POST", re.compile(r"^/admin/tools/[^/]+/(?:edit|state)(?:$|/)"), Permissions.TOOLS_UPDATE), 

159 ("GET", re.compile(r"^/admin/tools(?:$|/)"), Permissions.TOOLS_READ), 

160 # Resource management 

161 ("POST", re.compile(r"^/admin/resources/?$"), Permissions.RESOURCES_CREATE), 

162 ("POST", re.compile(r"^/admin/resources/[^/]+/delete(?:$|/)"), Permissions.RESOURCES_DELETE), 

163 ("POST", re.compile(r"^/admin/resources/[^/]+/(?:edit|state)(?:$|/)"), Permissions.RESOURCES_UPDATE), 

164 ("GET", re.compile(r"^/admin/resources(?:$|/)"), Permissions.RESOURCES_READ), 

165 # Prompt management 

166 ("POST", re.compile(r"^/admin/prompts/?$"), Permissions.PROMPTS_CREATE), 

167 ("POST", re.compile(r"^/admin/prompts/[^/]+/delete(?:$|/)"), Permissions.PROMPTS_DELETE), 

168 ("POST", re.compile(r"^/admin/prompts/[^/]+/(?:edit|state)(?:$|/)"), Permissions.PROMPTS_UPDATE), 

169 ("GET", re.compile(r"^/admin/prompts(?:$|/)"), Permissions.PROMPTS_READ), 

170 # Gateway management 

171 ("POST", re.compile(r"^/admin/gateways/test(?:$|/)"), Permissions.GATEWAYS_READ), 

172 ("POST", re.compile(r"^/admin/gateways/?$"), Permissions.GATEWAYS_CREATE), 

173 ("POST", re.compile(r"^/admin/gateways/[^/]+/delete(?:$|/)"), Permissions.GATEWAYS_DELETE), 

174 ("POST", re.compile(r"^/admin/gateways/[^/]+/(?:edit|state)(?:$|/)"), Permissions.GATEWAYS_UPDATE), 

175 ("GET", re.compile(r"^/admin/gateways(?:$|/)"), Permissions.GATEWAYS_READ), 

176 # Server management 

177 ("POST", re.compile(r"^/admin/servers/?$"), Permissions.SERVERS_CREATE), 

178 ("POST", re.compile(r"^/admin/servers/[^/]+/delete(?:$|/)"), Permissions.SERVERS_DELETE), 

179 ("POST", re.compile(r"^/admin/servers/[^/]+/(?:edit|state)(?:$|/)"), Permissions.SERVERS_UPDATE), 

180 ("GET", re.compile(r"^/admin/servers(?:$|/)"), Permissions.SERVERS_READ), 

181 # Token/tag read surfaces 

182 ("GET", re.compile(r"^/admin/tokens(?:$|/)"), Permissions.TOKENS_READ), 

183 ("GET", re.compile(r"^/admin/tags(?:$|/)"), Permissions.TAGS_READ), 

184 # A2A management 

185 ("POST", re.compile(r"^/admin/a2a/?$"), Permissions.A2A_CREATE), 

186 ("POST", re.compile(r"^/admin/a2a/[^/]+/delete(?:$|/)"), Permissions.A2A_DELETE), 

187 ("POST", re.compile(r"^/admin/a2a/[^/]+/(?:edit|state)(?:$|/)"), Permissions.A2A_UPDATE), 

188 ("POST", re.compile(r"^/admin/a2a/[^/]+/test(?:$|/)"), Permissions.A2A_INVOKE), 

189 ("GET", re.compile(r"^/admin/a2a(?:$|/)"), Permissions.A2A_READ), 

190 # Section partials 

191 ("GET", re.compile(r"^/admin/sections/resources(?:$|/)"), Permissions.RESOURCES_READ), 

192 ("GET", re.compile(r"^/admin/sections/prompts(?:$|/)"), Permissions.PROMPTS_READ), 

193 ("GET", re.compile(r"^/admin/sections/servers(?:$|/)"), Permissions.SERVERS_READ), 

194 ("GET", re.compile(r"^/admin/sections/gateways(?:$|/)"), Permissions.GATEWAYS_READ), 

195 # Specialized admin domains 

196 ("GET", re.compile(r"^/admin/events(?:$|/)"), Permissions.ADMIN_EVENTS), 

197 ("GET", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC), 

198 ("POST", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC), 

199 ("PUT", re.compile(r"^/admin/grpc(?:$|/)"), Permissions.ADMIN_GRPC), 

200 ("GET", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS), 

201 ("POST", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS), 

202 ("PUT", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS), 

203 ("DELETE", re.compile(r"^/admin/plugins(?:$|/)"), Permissions.ADMIN_PLUGINS), 

204 # System configuration/admin operations 

205 ( 

206 "GET", 

207 re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"), 

208 Permissions.ADMIN_SYSTEM_CONFIG, 

209 ), 

210 ( 

211 "POST", 

212 re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"), 

213 Permissions.ADMIN_SYSTEM_CONFIG, 

214 ), 

215 ( 

216 "PUT", 

217 re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"), 

218 Permissions.ADMIN_SYSTEM_CONFIG, 

219 ), 

220 ( 

221 "DELETE", 

222 re.compile(r"^/admin/(?:config|cache|mcp-pool|roots|metrics|logs|export|import|mcp-registry|system|support-bundle|maintenance|observability|performance|llm)(?:$|/)"), 

223 Permissions.ADMIN_SYSTEM_CONFIG, 

224 ), 

225] 

226 

227 

228def _normalize_llm_api_prefix(prefix: Optional[str]) -> str: 

229 """Normalize llm_api_prefix to a canonical path prefix. 

230 

231 Args: 

232 prefix: Raw LLM API prefix setting value. 

233 

234 Returns: 

235 str: Normalized path prefix, or empty string when prefix is empty or "/". 

236 """ 

237 if not prefix: 

238 return "" 

239 normalized = "/" + str(prefix).strip().strip("/") 

240 return "" if normalized == "/" else normalized 

241 

242 

243def _normalize_scope_path(scope_path: str, root_path: str) -> str: 

244 """Strip ``root_path`` from ``scope_path`` when the incoming path includes it. 

245 

246 Args: 

247 scope_path: Request path observed by middleware. 

248 root_path: Application root path prefix, if configured. 

249 

250 Returns: 

251 Path value normalized for permission and scope pattern matching. 

252 """ 

253 if root_path and len(root_path) > 1: 

254 root_path = root_path.rstrip("/") 

255 if root_path and len(root_path) > 1 and scope_path.startswith(root_path): 

256 rest = scope_path[len(root_path) :] 

257 # root_path="/app" must not strip from "/application/..." 

258 if rest == "" or rest.startswith("/"): 

259 return rest or "/" 

260 return scope_path 

261 

262 

263@lru_cache(maxsize=16) 

264def _get_llm_permission_patterns(prefix: str) -> Tuple[Tuple[str, Pattern[str], str], ...]: 

265 """Build precompiled permission patterns for LLM proxy endpoints. 

266 

267 Args: 

268 prefix: LLM API prefix used to mount proxy routes. 

269 

270 Returns: 

271 Tuple[Tuple[str, Pattern[str], str], ...]: Method/path regex to required permission mappings. 

272 """ 

273 normalized_prefix = _normalize_llm_api_prefix(prefix) 

274 escaped_prefix = re.escape(normalized_prefix) 

275 return ( 

276 # LLM proxy routes are exact endpoints (optionally with a trailing slash), 

277 # unlike many REST resources that intentionally include sub-resources. 

278 ("POST", re.compile(rf"^{escaped_prefix}/chat/completions/?$"), Permissions.LLM_INVOKE), 

279 ("GET", re.compile(rf"^{escaped_prefix}/models/?$"), Permissions.LLM_READ), 

280 ) 

281 

282 

283class TokenScopingMiddleware: 

284 """Middleware to enforce token scoping restrictions. 

285 

286 Examples: 

287 >>> middleware = TokenScopingMiddleware() 

288 >>> isinstance(middleware, TokenScopingMiddleware) 

289 True 

290 """ 

291 

292 def __init__(self): 

293 """Initialize token scoping middleware. 

294 

295 Examples: 

296 >>> middleware = TokenScopingMiddleware() 

297 >>> hasattr(middleware, '_extract_token_scopes') 

298 True 

299 """ 

300 

301 def _normalize_teams(self, teams) -> list: 

302 """Normalize teams from token payload to list of team IDs. 

303 

304 Handles various team formats: 

305 - None -> [] 

306 - List of strings -> as-is 

307 - List of dicts with 'id' key -> extract IDs 

308 

309 Args: 

310 teams: Raw teams value from JWT payload 

311 

312 Returns: 

313 List of team ID strings 

314 """ 

315 if not teams: 

316 return [] 

317 normalized = [] 

318 for team in teams: 

319 if isinstance(team, dict): 

320 team_id = team.get("id") 

321 if team_id: 

322 normalized.append(team_id) 

323 elif isinstance(team, str): 

324 normalized.append(team) 

325 return normalized 

326 

327 def _normalize_path_for_matching(self, request_path: str) -> str: 

328 """Normalize a path for team scoping and permission matching. 

329 

330 Args: 

331 request_path: Raw request path. 

332 

333 Returns: 

334 Normalized absolute path suitable for route matching. 

335 """ 

336 normalized = _normalize_scope_path(request_path or "/", settings.app_root_path or "") 

337 if not normalized.startswith("/"): 

338 return f"/{normalized}" 

339 return normalized 

340 

341 def _get_normalized_request_path(self, request: Request) -> str: 

342 """Resolve request path with APP_ROOT_PATH-aware normalization. 

343 

344 Args: 

345 request: Request object containing scope and URL data. 

346 

347 Returns: 

348 Normalized request path suitable for permission checks. 

349 """ 

350 scope = getattr(request, "scope", {}) or {} 

351 if not isinstance(scope, dict): 

352 scope = {} 

353 scope_path = request.url.path or scope.get("path") or "/" 

354 root_path = scope.get("root_path") or settings.app_root_path or "" 

355 normalized = _normalize_scope_path(scope_path, root_path) 

356 if not normalized.startswith("/"): 

357 return f"/{normalized}" 

358 return normalized 

359 

360 def _extract_jwt_token_from_request(self, request: Request) -> Optional[str]: 

361 """Extract JWT token from supported cookie names or Bearer auth header. 

362 

363 Args: 

364 request: Request object carrying cookies and headers. 

365 

366 Returns: 

367 JWT string when present and validly formatted; otherwise ``None``. 

368 """ 

369 cookies = getattr(request, "cookies", None) 

370 if cookies and hasattr(cookies, "get"): 

371 for cookie_name in _AUTH_COOKIE_NAMES: 

372 cookie_token = cookies.get(cookie_name) 

373 if isinstance(cookie_token, str) and cookie_token.strip(): 

374 return cookie_token.strip() 

375 

376 # Get authorization header and parse bearer scheme case-insensitively. 

377 auth_header = request.headers.get("Authorization") 

378 if not auth_header: 

379 return None 

380 

381 parts = auth_header.split(" ", 1) 

382 if len(parts) != 2 or parts[0].lower() != "bearer": 

383 return None 

384 

385 token = parts[1].strip() 

386 return token or None 

387 

388 async def _extract_token_scopes(self, request: Request) -> Optional[dict]: 

389 """Extract token scopes from JWT in request. 

390 

391 Args: 

392 request: FastAPI request object 

393 

394 Returns: 

395 Dict containing token scopes or None if no valid token 

396 """ 

397 token = self._extract_jwt_token_from_request(request) 

398 if not token: 

399 return None 

400 

401 try: 

402 # Use the centralized verify_jwt_token_cached function for consistent JWT validation 

403 payload = await verify_jwt_token_cached(token, request) 

404 return payload 

405 except HTTPException: 

406 # Token validation failed (expired, invalid, etc.) 

407 return None 

408 except Exception: 

409 # Any other error in token validation 

410 return None 

411 

412 def _get_client_ip(self, request: Request) -> str: 

413 """Extract client IP address from request. 

414 

415 Only trusts X-Forwarded-For / X-Real-IP headers when a trusted proxy 

416 configuration is in place (ProxyHeadersMiddleware with specific hosts). 

417 Otherwise, uses the direct connection IP to prevent header spoofing. 

418 

419 Args: 

420 request: FastAPI request object 

421 

422 Returns: 

423 str: Client IP address 

424 """ 

425 # Use direct client IP as the secure default. 

426 # Proxy headers are only trustworthy when Uvicorn/Starlette's 

427 # ProxyHeadersMiddleware has already rewritten request.client from a 

428 # trusted upstream. That middleware replaces request.client.host with 

429 # the real client IP, so we can rely on it directly. 

430 return request.client.host if request.client else "unknown" 

431 

432 def _check_ip_restrictions(self, client_ip: str, ip_restrictions: list) -> bool: 

433 """Check if client IP is allowed by restrictions. 

434 

435 Args: 

436 client_ip: Client's IP address 

437 ip_restrictions: List of allowed IP addresses/CIDR ranges 

438 

439 Returns: 

440 bool: True if IP is allowed, False otherwise 

441 

442 Examples: 

443 Allow specific IP: 

444 >>> m = TokenScopingMiddleware() 

445 >>> m._check_ip_restrictions('192.168.1.10', ['192.168.1.10']) 

446 True 

447 

448 Allow CIDR range: 

449 >>> m._check_ip_restrictions('10.0.0.5', ['10.0.0.0/24']) 

450 True 

451 

452 Deny when not in list: 

453 >>> m._check_ip_restrictions('10.0.1.5', ['10.0.0.0/24']) 

454 False 

455 

456 Empty restrictions allow all: 

457 >>> m._check_ip_restrictions('203.0.113.1', []) 

458 True 

459 """ 

460 if not ip_restrictions: 

461 return True # No restrictions 

462 

463 try: 

464 client_ip_obj = ipaddress.ip_address(client_ip) 

465 

466 for restriction in ip_restrictions: 

467 try: 

468 # Check if it's a CIDR range 

469 if "/" in restriction: 

470 network = ipaddress.ip_network(restriction, strict=False) 

471 if client_ip_obj in network: 

472 return True 

473 else: 

474 # Single IP address 

475 if client_ip_obj == ipaddress.ip_address(restriction): 

476 return True 

477 except (ValueError, ipaddress.AddressValueError): 

478 continue 

479 

480 except (ValueError, ipaddress.AddressValueError): 

481 return False 

482 

483 return False 

484 

485 def _check_time_restrictions(self, time_restrictions: dict) -> bool: 

486 """Check if current time is allowed by restrictions. 

487 

488 Args: 

489 time_restrictions: Dict containing time-based restrictions 

490 

491 Returns: 

492 bool: True if current time is allowed, False otherwise 

493 

494 Examples: 

495 No restrictions allow access: 

496 >>> m = TokenScopingMiddleware() 

497 >>> m._check_time_restrictions({}) 

498 True 

499 

500 Weekdays only: result depends on current weekday (always bool): 

501 >>> isinstance(m._check_time_restrictions({'weekdays_only': True}), bool) 

502 True 

503 

504 Business hours only: result depends on current hour (always bool): 

505 >>> isinstance(m._check_time_restrictions({'business_hours_only': True}), bool) 

506 True 

507 """ 

508 if not time_restrictions: 

509 return True # No restrictions 

510 

511 now = datetime.now(tz=timezone.utc) 

512 

513 # Check business hours restriction 

514 if time_restrictions.get("business_hours_only"): 

515 # Assume business hours are 9 AM to 5 PM UTC 

516 # This could be made configurable 

517 if not 9 <= now.hour < 17: 

518 return False 

519 

520 # Check day of week restrictions 

521 weekdays_only = time_restrictions.get("weekdays_only") 

522 if weekdays_only and now.weekday() >= 5: # Saturday=5, Sunday=6 

523 return False 

524 

525 return True 

526 

527 @staticmethod 

528 def _parse_positive_limit(value: object) -> Optional[int]: 

529 """Parse usage-limit values as positive integers. 

530 

531 Args: 

532 value: Candidate limit value from token scope configuration. 

533 

534 Returns: 

535 Parsed positive integer limit, or ``None`` when invalid/non-positive. 

536 """ 

537 try: 

538 parsed = int(value) 

539 except (TypeError, ValueError): 

540 return None 

541 return parsed if parsed > 0 else None 

542 

543 def _check_usage_limits(self, jti: Optional[str], usage_limits: dict) -> Tuple[bool, Optional[str]]: 

544 """Check token usage limits against recorded usage logs. 

545 

546 Args: 

547 jti: Token JTI identifier. 

548 usage_limits: Usage limits from token scope. 

549 

550 Returns: 

551 Tuple[bool, Optional[str]]: (allowed, denial_reason) 

552 """ 

553 if not isinstance(usage_limits, dict) or not usage_limits or not jti: 

554 return True, None 

555 

556 requests_per_hour = self._parse_positive_limit(usage_limits.get("requests_per_hour")) 

557 requests_per_day = self._parse_positive_limit(usage_limits.get("requests_per_day")) 

558 

559 if not requests_per_hour and not requests_per_day: 

560 return True, None 

561 

562 # First-Party 

563 from mcpgateway.db import get_db, TokenUsageLog # pylint: disable=import-outside-toplevel 

564 

565 db = next(get_db()) 

566 try: 

567 now = datetime.now(timezone.utc) 

568 

569 if requests_per_hour: 

570 hour_window_start = now - timedelta(hours=1) 

571 hourly_count = db.execute( 

572 # Pylint false-positive: SQLAlchemy func namespace is callable at runtime. 

573 # pylint: disable=not-callable 

574 select(func.count(TokenUsageLog.id)).where(and_(TokenUsageLog.token_jti == jti, TokenUsageLog.timestamp >= hour_window_start)) 

575 ).scalar() 

576 if int(hourly_count or 0) >= requests_per_hour: 

577 return False, "Hourly request limit exceeded" 

578 

579 if requests_per_day: 

580 day_window_start = now - timedelta(days=1) 

581 daily_count = db.execute( 

582 # Pylint false-positive: SQLAlchemy func namespace is callable at runtime. 

583 # pylint: disable=not-callable 

584 select(func.count(TokenUsageLog.id)).where(and_(TokenUsageLog.token_jti == jti, TokenUsageLog.timestamp >= day_window_start)) 

585 ).scalar() 

586 if int(daily_count or 0) >= requests_per_day: 

587 return False, "Daily request limit exceeded" 

588 except Exception as exc: 

589 logger.warning("Failed to evaluate token usage limits for jti %s: %s", jti, exc) 

590 return True, None 

591 finally: 

592 try: 

593 db.rollback() 

594 finally: 

595 db.close() 

596 

597 return True, None 

598 

599 def _check_server_restriction(self, request_path: str, server_id: Optional[str]) -> bool: 

600 """Check if request path matches server restriction. 

601 

602 Args: 

603 request_path: The request path/URL 

604 server_id: Required server ID (None means no restriction) 

605 

606 Returns: 

607 bool: True if request is allowed, False otherwise 

608 

609 Examples: 

610 Match server paths: 

611 >>> m = TokenScopingMiddleware() 

612 >>> m._check_server_restriction('/servers/abc/tools', 'abc') 

613 True 

614 >>> m._check_server_restriction('/sse/xyz', 'xyz') 

615 True 

616 >>> m._check_server_restriction('/ws/xyz?x=1', 'xyz') 

617 True 

618 

619 Mismatch denies: 

620 >>> m._check_server_restriction('/servers/def', 'abc') 

621 False 

622 

623 General endpoints allowed: 

624 >>> m._check_server_restriction('/health', 'abc') 

625 True 

626 >>> m._check_server_restriction('/', 'abc') 

627 True 

628 """ 

629 request_path = self._normalize_path_for_matching(request_path) 

630 

631 if not server_id: 

632 return True # No server restriction 

633 

634 # Extract server ID from path patterns (uses precompiled regex) 

635 # /servers/{server_id}/... 

636 # /sse/{server_id} 

637 # /ws/{server_id} 

638 for pattern in _SERVER_PATH_PATTERNS: 

639 match = pattern.search(request_path) 

640 if match: 

641 path_server_id = match.group(1) 

642 return path_server_id == server_id 

643 

644 # If no server ID found in path, allow general endpoints 

645 general_endpoints = ["/health", "/metrics", "/openapi.json", "/docs", "/redoc", "/rpc", "/mcp", "/sse"] 

646 

647 # Check exact root path separately 

648 if request_path == "/": 

649 return True 

650 

651 for endpoint in general_endpoints: 

652 if request_path.startswith(endpoint): 

653 return True 

654 

655 # Default deny for unmatched paths with server restrictions 

656 return False 

657 

658 def _check_permission_restrictions(self, request_path: str, request_method: str, permissions: list) -> bool: 

659 """Check if request is allowed by permission restrictions. 

660 

661 Args: 

662 request_path: The request path/URL 

663 request_method: HTTP method (GET, POST, etc.) 

664 permissions: List of allowed permissions 

665 

666 Returns: 

667 bool: True if request is allowed, False otherwise 

668 

669 Examples: 

670 Wildcard allows all: 

671 >>> m = TokenScopingMiddleware() 

672 >>> m._check_permission_restrictions('/tools', 'GET', ['*']) 

673 True 

674 

675 Requires specific permission: 

676 >>> m._check_permission_restrictions('/tools', 'POST', ['tools.create']) 

677 True 

678 >>> m._check_permission_restrictions('/tools/xyz', 'PUT', ['tools.update']) 

679 True 

680 >>> m._check_permission_restrictions('/resources', 'GET', ['resources.read']) 

681 True 

682 >>> m._check_permission_restrictions('/servers/s1/tools/abc/call', 'POST', ['tools.execute']) 

683 True 

684 

685 Missing permission denies: 

686 >>> m._check_permission_restrictions('/tools', 'POST', ['tools.read']) 

687 False 

688 """ 

689 request_path = self._normalize_path_for_matching(request_path) 

690 

691 if not permissions or "*" in permissions: 

692 return True # No restrictions or full access 

693 

694 # Handle admin routes with granular route-group mapping. 

695 # Unmapped /admin/* paths are denied by default (fail-secure). 

696 if request_path.startswith("/admin"): 

697 for method, path_pattern, required_permission in _ADMIN_PERMISSION_PATTERNS: 

698 if request_method == method and path_pattern.match(request_path): 

699 return required_permission in permissions 

700 return False 

701 

702 # Check each permission mapping (uses precompiled regex patterns) 

703 for method, path_pattern, required_permission in _PERMISSION_PATTERNS: 

704 if request_method == method and path_pattern.match(request_path): 

705 if required_permission in permissions: 

706 return True 

707 # Runtime compensation: tokens with MCP method permissions 

708 # (tools.*, resources.*, prompts.*) implicitly have transport 

709 # access (servers.use) — mirrors the generation-time injection 

710 # in token_catalog_service._generate_token() for pre-existing tokens. 

711 if required_permission == Permissions.SERVERS_USE: 

712 if any(p.startswith(Permissions.MCP_METHOD_PREFIXES) for p in permissions): 

713 logger.debug("Runtime servers.use compensation applied for token with MCP method permissions: %s", permissions) 

714 return True 

715 return False 

716 return False 

717 

718 # LLM proxy permissions (respect configured llm_api_prefix). 

719 for method, path_pattern, required_permission in _get_llm_permission_patterns(settings.llm_api_prefix): 

720 if request_method == method and path_pattern.match(request_path): 

721 return required_permission in permissions 

722 

723 # Default deny for unmatched paths (requires explicit permission mapping) 

724 return False 

725 

726 def _check_team_membership(self, payload: dict, db=None) -> bool: 

727 """ 

728 Check if user still belongs to teams in the token. 

729 

730 For public-only tokens (no teams), always returns True. 

731 For team-scoped tokens, validates membership with caching. 

732 

733 Uses in-memory cache (per gateway instance, 60s TTL) to avoid repeated 

734 email_team_members queries for the same user+teams combination. 

735 Note: Sync path uses in-memory only for performance; Redis is not 

736 consulted to avoid async overhead in the hot path. 

737 

738 Args: 

739 payload: Decoded JWT payload containing teams 

740 db: Optional database session. If provided, caller manages lifecycle. 

741 If None, creates and manages its own session. 

742 

743 Returns: 

744 bool: True if team membership is valid, False otherwise 

745 """ 

746 teams = payload.get("teams", []) 

747 user_email = payload.get("sub") 

748 

749 # PUBLIC-ONLY TOKEN: No team validation needed 

750 if not teams or len(teams) == 0: 

751 logger.debug(f"Public-only token for user {SecurityValidator.sanitize_log_message(user_email)} - no team validation required") 

752 return True 

753 

754 # TEAM-SCOPED TOKEN: Validate membership 

755 if not user_email: 

756 logger.warning("Token missing user email") 

757 return False 

758 

759 # Extract team IDs from token (handles both dict and string formats) 

760 team_ids = [team["id"] if isinstance(team, dict) else team for team in teams] 

761 

762 # First-Party 

763 from mcpgateway.cache.auth_cache import get_auth_cache # pylint: disable=import-outside-toplevel 

764 

765 # Check cache first (synchronous in-memory lookup) 

766 auth_cache = get_auth_cache() 

767 cached_result = auth_cache.get_team_membership_valid_sync(user_email, team_ids) 

768 if cached_result is not None: 

769 if not cached_result: 

770 logger.warning(f"Token invalid (cached): User {SecurityValidator.sanitize_log_message(user_email)} no longer member of teams") 

771 return cached_result 

772 

773 # Cache miss - query database 

774 # First-Party 

775 from mcpgateway.db import EmailTeamMember, get_db # pylint: disable=import-outside-toplevel 

776 

777 # Track if we own the session (and thus must clean it up) 

778 owns_session = db is None 

779 if owns_session: 

780 db = next(get_db()) 

781 

782 try: 

783 # Single query for all teams (fixes N+1 pattern) 

784 memberships = ( 

785 db.execute( 

786 select(EmailTeamMember.team_id).where( 

787 EmailTeamMember.team_id.in_(team_ids), 

788 EmailTeamMember.user_email == user_email, 

789 EmailTeamMember.is_active.is_(True), 

790 ) 

791 ) 

792 .scalars() 

793 .all() 

794 ) 

795 

796 # Check if user is member of ALL teams in token 

797 valid_team_ids = set(memberships) 

798 missing_teams = set(team_ids) - valid_team_ids 

799 

800 if missing_teams: 

801 logger.warning(f"Token invalid: User {SecurityValidator.sanitize_log_message(user_email)} no longer member of teams: {SecurityValidator.sanitize_log_message(str(missing_teams))}") 

802 # Cache negative result 

803 auth_cache.set_team_membership_valid_sync(user_email, team_ids, False) 

804 return False 

805 

806 # Cache positive result 

807 auth_cache.set_team_membership_valid_sync(user_email, team_ids, True) 

808 return True 

809 finally: 

810 # Only commit/close if we created the session 

811 if owns_session: 

812 try: 

813 db.commit() # Commit read-only transaction to avoid implicit rollback 

814 finally: 

815 db.close() 

816 

817 def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None, _user_email: str = None) -> bool: # noqa: PLR0911 # pylint: disable=too-many-return-statements 

818 """ 

819 Check if the requested resource is accessible by the token. 

820 

821 Implements Three-Tier Resource Visibility (Public/Team/Private): 

822 - PUBLIC: Accessible by all tokens (public-only and team-scoped) 

823 - TEAM: Accessible only by tokens scoped to that specific team 

824 - PRIVATE: Accessible only by tokens scoped to that specific team 

825 

826 Token Access Rules: 

827 - Public-only tokens (empty token_teams): Can access public resources + their own resources 

828 - Team-scoped tokens: Can access their team's resources + public resources 

829 

830 Handles URLs like: 

831 - /servers/{id}/mcp 

832 - /servers/{id}/sse 

833 - /servers/{id} 

834 - /tools/{id}/execute 

835 - /tools/{id} 

836 - /resources/{id} 

837 - /prompts/{id} 

838 

839 Args: 

840 request_path: The request path/URL 

841 token_teams: List of team IDs from the token (empty list = public-only token) 

842 db: Optional database session. If provided, caller manages lifecycle. 

843 If None, creates and manages its own session. 

844 

845 Returns: 

846 bool: True if resource access is allowed, False otherwise 

847 """ 

848 request_path = self._normalize_path_for_matching(request_path) 

849 

850 # Normalize token_teams: extract team IDs from dict objects (backward compatibility) 

851 token_team_ids = [] 

852 for team in token_teams: 

853 if isinstance(team, dict): 

854 token_team_ids.append(team["id"]) 

855 else: 

856 token_team_ids.append(team) 

857 

858 # Determine token type 

859 is_public_token = not token_team_ids or len(token_team_ids) == 0 

860 

861 if is_public_token: 

862 logger.debug("Processing request with PUBLIC-ONLY token") 

863 else: 

864 logger.debug(f"Processing request with TEAM-SCOPED token (teams: {SecurityValidator.sanitize_log_message(str(token_teams))})") 

865 

866 # Extract resource type and ID from path (uses precompiled regex patterns) 

867 # IDs are UUID hex strings (32 chars) or UUID with dashes (36 chars) 

868 resource_id = None 

869 resource_type = None 

870 

871 for pattern, rtype in _RESOURCE_PATTERNS: 

872 match = pattern.search(request_path) 

873 if match: 

874 resource_id = match.group(1) 

875 resource_type = rtype 

876 logger.debug(f"Extracted {rtype} ID: {SecurityValidator.sanitize_log_message(resource_id)} from path: {SecurityValidator.sanitize_log_message(request_path)}") 

877 break 

878 

879 # If no resource ID in path, allow (general endpoints like /health, /tokens, /metrics) 

880 if not resource_id or not resource_type: 

881 logger.debug(f"No resource ID found in path {request_path}, allowing access") 

882 return True 

883 

884 # Import database models 

885 # First-Party 

886 from mcpgateway.db import Gateway, get_db, Prompt, Resource, Server, Tool # pylint: disable=import-outside-toplevel 

887 

888 # Track if we own the session (and thus must clean it up) 

889 owns_session = db is None 

890 if owns_session: 

891 db = next(get_db()) 

892 

893 try: 

894 # Check Virtual Servers 

895 if resource_type == "server": 

896 server = db.execute(select(Server).where(Server.id == resource_id)).scalar_one_or_none() 

897 

898 if not server: 

899 logger.warning(f"Server {SecurityValidator.sanitize_log_message(resource_id)} not found in database") 

900 return False 

901 

902 # Get server visibility (default to 'team' if field doesn't exist) 

903 server_visibility = getattr(server, "visibility", "team") 

904 

905 # PUBLIC SERVERS: Accessible by everyone (including public-only tokens) 

906 if server_visibility == "public": 

907 logger.debug(f"Access granted: Server {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC") 

908 return True 

909 

910 # PUBLIC-ONLY TOKEN: Can ONLY access public servers (strict public-only policy) 

911 # No owner access - if user needs own resources, use a personal team-scoped token 

912 if is_public_token: 

913 logger.warning( 

914 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(server_visibility)} server {SecurityValidator.sanitize_log_message(resource_id)}" 

915 ) 

916 return False 

917 

918 # TEAM-SCOPED SERVERS: Check if server belongs to token's teams 

919 if server_visibility == "team": 

920 if server.team_id in token_team_ids: 

921 logger.debug( 

922 f"Access granted: Team server {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(server.team_id))}" 

923 ) 

924 return True 

925 

926 logger.warning( 

927 f"Access denied: Server {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(server.team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}" 

928 ) 

929 return False 

930 

931 # PRIVATE SERVERS: Owner-only access (per RBAC doc) 

932 if server_visibility == "private": 

933 server_owner = getattr(server, "owner_email", None) 

934 if server_owner and server_owner == _user_email: 

935 logger.debug(f"Access granted: Private server {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}") 

936 return True 

937 

938 logger.warning( 

939 f"Access denied: Server {SecurityValidator.sanitize_log_message(resource_id)} is private, owner is '{SecurityValidator.sanitize_log_message(str(server_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'" 

940 ) 

941 return False 

942 

943 # Unknown visibility - deny by default 

944 logger.warning(f"Access denied: Server {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(server_visibility)}") 

945 return False 

946 

947 # CHECK TOOLS 

948 if resource_type == "tool": 

949 tool = db.execute(select(Tool).where(Tool.id == resource_id)).scalar_one_or_none() 

950 

951 if not tool: 

952 logger.warning(f"Tool {SecurityValidator.sanitize_log_message(resource_id)} not found in database") 

953 return False 

954 

955 # Get tool visibility (default to 'team' if field doesn't exist) 

956 tool_visibility = getattr(tool, "visibility", "team") 

957 

958 # PUBLIC TOOLS: Accessible by everyone (including public-only tokens) 

959 if tool_visibility == "public": 

960 logger.debug(f"Access granted: Tool {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC") 

961 return True 

962 

963 # PUBLIC-ONLY TOKEN: Can ONLY access public tools (strict public-only policy) 

964 # No owner access - if user needs own resources, use a personal team-scoped token 

965 if is_public_token: 

966 logger.warning( 

967 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(tool_visibility)} tool {SecurityValidator.sanitize_log_message(resource_id)}" 

968 ) 

969 return False 

970 

971 # TEAM TOOLS: Check if tool's team matches token's teams 

972 if tool_visibility == "team": 

973 tool_team_id = getattr(tool, "team_id", None) 

974 if tool_team_id and tool_team_id in token_team_ids: 

975 logger.debug( 

976 f"Access granted: Team tool {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(tool_team_id))}" 

977 ) 

978 return True 

979 

980 logger.warning( 

981 f"Access denied: Tool {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(tool_team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}" 

982 ) 

983 return False 

984 

985 # PRIVATE TOOLS: Owner-only access (per RBAC doc) 

986 if tool_visibility in ["private", "user"]: 

987 tool_owner = getattr(tool, "owner_email", None) 

988 if tool_owner and tool_owner == _user_email: 

989 logger.debug(f"Access granted: Private tool {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}") 

990 return True 

991 

992 logger.warning( 

993 f"Access denied: Tool {SecurityValidator.sanitize_log_message(resource_id)} is {SecurityValidator.sanitize_log_message(tool_visibility)}, owner is '{SecurityValidator.sanitize_log_message(str(tool_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'" 

994 ) 

995 return False 

996 

997 # Unknown visibility - deny by default 

998 logger.warning(f"Access denied: Tool {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(tool_visibility)}") 

999 return False 

1000 

1001 # CHECK RESOURCES 

1002 if resource_type == "resource": 

1003 resource = db.execute(select(Resource).where(Resource.id == resource_id)).scalar_one_or_none() 

1004 

1005 if not resource: 

1006 logger.warning(f"Resource {SecurityValidator.sanitize_log_message(resource_id)} not found in database") 

1007 return False 

1008 

1009 # Get resource visibility (default to 'team' if field doesn't exist) 

1010 resource_visibility = getattr(resource, "visibility", "team") 

1011 

1012 # PUBLIC RESOURCES: Accessible by everyone (including public-only tokens) 

1013 if resource_visibility == "public": 

1014 logger.debug(f"Access granted: Resource {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC") 

1015 return True 

1016 

1017 # PUBLIC-ONLY TOKEN: Can ONLY access public resources (strict public-only policy) 

1018 # No owner access - if user needs own resources, use a personal team-scoped token 

1019 if is_public_token: 

1020 logger.warning( 

1021 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(resource_visibility)} resource {SecurityValidator.sanitize_log_message(resource_id)}" 

1022 ) 

1023 return False 

1024 

1025 # TEAM RESOURCES: Check if resource's team matches token's teams 

1026 if resource_visibility == "team": 

1027 resource_team_id = getattr(resource, "team_id", None) 

1028 if resource_team_id and resource_team_id in token_team_ids: 

1029 logger.debug( 

1030 f"Access granted: Team resource {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(resource_team_id))}" 

1031 ) 

1032 return True 

1033 

1034 logger.warning( 

1035 f"Access denied: Resource {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(resource_team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}" 

1036 ) 

1037 return False 

1038 

1039 # PRIVATE RESOURCES: Owner-only access (per RBAC doc) 

1040 if resource_visibility in ["private", "user"]: 

1041 resource_owner = getattr(resource, "owner_email", None) 

1042 if resource_owner and resource_owner == _user_email: 

1043 logger.debug(f"Access granted: Private resource {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}") 

1044 return True 

1045 

1046 logger.warning( 

1047 f"Access denied: Resource {SecurityValidator.sanitize_log_message(resource_id)} is {SecurityValidator.sanitize_log_message(resource_visibility)}, owner is '{SecurityValidator.sanitize_log_message(str(resource_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'" 

1048 ) 

1049 return False 

1050 

1051 # Unknown visibility - deny by default 

1052 logger.warning(f"Access denied: Resource {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(resource_visibility)}") 

1053 return False 

1054 

1055 # CHECK PROMPTS 

1056 if resource_type == "prompt": 

1057 prompt = db.execute(select(Prompt).where(Prompt.id == resource_id)).scalar_one_or_none() 

1058 

1059 if not prompt: 

1060 logger.warning(f"Prompt {SecurityValidator.sanitize_log_message(resource_id)} not found in database") 

1061 return False 

1062 

1063 # Get prompt visibility (default to 'team' if field doesn't exist) 

1064 prompt_visibility = getattr(prompt, "visibility", "team") 

1065 

1066 # PUBLIC PROMPTS: Accessible by everyone (including public-only tokens) 

1067 if prompt_visibility == "public": 

1068 logger.debug(f"Access granted: Prompt {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC") 

1069 return True 

1070 

1071 # PUBLIC-ONLY TOKEN: Can ONLY access public prompts (strict public-only policy) 

1072 # No owner access - if user needs own resources, use a personal team-scoped token 

1073 if is_public_token: 

1074 logger.warning( 

1075 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(prompt_visibility)} prompt {SecurityValidator.sanitize_log_message(resource_id)}" 

1076 ) 

1077 return False 

1078 

1079 # TEAM PROMPTS: Check if prompt's team matches token's teams 

1080 if prompt_visibility == "team": 

1081 prompt_team_id = getattr(prompt, "team_id", None) 

1082 if prompt_team_id and prompt_team_id in token_team_ids: 

1083 logger.debug( 

1084 f"Access granted: Team prompt {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(prompt_team_id))}" 

1085 ) 

1086 return True 

1087 

1088 logger.warning( 

1089 f"Access denied: Prompt {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(prompt_team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}" 

1090 ) 

1091 return False 

1092 

1093 # PRIVATE PROMPTS: Owner-only access (per RBAC doc) 

1094 if prompt_visibility in ["private", "user"]: 

1095 prompt_owner = getattr(prompt, "owner_email", None) 

1096 if prompt_owner and prompt_owner == _user_email: 

1097 logger.debug(f"Access granted: Private prompt {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}") 

1098 return True 

1099 

1100 logger.warning( 

1101 f"Access denied: Prompt {SecurityValidator.sanitize_log_message(resource_id)} is {SecurityValidator.sanitize_log_message(prompt_visibility)}, owner is '{SecurityValidator.sanitize_log_message(str(prompt_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'" 

1102 ) 

1103 return False 

1104 

1105 # Unknown visibility - deny by default 

1106 logger.warning(f"Access denied: Prompt {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(prompt_visibility)}") 

1107 return False 

1108 

1109 # CHECK GATEWAYS 

1110 if resource_type == "gateway": 

1111 gateway = db.execute(select(Gateway).where(Gateway.id == resource_id)).scalar_one_or_none() 

1112 

1113 if not gateway: 

1114 logger.warning(f"Gateway {SecurityValidator.sanitize_log_message(resource_id)} not found in database") 

1115 return False 

1116 

1117 # Get gateway visibility (default to 'team' if field doesn't exist) 

1118 gateway_visibility = getattr(gateway, "visibility", "team") 

1119 

1120 # PUBLIC GATEWAYS: Accessible by everyone (including public-only tokens) 

1121 if gateway_visibility == "public": 

1122 logger.debug(f"Access granted: Gateway {SecurityValidator.sanitize_log_message(resource_id)} is PUBLIC") 

1123 return True 

1124 

1125 # PUBLIC-ONLY TOKEN: Can ONLY access public gateways (strict public-only policy) 

1126 # No owner access - if user needs own resources, use a personal team-scoped token 

1127 if is_public_token: 

1128 logger.warning( 

1129 f"Access denied: Public-only token cannot access {SecurityValidator.sanitize_log_message(gateway_visibility)} gateway {SecurityValidator.sanitize_log_message(resource_id)}" 

1130 ) 

1131 return False 

1132 

1133 # TEAM GATEWAYS: Check if gateway's team matches token's teams 

1134 if gateway_visibility == "team": 

1135 gateway_team_id = getattr(gateway, "team_id", None) 

1136 if gateway_team_id and gateway_team_id in token_team_ids: 

1137 logger.debug( 

1138 f"Access granted: Team gateway {SecurityValidator.sanitize_log_message(resource_id)} belongs to token's team {SecurityValidator.sanitize_log_message(str(gateway_team_id))}" 

1139 ) 

1140 return True 

1141 

1142 logger.warning( 

1143 f"Access denied: Gateway {SecurityValidator.sanitize_log_message(resource_id)} is team-scoped to '{SecurityValidator.sanitize_log_message(str(gateway_team_id))}', token is scoped to teams {SecurityValidator.sanitize_log_message(str(token_team_ids))}" 

1144 ) 

1145 return False 

1146 

1147 # PRIVATE GATEWAYS: Owner-only access (per RBAC doc) 

1148 if gateway_visibility in ["private", "user"]: 

1149 gateway_owner = getattr(gateway, "owner_email", None) 

1150 if gateway_owner and gateway_owner == _user_email: 

1151 logger.debug(f"Access granted: Private gateway {SecurityValidator.sanitize_log_message(resource_id)} owned by {SecurityValidator.sanitize_log_message(_user_email)}") 

1152 return True 

1153 

1154 logger.warning( 

1155 f"Access denied: Gateway {SecurityValidator.sanitize_log_message(resource_id)} is {SecurityValidator.sanitize_log_message(gateway_visibility)}, owner is '{SecurityValidator.sanitize_log_message(str(gateway_owner))}', requester is '{SecurityValidator.sanitize_log_message(_user_email)}'" 

1156 ) 

1157 return False 

1158 

1159 # Unknown visibility - deny by default 

1160 logger.warning(f"Access denied: Gateway {SecurityValidator.sanitize_log_message(resource_id)} has unknown visibility: {SecurityValidator.sanitize_log_message(gateway_visibility)}") 

1161 return False 

1162 

1163 # UNKNOWN RESOURCE TYPE 

1164 logger.warning(f"Unknown resource type '{SecurityValidator.sanitize_log_message(str(resource_type))}' for path: {SecurityValidator.sanitize_log_message(request_path)}") 

1165 return False 

1166 

1167 except Exception as e: 

1168 logger.error(f"Error checking resource team ownership for {request_path}: {e}", exc_info=True) 

1169 # Fail securely - deny access on error 

1170 return False 

1171 finally: 

1172 # Only commit/close if we created the session 

1173 if owns_session: 

1174 try: 

1175 db.commit() # Commit read-only transaction to avoid implicit rollback 

1176 finally: 

1177 db.close() 

1178 

1179 async def __call__(self, request: Request, call_next): 

1180 """Middleware function to check token scoping including team-level validation. 

1181 

1182 Args: 

1183 request: FastAPI request object 

1184 call_next: Next middleware/handler in chain 

1185 

1186 Returns: 

1187 Response from next handler or HTTPException 

1188 

1189 Raises: 

1190 HTTPException: If token scoping restrictions are violated 

1191 """ 

1192 try: 

1193 # Skip if already scoped (prevents double-scoping for /mcp requests) 

1194 # MCPPathRewriteMiddleware runs scoping via dispatch, then routes through 

1195 # middleware stack which hits BaseHTTPMiddleware's scoping again. 

1196 # Use request.state flag which persists across middleware invocations. 

1197 if getattr(request.state, "_token_scoping_done", False): 

1198 return await call_next(request) 

1199 

1200 # Mark as scoped before doing any work 

1201 request.state._token_scoping_done = True 

1202 

1203 normalized_path = self._get_normalized_request_path(request) 

1204 

1205 # Skip scoping for certain paths (truly public endpoints only) 

1206 skip_paths = [ 

1207 "/health", 

1208 "/openapi.json", 

1209 "/docs", 

1210 "/redoc", 

1211 "/auth/email/login", 

1212 "/auth/email/register", 

1213 "/.well-known/", 

1214 ] 

1215 

1216 # Check exact root path separately 

1217 if normalized_path == "/": 

1218 return await call_next(request) 

1219 

1220 # Trusted internal Rust -> Python MCP dispatch already carries a 

1221 # normalized auth context and is re-authorized by the internal MCP 

1222 # handlers. Re-applying token-scoping path checks here would reject 

1223 # the private /_internal/mcp/* hop for scoped tokens. 

1224 if self._is_trusted_internal_mcp_runtime_request(request, normalized_path): 

1225 return await call_next(request) 

1226 

1227 if any(normalized_path.startswith(path) for path in skip_paths): 

1228 return await call_next(request) 

1229 

1230 # Skip server-specific well-known endpoints (RFC 9728) 

1231 if re.match(r"^/servers/[^/]+/\.well-known/", normalized_path): 

1232 return await call_next(request) 

1233 

1234 # Extract full token payload (not just scopes) 

1235 payload = await self._extract_token_scopes(request) 

1236 

1237 # If no payload, continue (regular auth will handle this) 

1238 if not payload: 

1239 return await call_next(request) 

1240 

1241 # TEAM VALIDATION: Use single DB session for both team checks 

1242 # This reduces connection pool overhead from 2 sessions to 1 for resource endpoints 

1243 user_email = payload.get("sub") or payload.get("email") # Extract user email for ownership check 

1244 

1245 # Resolve teams based on token_use claim 

1246 token_use = payload.get("token_use") 

1247 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type 

1248 # Session token: resolve teams from DB/cache directly 

1249 # Cannot rely on request.state.token_teams — AuthContextMiddleware 

1250 # is gated by security_logging_enabled (defaults to False) 

1251 is_admin = payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False) 

1252 user_info = {"is_admin": is_admin} 

1253 token_teams = await resolve_session_teams(payload, user_email, user_info) 

1254 else: 

1255 # API token or legacy: use embedded teams with normalize_token_teams 

1256 token_teams = normalize_token_teams(payload) 

1257 

1258 # Check if admin bypass is active (token_teams is None means admin with explicit null teams) 

1259 is_admin_bypass = token_teams is None 

1260 

1261 # Admin with explicit null teams bypasses team validation entirely 

1262 if is_admin_bypass: 

1263 logger.debug(f"Admin bypass: skipping team validation for {SecurityValidator.sanitize_log_message(user_email)}") 

1264 # Skip to other checks (server_id, IP, etc.) 

1265 elif token_teams: 

1266 # First-Party 

1267 from mcpgateway.db import get_db # pylint: disable=import-outside-toplevel 

1268 

1269 db = next(get_db()) 

1270 try: 

1271 # Check team membership — only for API/legacy tokens whose teams 

1272 # come from JWT claims and may be stale. Session tokens skip this 

1273 # because resolve_session_teams() already resolved membership from 

1274 # the DB; re-checking the raw JWT claim here would conflict with 

1275 # the intersection semantics (stale JWT teams would cause a 403 

1276 # even though the user has valid DB teams). 

1277 # NOTE: session-token membership staleness is bounded by the 

1278 # auth_cache TTL (see _resolve_teams_from_db). 

1279 if token_use != "session" and not self._check_team_membership(payload, db=db): # nosec B105 - Not a password; token_use is a JWT claim type 

1280 logger.warning("Token rejected: User no longer member of associated team(s)") 

1281 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team") 

1282 

1283 # Check resource team ownership with shared session 

1284 if not self._check_resource_team_ownership(normalized_path, token_teams, db=db, _user_email=user_email): 

1285 logger.warning(f"Access denied: Resource does not belong to token's teams {SecurityValidator.sanitize_log_message(str(token_teams))}") 

1286 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG) 

1287 finally: 

1288 # Ensure session cleanup even if checks raise exceptions 

1289 try: 

1290 db.commit() 

1291 finally: 

1292 db.close() 

1293 else: 

1294 # Public-only token (or session token with empty intersection): 

1295 # skip _check_team_membership for session tokens — the empty 

1296 # intersection already means no team-scoped access. 

1297 # Membership staleness bounded by auth_cache TTL. 

1298 if token_use != "session" and not self._check_team_membership(payload): # nosec B105 - Not a password; token_use is a JWT claim type 

1299 logger.warning("Token rejected: User no longer member of associated team(s)") 

1300 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team") 

1301 

1302 if not self._check_resource_team_ownership(normalized_path, token_teams, _user_email=user_email): 

1303 logger.warning(f"Access denied: Resource does not belong to token's teams {SecurityValidator.sanitize_log_message(str(token_teams))}") 

1304 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG) 

1305 

1306 # Extract scopes from payload 

1307 scopes = payload.get("scopes", {}) 

1308 

1309 # Check server ID restriction 

1310 server_id = scopes.get("server_id") 

1311 if not self._check_server_restriction(normalized_path, server_id): 

1312 logger.warning(f"Token not authorized for this server. Required: {SecurityValidator.sanitize_log_message(str(server_id))}") 

1313 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG) 

1314 

1315 # Check IP restrictions 

1316 ip_restrictions = scopes.get("ip_restrictions", []) 

1317 if ip_restrictions: 

1318 client_ip = self._get_client_ip(request) 

1319 if not self._check_ip_restrictions(client_ip, ip_restrictions): 

1320 logger.warning(f"Request from IP {client_ip} not allowed by token restrictions") 

1321 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG) 

1322 

1323 # Check time restrictions 

1324 time_restrictions = scopes.get("time_restrictions", {}) 

1325 if not self._check_time_restrictions(time_restrictions): 

1326 logger.warning("Request not allowed at this time by token restrictions") 

1327 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Request not allowed at this time by token restrictions") 

1328 

1329 # Check permission restrictions 

1330 permissions = scopes.get("permissions", []) 

1331 if not self._check_permission_restrictions(normalized_path, request.method, permissions): 

1332 logger.warning("Insufficient permissions for this operation") 

1333 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=_ACCESS_DENIED_MSG) 

1334 

1335 # Check optional token usage limits. 

1336 usage_limits = scopes.get("usage_limits", {}) 

1337 usage_allowed, usage_reason = self._check_usage_limits(payload.get("jti"), usage_limits) 

1338 if not usage_allowed: 

1339 logger.warning("Token usage limit exceeded for jti %s: %s", payload.get("jti"), usage_reason) 

1340 raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=usage_reason or "Token usage limit exceeded") 

1341 

1342 # All scoping checks passed, continue 

1343 return await call_next(request) 

1344 

1345 except HTTPException as exc: 

1346 # Return clean JSON response instead of traceback 

1347 return ORJSONResponse( 

1348 status_code=exc.status_code, 

1349 content={"detail": exc.detail}, 

1350 ) 

1351 

1352 def _is_trusted_internal_mcp_runtime_request(self, request: Request, normalized_path: str) -> bool: 

1353 """Return whether the request is a trusted loopback Rust MCP sidecar hop. 

1354 

1355 Args: 

1356 request: Incoming HTTP request. 

1357 normalized_path: Canonicalized request path used for route matching. 

1358 

1359 Returns: 

1360 ``True`` when the request originated from the local Rust MCP runtime and 

1361 includes the expected trusted headers. 

1362 """ 

1363 if normalized_path != _INTERNAL_MCP_PATH_PREFIX and not normalized_path.startswith(f"{_INTERNAL_MCP_PATH_PREFIX}/"): 

1364 return False 

1365 

1366 if request.headers.get(_INTERNAL_MCP_RUNTIME_HEADER) != "rust": 

1367 return False 

1368 

1369 provided_auth = request.headers.get(_INTERNAL_MCP_RUNTIME_AUTH_HEADER) 

1370 if not provided_auth: 

1371 return False 

1372 

1373 expected_auth = self._expected_internal_mcp_runtime_auth_header() 

1374 if not hmac.compare_digest(provided_auth, expected_auth): 

1375 return False 

1376 

1377 if not request.headers.get(_INTERNAL_MCP_AUTH_CONTEXT_HEADER): 

1378 return False 

1379 

1380 client_host = getattr(getattr(request, "client", None), "host", None) 

1381 return client_host in ("127.0.0.1", "::1") 

1382 

1383 @staticmethod 

1384 def _auth_encryption_secret_value() -> str: 

1385 """Return the configured auth-encryption secret as a plain string. 

1386 

1387 Returns: 

1388 The auth-encryption secret, normalized to a regular string. 

1389 """ 

1390 secret = settings.auth_encryption_secret 

1391 if hasattr(secret, "get_secret_value"): 

1392 return secret.get_secret_value() 

1393 return str(secret) 

1394 

1395 @staticmethod 

1396 @lru_cache(maxsize=8) 

1397 def _expected_internal_mcp_runtime_auth_header_for_secret(secret: str) -> str: 

1398 """Return the expected shared internal-auth header for a specific secret. 

1399 

1400 Args: 

1401 secret: Auth-encryption secret to derive the trust header from. 

1402 

1403 Returns: 

1404 Hex-encoded SHA-256 digest derived from the provided auth secret. 

1405 """ 

1406 material = f"{secret}:{_INTERNAL_MCP_RUNTIME_AUTH_CONTEXT}".encode("utf-8") 

1407 return hashlib.sha256(material).hexdigest() 

1408 

1409 @staticmethod 

1410 def _expected_internal_mcp_runtime_auth_header() -> str: 

1411 """Return the expected shared internal-auth header for Rust MCP hops. 

1412 

1413 Returns: 

1414 Shared secret-derived digest expected on trusted internal Rust MCP calls. 

1415 """ 

1416 return TokenScopingMiddleware._expected_internal_mcp_runtime_auth_header_for_secret(TokenScopingMiddleware._auth_encryption_secret_value()) 

1417 

1418 

1419# Create middleware instance 

1420token_scoping_middleware = TokenScopingMiddleware()