Coverage for mcpgateway / routers / well_known.py: 99%
110 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/well_known.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Well-Known URI Handler Router.
8This module implements a flexible /.well-known/* endpoint handler that supports
9standard well-known URIs like security.txt and robots.txt with user-configurable content.
10Defaults assume private API deployment with crawling disabled.
11"""
13# Standard
14from datetime import datetime, timedelta, timezone
15import re
16from typing import Optional
17from urllib.parse import urlparse, urlunparse
19# Third-Party
20from fastapi import APIRouter, Depends, HTTPException, Request, Response
21from fastapi.responses import JSONResponse, PlainTextResponse
22from sqlalchemy.orm import Session
24# First-Party
25from mcpgateway.config import settings
26from mcpgateway.db import get_db
27from mcpgateway.services.logging_service import LoggingService
28from mcpgateway.services.server_service import ServerError, ServerNotFoundError, ServerService
29from mcpgateway.utils.log_sanitizer import sanitize_for_log
30from mcpgateway.utils.verify_credentials import require_auth
# Router that carries every well-known endpoint declared in this module
router = APIRouter(tags=["well-known"])

# Module-level logger obtained through the gateway logging service
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
# UUID validation pattern for RFC 9728 endpoint.
# Accepts 32 hex digits with or without the canonical hyphens, case-insensitively.
# The pattern is anchored with \Z rather than $ because "$" also matches just
# before a trailing newline, which would let "<uuid>\n" pass a .match() check.
UUID_PATTERN = re.compile(r"^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}\Z", re.IGNORECASE)
# Registry of recognised well-known URIs.
# Each row is (filename, content type, description, governing specification);
# the mapping below is keyed by filename in the order the rows are listed.
WELL_KNOWN_REGISTRY = {
    name: {"content_type": content_type, "description": description, "rfc": rfc}
    for name, content_type, description, rfc in (
        ("robots.txt", "text/plain", "Robot exclusion standard", "RFC 9309"),
        ("security.txt", "text/plain", "Security contact information", "RFC 9116"),
        ("ai.txt", "text/plain", "AI usage policies", "Draft"),
        ("dnt-policy.txt", "text/plain", "Do Not Track policy", "W3C"),
        ("change-password", "text/plain", "Change password URL", "RFC 8615"),
    )
}
def get_base_url_with_protocol(request: Request) -> str:
    """
    Build base URL with correct protocol based on proxy headers.

    Uses the first entry of the X-Forwarded-Proto header if present (proxy
    scenario), otherwise falls back to request.url.scheme. The fallback is
    also used when the header is present but its first entry is blank
    (e.g. ``" , https"``), because an empty scheme would otherwise produce a
    scheme-less URL such as ``//host``.

    Note: request.base_url already includes root_path in FastAPI.

    Args:
        request: The FastAPI request object.

    Returns:
        Base URL string with correct (lower-cased) protocol, without trailing slash.

    Examples:
        >>> from mcpgateway.routers.well_known import get_base_url_with_protocol
        >>> callable(get_base_url_with_protocol)
        True
    """
    # The header may hold a comma-separated list; only the first entry is used.
    forwarded_proto = request.headers.get("x-forwarded-proto") or ""
    proto = forwarded_proto.split(",")[0].strip().lower()
    if not proto:
        proto = request.url.scheme

    parsed = urlparse(str(request.base_url))
    return str(urlunparse(parsed._replace(scheme=proto))).rstrip("/")
def validate_security_txt(content: str) -> Optional[str]:
    """Validate security.txt format and add headers if missing.

    An ``Expires`` field (180 days from now) is appended when the content has
    none, and a two-line comment header plus a blank line is prepended when
    the content does not already start with a comment.

    Timestamps are written as ``YYYY-MM-DDTHH:MM:SSZ``. They are formatted
    explicitly because ``isoformat()`` on a timezone-aware datetime already
    ends in ``+00:00``; appending ``Z`` to it yields an invalid value.

    Args:
        content: The security.txt content to validate.

    Returns:
        Validated security.txt content with added headers, or None if content
        is empty or contains only whitespace.
    """
    if not content or not content.strip():
        return None

    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    now = datetime.now(timezone.utc).replace(microsecond=0)
    lines = content.strip().split("\n")

    # Field names in security.txt are case-insensitive, so "expires:" counts too.
    has_expires = any(line.strip().lower().startswith("expires:") for line in lines)

    # Add Expires field if missing (6 months from now)
    if not has_expires:
        expires = now + timedelta(days=180)
        lines.append(f"Expires: {expires.strftime(timestamp_format)}")

    # Content that already opens with a comment keeps its own header
    if lines[0].startswith("#"):
        return "\n".join(lines)

    header = [
        "# Security contact information for ContextForge",
        f"# Generated: {now.strftime(timestamp_format)}",
        "",
    ]
    return "\n".join(header + lines)
@router.get("/.well-known/oauth-protected-resource/{path:path}")
async def get_oauth_protected_resource_rfc9728(
    path: str,
    request: Request,
    db: Session = Depends(get_db),
) -> JSONResponse:
    """
    RFC 9728 OAuth 2.0 Protected Resource Metadata endpoint (path-based).

    Per RFC 9728 Section 3.1, the well-known URI is constructed by:
    1. Taking the resource URL: http://localhost:4444/servers/{UUID}/mcp
    2. Removing trailing slash and inserting /.well-known/oauth-protected-resource/
    3. Result: http://localhost:4444/.well-known/oauth-protected-resource/servers/{UUID}/mcp

    This endpoint does not require authentication per RFC 9728 requirements.

    Args:
        path: The resource path after oauth-protected-resource/ (e.g., "servers/{UUID}/mcp")
        request: FastAPI request object for building resource URL
        db: Database session dependency

    Returns:
        JSONResponse with RFC 9728 Protected Resource Metadata:
        {
            "resource": "http://localhost:4444/servers/{UUID}/mcp",
            "authorization_servers": ["https://auth.example.com"],
            "bearer_methods_supported": ["header"],
            "scopes_supported": ["read", "write"]
        }

    Raises:
        HTTPException: 404 if path format invalid, server not found, disabled,
            non-public, OAuth not enabled, or not configured.

    Examples:
        >>> # Request OAuth metadata for a server
        >>> # GET /.well-known/oauth-protected-resource/servers/123e4567e89b12d3a456426614174000/mcp
        >>> # Returns RFC 9728 compliant metadata
    """
    if not settings.well_known_enabled:
        raise HTTPException(status_code=404, detail="Not found")

    invalid_path_detail = "Invalid resource path format. Expected: /.well-known/oauth-protected-resource/servers/{server_id}/mcp"

    # Parse path to extract server_id with validation
    # Expected formats:
    # - "servers/{UUID}/mcp" (standard MCP endpoint)
    # - "servers/{UUID}" (fallback without /mcp suffix)
    path_parts = path.strip("/").split("/")

    # Validate path structure
    if len(path_parts) < 2 or path_parts[0] != "servers":
        # Sanitize untrusted path before logging to prevent log injection
        logger.debug(f"Invalid RFC 9728 path format: {sanitize_for_log(path)}")
        raise HTTPException(status_code=404, detail=invalid_path_detail)

    server_id = path_parts[1]

    # Validate server_id is a valid UUID (prevents path traversal and injection).
    # fullmatch() is used so that a value such as "<uuid>\n" is rejected even if
    # the pattern ends in "$", which also matches just before a trailing newline.
    if not UUID_PATTERN.fullmatch(server_id):
        # Sanitize untrusted server_id before logging to prevent log injection
        logger.warning(f"Invalid server_id format (not a UUID): {sanitize_for_log(server_id)}")
        raise HTTPException(status_code=404, detail="Invalid server_id format. Must be a valid UUID.")

    # Reject paths with extra segments after /mcp (e.g., servers/uuid/mcp/extra)
    if len(path_parts) > 3:
        # Sanitize untrusted path before logging to prevent log injection
        logger.warning(f"RFC 9728 path has unexpected segments: {sanitize_for_log(path)}")
        raise HTTPException(status_code=404, detail=invalid_path_detail)

    # Build resource URL with /mcp suffix per MCP specification
    base_url = get_base_url_with_protocol(request)
    resource_url = f"{base_url}/servers/{server_id}/mcp"

    server_service = ServerService()
    try:
        response_data = server_service.get_oauth_protected_resource_metadata(db=db, server_id=server_id, resource_base_url=resource_url)
    except ServerNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Server not found") from exc
    except ServerError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc

    # Add cache headers per RFC 9728 recommendations
    headers = {"Cache-Control": f"public, max-age={settings.well_known_cache_max_age}"}

    logger.debug(f"Served RFC 9728 OAuth metadata for server {server_id}")
    return JSONResponse(content=response_data, headers=headers)
@router.get("/.well-known/oauth-protected-resource")
async def get_oauth_protected_resource(
    request: Request,
    server_id: Optional[str] = None,
):
    """
    DEPRECATED: query-parameter variant of the OAuth 2.0 Protected Resource Metadata endpoint.

    The handler never returns metadata; every call ends in a 404. When
    well-known support is enabled, a deprecation warning is logged and the
    404 detail points to the path-based replacement:
    /.well-known/oauth-protected-resource/servers/{server_id}/mcp

    Args:
        request: FastAPI request object (unused).
        server_id: Server ID query parameter (ignored).

    Raises:
        HTTPException: Always 404, with a plain "Not found" when well-known
            support is disabled and a deprecation notice otherwise.
    """
    if settings.well_known_enabled:
        replacement = "/.well-known/oauth-protected-resource/servers/{server_id}/mcp"
        logger.warning("Deprecated query-param OAuth metadata endpoint called. Use RFC 9728 compliant path-based endpoint: " + replacement)
        notice = "This endpoint is deprecated and non-compliant with RFC 9728. Use the path-based endpoint: " + replacement
        raise HTTPException(status_code=404, detail=notice)

    raise HTTPException(status_code=404, detail="Not found")
def get_well_known_file_content(filename: str) -> PlainTextResponse:
    """
    Build the plain-text response for a single well-known URI file.

    Shared helper used by both the root-level and virtual server
    well-known endpoints.

    Resolution order:
    - robots.txt: always served from settings, with an X-Robots-Tag header
    - security.txt: served only when enabled and non-empty after validation
    - Custom files: any name present in settings.custom_well_known_files
      (includes ai.txt, dnt-policy.txt if configured)
    - Anything else: 404

    Args:
        filename: The well-known filename requested (without path prefix).

    Returns:
        PlainTextResponse with the file content.

    Raises:
        HTTPException: 404 if file not found, not configured, or well-known disabled.

    Examples:
        >>> from mcpgateway.routers.well_known import get_well_known_file_content
        >>> callable(get_well_known_file_content)
        True
    """
    if not settings.well_known_enabled:
        raise HTTPException(status_code=404, detail="Not found")

    # Surrounding slashes are not part of the name
    name = filename.strip("/")

    plain_text = "text/plain; charset=utf-8"
    cache_headers = {"Cache-Control": f"public, max-age={settings.well_known_cache_max_age}"}

    if name == "robots.txt":
        robots_headers = {**cache_headers, "X-Robots-Tag": "noindex, nofollow"}
        return PlainTextResponse(content=settings.well_known_robots_txt, media_type=plain_text, headers=robots_headers)

    if name == "security.txt":
        # Validation only runs when the feature is switched on
        body = validate_security_txt(settings.well_known_security_txt) if settings.well_known_security_txt_enabled else None
        if not body:
            raise HTTPException(status_code=404, detail="security.txt not configured")
        return PlainTextResponse(content=body, media_type=plain_text, headers=cache_headers)

    registry_entry = WELL_KNOWN_REGISTRY.get(name)

    if name in settings.custom_well_known_files:
        body = settings.custom_well_known_files[name]
        # Registered names use their registry content type; others default to plain text
        media_type = f"{registry_entry['content_type']}; charset=utf-8" if registry_entry else plain_text
        return PlainTextResponse(content=body, media_type=media_type, headers=cache_headers)

    # Nothing to serve: give a more helpful message for names the registry knows
    if registry_entry:
        raise HTTPException(status_code=404, detail=f"{name} is not configured. This is a {registry_entry['description']} file.")
    raise HTTPException(status_code=404, detail="Not found")
@router.get("/.well-known/{filename:path}", include_in_schema=False)
async def get_well_known_file(filename: str, response: Response, request: Request):
    """
    Root-level route for well-known URI files.

    All lookup and response construction is delegated to
    get_well_known_file_content; see that helper for the supported files
    (robots.txt, security.txt, ai.txt, dnt-policy.txt and custom files).

    Args:
        filename: The well-known filename requested
        response: FastAPI response object (not used by this handler)
        request: FastAPI request object (not used by this handler)

    Returns:
        Plain text content of the requested file

    Raises:
        HTTPException: 404 if file not found or well-known disabled

    Examples:
        >>> import asyncio
        >>> asyncio.iscoroutinefunction(get_well_known_file)
        True
    """
    file_response = get_well_known_file_content(filename)
    return file_response
@router.get("/admin/well-known", response_model=dict)
async def get_well_known_status(user: str = Depends(require_auth)):
    """
    Report which well-known URIs are currently configured.

    Args:
        user: Authenticated user from dependency injection.

    Returns:
        Dict with the global enabled flag, one entry per configured file,
        the list of registry-supported filenames and the cache max-age.
    """
    max_age = settings.well_known_cache_max_age

    def entry(path, description):
        # Every listed file shares the same shape and cache lifetime
        return {"path": path, "enabled": True, "description": description, "cache_max_age": max_age}

    # robots.txt is always listed
    configured_files = [entry("/.well-known/robots.txt", "Robot exclusion standard")]

    # security.txt is listed only when enabled
    if settings.well_known_security_txt_enabled:
        configured_files.append(entry("/.well-known/security.txt", "Security contact information"))

    # One entry per custom file
    configured_files.extend(entry(f"/.well-known/{name}", "Custom well-known file") for name in settings.custom_well_known_files)

    return {
        "enabled": settings.well_known_enabled,
        "configured_files": configured_files,
        "supported_files": list(WELL_KNOWN_REGISTRY),
        "cache_max_age": max_age,
    }