Coverage for mcpgateway / routers / well_known.py: 99%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/well_known.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Well-Known URI Handler Router. 

8This module implements a flexible /.well-known/* endpoint handler that supports 

9standard well-known URIs like security.txt and robots.txt with user-configurable content. 

10Defaults assume private API deployment with crawling disabled. 

11""" 

12 

13# Standard 

14from datetime import datetime, timedelta, timezone 

15import re 

16from typing import Optional 

17from urllib.parse import urlparse, urlunparse 

18 

19# Third-Party 

20from fastapi import APIRouter, Depends, HTTPException, Request, Response 

21from fastapi.responses import JSONResponse, PlainTextResponse 

22from sqlalchemy.orm import Session 

23 

24# First-Party 

25from mcpgateway.config import settings 

26from mcpgateway.db import get_db 

27from mcpgateway.services.logging_service import LoggingService 

28from mcpgateway.services.server_service import ServerError, ServerNotFoundError, ServerService 

29from mcpgateway.utils.log_sanitizer import sanitize_for_log 

30from mcpgateway.utils.verify_credentials import require_auth 

31 

# Module-level logger, obtained through the gateway's logging service
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Router that collects every endpoint declared in this module
router = APIRouter(tags=["well-known"])

# Server-id validation for the RFC 9728 endpoint: 32 hex digits in the
# 8-4-4-4-12 grouping, each hyphen optional, matched case-insensitively.
UUID_PATTERN = re.compile(
    r"^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$",
    re.IGNORECASE,
)

# Registry of recognised well-known file names. Each entry carries the base
# content type served for the file, a human-readable description (used in
# "not configured" error messages) and the defining specification.
WELL_KNOWN_REGISTRY = {
    "robots.txt": {
        "content_type": "text/plain",
        "description": "Robot exclusion standard",
        "rfc": "RFC 9309",
    },
    "security.txt": {
        "content_type": "text/plain",
        "description": "Security contact information",
        "rfc": "RFC 9116",
    },
    "ai.txt": {
        "content_type": "text/plain",
        "description": "AI usage policies",
        "rfc": "Draft",
    },
    "dnt-policy.txt": {
        "content_type": "text/plain",
        "description": "Do Not Track policy",
        "rfc": "W3C",
    },
    "change-password": {
        "content_type": "text/plain",
        "description": "Change password URL",
        "rfc": "RFC 8615",
    },
}

49 

50 

def get_base_url_with_protocol(request: Request) -> str:
    """
    Build base URL with correct protocol based on proxy headers.

    Uses the first value of the X-Forwarded-Proto header when it is present
    and names a plain HTTP scheme (``http`` or ``https``, compared
    case-insensitively); otherwise falls back to request.url.scheme.

    The header is supplied by the client unless a proxy overwrites it, so any
    other value (including an empty first element such as ``", https"``) is
    ignored rather than copied into the generated URL.

    Note: request.base_url already includes root_path in FastAPI.

    Args:
        request: The FastAPI request object.

    Returns:
        Base URL string with correct protocol, without trailing slash.

    Examples:
        >>> from mcpgateway.routers.well_known import get_base_url_with_protocol
        >>> callable(get_base_url_with_protocol)
        True
    """
    # Default to the scheme of the connection that actually reached us.
    proto = request.url.scheme

    forwarded_proto = request.headers.get("x-forwarded-proto")
    if forwarded_proto:
        # Proxies may chain values ("https, http"); the first one describes
        # the original client-facing hop.
        candidate = forwarded_proto.split(",")[0].strip().lower()
        if candidate in ("http", "https"):
            proto = candidate

    parsed = urlparse(str(request.base_url))
    return str(urlunparse(parsed._replace(scheme=proto))).rstrip("/")

80 

81 

def validate_security_txt(content: str) -> Optional[str]:
    """Validate security.txt format and add headers if missing.

    An ``Expires`` field (180 days from now, UTC) is appended when the content
    has none, and a two-line comment header plus a blank line is prepended when
    the first line is not already a comment.

    Timestamps are written as ``YYYY-MM-DDTHH:MM:SSZ``. Field-name detection is
    case-insensitive, so an existing ``expires:`` line is respected and not
    duplicated.

    Args:
        content: The security.txt content to validate.

    Returns:
        Validated security.txt content with added headers, or None if content
        is empty or contains only whitespace.
    """
    if not content or not content.strip():
        return None

    # isoformat() on an aware UTC datetime already ends in "+00:00"; format
    # explicitly so the value carries exactly one UTC designator.
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    now = datetime.now(timezone.utc).replace(microsecond=0)

    lines = content.strip().split("\n")

    # Add Expires field if missing (roughly 6 months from now)
    has_expires = any(line.strip().lower().startswith("expires:") for line in lines)
    if not has_expires:
        expires = now + timedelta(days=180)
        lines.append(f"Expires: {expires.strftime(timestamp_format)}")

    validated = []

    # Add header comment if the content does not already open with one
    if not lines[0].startswith("#"):
        validated.append("# Security contact information for ContextForge")
        validated.append(f"# Generated: {now.strftime(timestamp_format)}")
        validated.append("")

    validated.extend(lines)

    return "\n".join(validated)

116 

117 

@router.get("/.well-known/oauth-protected-resource/{path:path}")
async def get_oauth_protected_resource_rfc9728(
    path: str,
    request: Request,
    db: Session = Depends(get_db),
) -> JSONResponse:
    """
    RFC 9728 OAuth 2.0 Protected Resource Metadata endpoint (path-based).

    Per RFC 9728 Section 3.1, the well-known URI is constructed by:
    1. Taking the resource URL: http://localhost:4444/servers/{UUID}/mcp
    2. Removing trailing slash and inserting /.well-known/oauth-protected-resource/
    3. Result: http://localhost:4444/.well-known/oauth-protected-resource/servers/{UUID}/mcp

    No authentication dependency is attached to this route.

    Accepted path shapes are "servers/{UUID}" and "servers/{UUID}/{segment}";
    the third segment, when present, is not inspected, and the advertised
    resource URL always ends in "/mcp".

    Args:
        path: The resource path after oauth-protected-resource/ (e.g., "servers/{UUID}/mcp")
        request: FastAPI request object for building resource URL
        db: Database session dependency

    Returns:
        JSONResponse with RFC 9728 Protected Resource Metadata, as produced by
        ServerService.get_oauth_protected_resource_metadata, e.g.:
        {
            "resource": "http://localhost:4444/servers/{UUID}/mcp",
            "authorization_servers": ["https://auth.example.com"],
            "bearer_methods_supported": ["header"],
            "scopes_supported": ["read", "write"]
        }

    Raises:
        HTTPException: 404 if well-known support is disabled, the path format
            is invalid, the server id is not a UUID, the server is not found,
            or the server service reports a ServerError.

    Examples:
        >>> # Request OAuth metadata for a server
        >>> # GET /.well-known/oauth-protected-resource/servers/123e4567-e89b-12d3-a456-426614174000/mcp
        >>> # Returns RFC 9728 compliant metadata
    """
    if not settings.well_known_enabled:
        raise HTTPException(status_code=404, detail="Not found")

    # Parse path to extract server_id with validation
    # Expected formats:
    # - "servers/{UUID}/mcp" (standard MCP endpoint)
    # - "servers/{UUID}" (fallback without /mcp suffix)
    path_parts = path.strip("/").split("/")

    # Validate path structure
    if len(path_parts) < 2 or path_parts[0] != "servers":
        # Sanitize untrusted path before logging to prevent log injection
        logger.debug(f"Invalid RFC 9728 path format: {sanitize_for_log(path)}")
        raise HTTPException(status_code=404, detail="Invalid resource path format. Expected: /.well-known/oauth-protected-resource/servers/{server_id}/mcp")

    server_id = path_parts[1]

    # Validate server_id is a valid UUID (prevents path traversal and injection)
    if not UUID_PATTERN.match(server_id):
        # Sanitize untrusted server_id before logging to prevent log injection
        logger.warning(f"Invalid server_id format (not a UUID): {sanitize_for_log(server_id)}")
        raise HTTPException(status_code=404, detail="Invalid server_id format. Must be a valid UUID.")

    # Reject paths with extra segments after /mcp (e.g., servers/uuid/mcp/extra)
    if len(path_parts) > 3:
        # Sanitize untrusted path before logging to prevent log injection
        logger.warning(f"RFC 9728 path has unexpected segments: {sanitize_for_log(path)}")
        raise HTTPException(status_code=404, detail="Invalid resource path format. Expected: /.well-known/oauth-protected-resource/servers/{server_id}/mcp")

    # Build resource URL with /mcp suffix per MCP specification
    base_url = get_base_url_with_protocol(request)
    resource_url = f"{base_url}/servers/{server_id}/mcp"

    server_service = ServerService()
    try:
        response_data = server_service.get_oauth_protected_resource_metadata(db=db, server_id=server_id, resource_base_url=resource_url)
    except ServerNotFoundError as exc:
        # Chain the cause so tracebacks show the underlying service error.
        raise HTTPException(status_code=404, detail="Server not found") from exc
    except ServerError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc

    # Metadata is served as publicly cacheable for the configured max-age
    headers = {"Cache-Control": f"public, max-age={settings.well_known_cache_max_age}"}

    # server_id has already passed UUID validation, so it is safe to log as-is
    logger.debug(f"Served RFC 9728 OAuth metadata for server {server_id}")
    return JSONResponse(content=response_data, headers=headers)

203 

204 

@router.get("/.well-known/oauth-protected-resource")
async def get_oauth_protected_resource(
    request: Request,
    server_id: Optional[str] = None,
):
    """
    DEPRECATED: query-parameter variant of the OAuth 2.0 Protected Resource Metadata endpoint.

    The handler never returns metadata; every call ends in a 404. RFC 9728
    discovery is path-based, so clients should call
    /.well-known/oauth-protected-resource/servers/{server_id}/mcp instead.

    Args:
        request: FastAPI request object (unused).
        server_id: Server ID query parameter (ignored).

    Raises:
        HTTPException: Always 404 - a plain "Not found" when well-known support
            is disabled, otherwise a deprecation notice naming the replacement.
    """
    if not settings.well_known_enabled:
        raise HTTPException(status_code=404, detail="Not found")

    # Template shown to callers; the braces are literal, not interpolated.
    replacement_endpoint = "/.well-known/oauth-protected-resource/servers/{server_id}/mcp"

    log_message = "Deprecated query-param OAuth metadata endpoint called. Use RFC 9728 compliant path-based endpoint: " + replacement_endpoint
    logger.warning(log_message)

    notice = "This endpoint is deprecated and non-compliant with RFC 9728. Use the path-based endpoint: " + replacement_endpoint
    raise HTTPException(status_code=404, detail=notice)

233 

234 

def get_well_known_file_content(filename: str) -> PlainTextResponse:
    """
    Build the plain-text response for a requested well-known file.

    Lookup order:
    1. robots.txt - always served from settings, with an X-Robots-Tag header
    2. security.txt - served only when enabled and non-empty after validation
    3. Any name present in settings.custom_well_known_files (this is how
       ai.txt, dnt-policy.txt and other custom files are provided)

    Every successful response carries a public Cache-Control header using the
    configured max-age.

    Args:
        filename: The well-known filename requested (without path prefix).

    Returns:
        PlainTextResponse with the file content.

    Raises:
        HTTPException: 404 if file not found, not configured, or well-known disabled.

    Examples:
        >>> from mcpgateway.routers.well_known import get_well_known_file_content
        >>> callable(get_well_known_file_content)
        True
    """
    if not settings.well_known_enabled:
        raise HTTPException(status_code=404, detail="Not found")

    # Normalize the name: slashes are stripped from both ends
    name = filename.strip("/")

    plain_utf8 = "text/plain; charset=utf-8"
    cache_headers = {"Cache-Control": f"public, max-age={settings.well_known_cache_max_age}"}

    if name == "robots.txt":
        robots_headers = dict(cache_headers)
        robots_headers["X-Robots-Tag"] = "noindex, nofollow"
        return PlainTextResponse(content=settings.well_known_robots_txt, media_type=plain_utf8, headers=robots_headers)

    if name == "security.txt":
        if not settings.well_known_security_txt_enabled:
            raise HTTPException(status_code=404, detail="security.txt not configured")

        body = validate_security_txt(settings.well_known_security_txt)
        if not body:
            raise HTTPException(status_code=404, detail="security.txt not configured")

        return PlainTextResponse(content=body, media_type=plain_utf8, headers=cache_headers)

    # Registry values are dicts, so None here means "not a registered name"
    registry_entry = WELL_KNOWN_REGISTRY.get(name)

    custom_files = settings.custom_well_known_files
    if name in custom_files:
        # Registered names use the registry's content type; anything else is plain text
        if registry_entry is not None:
            media_type = f"{registry_entry['content_type']}; charset=utf-8"
        else:
            media_type = plain_utf8
        return PlainTextResponse(content=custom_files[name], media_type=media_type, headers=cache_headers)

    # Nothing to serve: give a more helpful message for recognised names
    if registry_entry is not None:
        raise HTTPException(status_code=404, detail=f"{name} is not configured. This is a {registry_entry['description']} file.")

    raise HTTPException(status_code=404, detail="Not found")

306 

307 

@router.get("/.well-known/{filename:path}", include_in_schema=False)
async def get_well_known_file(filename: str, response: Response, request: Request):
    """
    Root-level route for /.well-known/{filename}.

    Delegates entirely to get_well_known_file_content, which decides what (if
    anything) is served for the requested name: robots.txt, security.txt, or
    a file from the custom well-known configuration.

    Args:
        filename: The well-known filename requested
        response: FastAPI response object (not used by this handler)
        request: FastAPI request object (not used by this handler)

    Returns:
        The PlainTextResponse produced by get_well_known_file_content

    Raises:
        HTTPException: 404 if file not found or well-known disabled

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_well_known_file)
        True
    """
    file_response = get_well_known_file_content(filename)
    return file_response

337 

338 

@router.get("/admin/well-known", response_model=dict)
async def get_well_known_status(user: str = Depends(require_auth)):
    """
    Report which well-known files are currently configured.

    robots.txt is always listed; security.txt is listed when it is enabled in
    settings; every key of settings.custom_well_known_files is listed as a
    custom file.

    Args:
        user: Authenticated user from dependency injection.

    Returns:
        Dict with the overall enabled flag, the list of configured files, the
        names known to the registry, and the cache max-age.
    """
    max_age = settings.well_known_cache_max_age

    def _describe(path, description):
        # One status row; every listed file is reported as enabled
        return {"path": path, "enabled": True, "description": description, "cache_max_age": max_age}

    configured_files = [_describe("/.well-known/robots.txt", "Robot exclusion standard")]

    if settings.well_known_security_txt_enabled:
        configured_files.append(_describe("/.well-known/security.txt", "Security contact information"))

    configured_files.extend(_describe(f"/.well-known/{name}", "Custom well-known file") for name in settings.custom_well_known_files)

    return {
        "enabled": settings.well_known_enabled,
        "configured_files": configured_files,
        "supported_files": list(WELL_KNOWN_REGISTRY.keys()),
        "cache_max_age": max_age,
    }