Coverage for mcpgateway / routers / well_known.py: 98%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/well_known.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Well-Known URI Handler Router. 

8This module implements a flexible /.well-known/* endpoint handler that supports 

9standard well-known URIs like security.txt and robots.txt with user-configurable content. 

10Defaults assume private API deployment with crawling disabled. 

11""" 

12 

13# Standard 

14from datetime import datetime, timedelta, timezone 

15from typing import Optional 

16from urllib.parse import urlparse, urlunparse 

17 

18# Third-Party 

19from fastapi import APIRouter, Depends, HTTPException, Request, Response 

20from fastapi.responses import JSONResponse, PlainTextResponse 

21from sqlalchemy.orm import Session 

22 

23# First-Party 

24from mcpgateway.config import settings 

25from mcpgateway.db import get_db 

26from mcpgateway.db import Server as DbServer 

27from mcpgateway.services.logging_service import LoggingService 

28from mcpgateway.utils.verify_credentials import require_auth 

29 

# Module-level logger, obtained through the gateway's LoggingService
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Router that groups every endpoint declared in this module under the "well-known" tag
router = APIRouter(tags=["well-known"])

35 

# Registry of recognised well-known URIs.
# Each entry records the content type served for the file, a short human-readable
# description (used in 404 details for unconfigured files), and the governing spec.
WELL_KNOWN_REGISTRY = {
    "robots.txt": {
        "content_type": "text/plain",
        "description": "Robot exclusion standard",
        "rfc": "RFC 9309",
    },
    "security.txt": {
        "content_type": "text/plain",
        "description": "Security contact information",
        "rfc": "RFC 9116",
    },
    "ai.txt": {
        "content_type": "text/plain",
        "description": "AI usage policies",
        "rfc": "Draft",
    },
    "dnt-policy.txt": {
        "content_type": "text/plain",
        "description": "Do Not Track policy",
        "rfc": "W3C",
    },
    "change-password": {
        "content_type": "text/plain",
        "description": "Change password URL",
        "rfc": "RFC 8615",
    },
}

44 

45 

def get_base_url_with_protocol(request: Request) -> str:
    """
    Build base URL with correct protocol based on proxy headers.

    Uses the first entry of the X-Forwarded-Proto header when it names a
    supported scheme ("http" or "https", compared case-insensitively);
    otherwise falls back to request.url.scheme. Rejecting other values keeps
    an empty or attacker-supplied header from producing a scheme-less or
    bogus URL in responses that are served with public cache headers.

    Note: request.base_url already includes root_path in FastAPI.

    Args:
        request: The FastAPI request object.

    Returns:
        Base URL string with correct protocol, without trailing slash.

    Examples:
        >>> from mcpgateway.routers.well_known import get_base_url_with_protocol
        >>> callable(get_base_url_with_protocol)
        True
    """
    proto = request.url.scheme

    forwarded_proto = request.headers.get("x-forwarded-proto")
    if forwarded_proto:
        # The header may hold a comma-separated list when several proxies are
        # chained; only the first entry is considered.
        candidate = forwarded_proto.split(",")[0].strip().lower()
        if candidate in ("http", "https"):
            proto = candidate

    parsed = urlparse(str(request.base_url))
    return str(urlunparse(parsed._replace(scheme=proto))).rstrip("/")

75 

76 

def validate_security_txt(content: str) -> Optional[str]:
    """Validate security.txt format and add headers if missing.

    An ``Expires`` field (180 days from now) is appended when the content has
    none, and a two-line header comment plus a blank line is prepended when the
    content does not already start with a comment. Timestamps are written in
    UTC as ``YYYY-MM-DDTHH:MM:SSZ``.

    Args:
        content: The security.txt content to validate.

    Returns:
        Validated security.txt content with added headers, or None if content
        is empty or contains only whitespace.
    """
    if not content or not content.strip():
        return None

    # isoformat() on an aware UTC datetime already ends in "+00:00", so appending
    # "Z" to it would produce an invalid timestamp; format explicitly instead.
    stamp_format = "%Y-%m-%dT%H:%M:%SZ"
    now = datetime.now(timezone.utc).replace(microsecond=0)

    lines = content.strip().split("\n")

    # Field names are matched case-insensitively so "expires:" is not duplicated
    has_expires = any(line.strip().lower().startswith("expires:") for line in lines)

    # Add Expires field if missing (6 months from now)
    if not has_expires:
        expires = now + timedelta(days=180)
        lines.append(f"Expires: {expires.strftime(stamp_format)}")

    validated = []

    # Add header comment if not present
    if not lines[0].startswith("#"):
        validated.append("# Security contact information for MCP Gateway")
        validated.append(f"# Generated: {now.strftime(stamp_format)}")
        validated.append("")

    validated.extend(lines)

    return "\n".join(validated)

111 

112 

@router.get("/.well-known/oauth-protected-resource")
async def get_oauth_protected_resource(
    request: Request,
    server_id: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    Serve RFC 9728 OAuth 2.0 Protected Resource Metadata for one server.

    The metadata lets MCP clients discover which authorization server(s)
    protect a given server so they can authenticate against them.

    Args:
        request: FastAPI request object, used to build the resource URL.
        server_id: The ID of the server whose OAuth configuration is requested.
        db: Database session dependency.

    Returns:
        JSONResponse carrying the protected resource metadata and a
        Cache-Control header.

    Raises:
        HTTPException: 404 if well-known endpoints are disabled, server_id is
            not provided, the server is not found, disabled or non-public,
            OAuth is not enabled or not configured, or no authorization
            server is configured.

    Examples:
        >>> # Request OAuth metadata for a server
        >>> # GET /.well-known/oauth-protected-resource?server_id=server-123
        >>> # Returns:
        >>> # {
        >>> #   "resource": "https://gateway.example.com/servers/server-123",
        >>> #   "authorization_servers": ["https://idp.example.com"],
        >>> #   "bearer_methods_supported": ["header"],
        >>> #   "scopes_supported": ["openid", "profile"]
        >>> # }
    """
    if not settings.well_known_enabled:
        raise HTTPException(status_code=404, detail="Well-known endpoints are disabled")

    # Without a server_id there is nothing to describe; answer 404 rather than
    # exposing the Admin UI SSO configuration.
    if not server_id:
        raise HTTPException(status_code=404, detail="Not found")

    server = db.get(DbServer, server_id)

    # Missing, disabled and non-public servers all get the same 404 so the
    # response does not leak which of the three applies.
    if not server or not server.enabled or getattr(server, "visibility", "public") != "public":
        raise HTTPException(status_code=404, detail="Server not found")

    if not getattr(server, "oauth_enabled", False):
        raise HTTPException(status_code=404, detail="OAuth not enabled for this server")

    oauth_config = getattr(server, "oauth_config", None)
    if not oauth_config:
        raise HTTPException(status_code=404, detail="OAuth not configured for this server")

    # get_base_url_with_protocol builds on request.base_url, which already includes root_path
    resource_url = f"{get_base_url_with_protocol(request)}/servers/{server_id}"

    # Prefer the plural key; fall back to wrapping the singular key in a list
    issuers = oauth_config.get("authorization_servers", [])
    if not issuers:
        single_issuer = oauth_config.get("authorization_server")
        if single_issuer:
            issuers = [single_issuer]

    if not issuers:
        raise HTTPException(status_code=404, detail="OAuth authorization_server not configured")

    metadata = {
        "resource": resource_url,
        "authorization_servers": issuers,
        "bearer_methods_supported": ["header"],
    }

    # Scopes are optional; nothing else from oauth_config is echoed back
    scopes = oauth_config.get("scopes_supported") or oauth_config.get("scopes")
    if scopes:
        metadata["scopes_supported"] = scopes

    cache_headers = {"Cache-Control": f"public, max-age={settings.well_known_cache_max_age}"}

    logger.debug(f"Returning OAuth protected resource metadata for server {server_id}")
    return JSONResponse(content=metadata, headers=cache_headers)

206 

207 

def get_well_known_file_content(filename: str) -> PlainTextResponse:
    """
    Build the response for a single well-known URI file.

    Lookup order:
    - robots.txt: always served from settings.well_known_robots_txt
    - security.txt: served only when enabled and non-empty after validation
    - any name present in settings.custom_well_known_files (this is how
      ai.txt, dnt-policy.txt and other custom files are provided)

    Args:
        filename: The well-known filename requested (without path prefix).
            Leading and trailing slashes are stripped before lookup.

    Returns:
        PlainTextResponse with the file content and a Cache-Control header.

    Raises:
        HTTPException: 404 if file not found, not configured, or well-known disabled.

    Examples:
        >>> from mcpgateway.routers.well_known import get_well_known_file_content
        >>> callable(get_well_known_file_content)
        True
    """
    if not settings.well_known_enabled:
        raise HTTPException(status_code=404, detail="Not found")

    filename = filename.strip("/")
    default_media_type = "text/plain; charset=utf-8"
    cache_headers = {"Cache-Control": f"public, max-age={settings.well_known_cache_max_age}"}

    if filename == "robots.txt":
        robots_headers = {**cache_headers, "X-Robots-Tag": "noindex, nofollow"}
        return PlainTextResponse(content=settings.well_known_robots_txt, media_type=default_media_type, headers=robots_headers)

    if filename == "security.txt":
        # Disabled and empty-after-validation are reported with the same 404
        body = validate_security_txt(settings.well_known_security_txt) if settings.well_known_security_txt_enabled else None
        if not body:
            raise HTTPException(status_code=404, detail="security.txt not configured")
        return PlainTextResponse(content=body, media_type=default_media_type, headers=cache_headers)

    if filename in settings.custom_well_known_files:
        body = settings.custom_well_known_files[filename]
        # Registered names use the registry's content type; anything else is plain text
        if filename in WELL_KNOWN_REGISTRY:
            media_type = f"{WELL_KNOWN_REGISTRY[filename]['content_type']}; charset=utf-8"
        else:
            media_type = default_media_type
        return PlainTextResponse(content=body, media_type=media_type, headers=cache_headers)

    # Nothing matched: give a more helpful detail for names the registry knows about
    if filename in WELL_KNOWN_REGISTRY:
        detail = f"{filename} is not configured. This is a {WELL_KNOWN_REGISTRY[filename]['description']} file."
    else:
        detail = "Not found"
    raise HTTPException(status_code=404, detail=detail)

279 

280 

@router.get("/.well-known/{filename:path}", include_in_schema=False)
async def get_well_known_file(filename: str, response: Response, request: Request):
    """
    Serve a well-known URI file at the root level.

    All lookup logic lives in get_well_known_file_content; this route only
    forwards the requested filename to it.

    Args:
        filename: The well-known filename requested
        response: FastAPI response object (not used by this handler)
        request: FastAPI request object (not used by this handler)

    Returns:
        The response produced by get_well_known_file_content

    Raises:
        HTTPException: 404 if file not found or well-known disabled

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_well_known_file)
        True
    """
    file_response = get_well_known_file_content(filename)
    return file_response

310 

311 

@router.get("/admin/well-known", response_model=dict)
async def get_well_known_status(user: str = Depends(require_auth)):
    """
    Report how the well-known URI endpoints are currently configured.

    Args:
        user: Authenticated user from dependency injection.

    Returns:
        Dict with the global enabled flag, the list of configured files,
        the names known to the registry, and the cache max-age.
    """
    max_age = settings.well_known_cache_max_age

    def _entry(path: str, description: str) -> dict:
        # Every listed file shares the same shape and cache lifetime
        return {"path": path, "enabled": True, "description": description, "cache_max_age": max_age}

    # robots.txt is always listed
    configured_files = [_entry("/.well-known/robots.txt", "Robot exclusion standard")]

    # security.txt is listed only when its feature flag is on
    if settings.well_known_security_txt_enabled:
        configured_files.append(_entry("/.well-known/security.txt", "Security contact information"))

    # One entry per custom file name
    configured_files.extend(_entry(f"/.well-known/{filename}", "Custom well-known file") for filename in settings.custom_well_known_files)

    return {
        "enabled": settings.well_known_enabled,
        "configured_files": configured_files,
        "supported_files": list(WELL_KNOWN_REGISTRY),
        "cache_max_age": max_age,
    }