Coverage for mcpgateway / routers / well_known.py: 98%
108 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/well_known.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Well-Known URI Handler Router.
8This module implements a flexible /.well-known/* endpoint handler that supports
9standard well-known URIs like security.txt and robots.txt with user-configurable content.
10Defaults assume private API deployment with crawling disabled.
11"""
13# Standard
14from datetime import datetime, timedelta, timezone
15from typing import Optional
16from urllib.parse import urlparse, urlunparse
18# Third-Party
19from fastapi import APIRouter, Depends, HTTPException, Request, Response
20from fastapi.responses import JSONResponse, PlainTextResponse
21from sqlalchemy.orm import Session
23# First-Party
24from mcpgateway.config import settings
25from mcpgateway.db import get_db
26from mcpgateway.db import Server as DbServer
27from mcpgateway.services.logging_service import LoggingService
28from mcpgateway.utils.verify_credentials import require_auth
30# Get logger instance
31logging_service = LoggingService()
32logger = logging_service.get_logger(__name__)
34router = APIRouter(tags=["well-known"])
36# Well-known URI registry with validation
37WELL_KNOWN_REGISTRY = {
38 "robots.txt": {"content_type": "text/plain", "description": "Robot exclusion standard", "rfc": "RFC 9309"},
39 "security.txt": {"content_type": "text/plain", "description": "Security contact information", "rfc": "RFC 9116"},
40 "ai.txt": {"content_type": "text/plain", "description": "AI usage policies", "rfc": "Draft"},
41 "dnt-policy.txt": {"content_type": "text/plain", "description": "Do Not Track policy", "rfc": "W3C"},
42 "change-password": {"content_type": "text/plain", "description": "Change password URL", "rfc": "RFC 8615"},
43}
46def get_base_url_with_protocol(request: Request) -> str:
47 """
48 Build base URL with correct protocol based on proxy headers.
50 Uses X-Forwarded-Proto header if present (proxy scenario),
51 otherwise falls back to request.url.scheme.
53 Note: request.base_url already includes root_path in FastAPI.
55 Args:
56 request: The FastAPI request object.
58 Returns:
59 Base URL string with correct protocol, without trailing slash.
61 Examples:
62 >>> from mcpgateway.routers.well_known import get_base_url_with_protocol
63 >>> callable(get_base_url_with_protocol)
64 True
65 """
66 forwarded_proto = request.headers.get("x-forwarded-proto")
67 if forwarded_proto:
68 proto = forwarded_proto.split(",")[0].strip()
69 else:
70 proto = request.url.scheme
72 parsed = urlparse(str(request.base_url))
73 new_parsed = parsed._replace(scheme=proto)
74 return str(urlunparse(new_parsed)).rstrip("/")
77def validate_security_txt(content: str) -> Optional[str]:
78 """Validate security.txt format and add headers if missing.
80 Args:
81 content: The security.txt content to validate.
83 Returns:
84 Validated security.txt content with added headers, or None if content is empty.
85 """
86 if not content:
87 return None
89 lines = content.strip().split("\n")
91 # Check if Expires field exists
92 has_expires = any(line.strip().startswith("Expires:") for line in lines)
94 # Add Expires field if missing (6 months from now)
95 if not has_expires:
96 expires = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(days=180)
97 lines.append(f"Expires: {expires.isoformat()}Z")
99 # Ensure it starts with required headers
100 validated = []
102 # Add header comment if not present
103 if not lines[0].startswith("#"):
104 validated.append("# Security contact information for MCP Gateway")
105 validated.append(f"# Generated: {datetime.now(timezone.utc).replace(microsecond=0).isoformat()}Z")
106 validated.append("")
108 validated.extend(lines)
110 return "\n".join(validated)
113@router.get("/.well-known/oauth-protected-resource")
114async def get_oauth_protected_resource(
115 request: Request,
116 server_id: Optional[str] = None,
117 db: Session = Depends(get_db),
118):
119 """
120 RFC 9728 OAuth 2.0 Protected Resource Metadata endpoint.
122 Returns OAuth configuration for a server per RFC 9728, enabling MCP clients
123 to discover OAuth authorization servers and authenticate using browser-based SSO.
125 Args:
126 request: FastAPI request object for building resource URL.
127 server_id: The ID of the server to get OAuth configuration for.
128 db: Database session dependency.
130 Returns:
131 JSONResponse with RFC 9728 Protected Resource Metadata.
133 Raises:
134 HTTPException: 404 if server_id not provided, server not found, disabled,
135 non-public, OAuth not enabled, or not configured.
137 Examples:
138 >>> # Request OAuth metadata for a server
139 >>> # GET /.well-known/oauth-protected-resource?server_id=server-123
140 >>> # Returns:
141 >>> # {
142 >>> # "resource": "https://gateway.example.com/servers/server-123",
143 >>> # "authorization_servers": ["https://idp.example.com"],
144 >>> # "bearer_methods_supported": ["header"],
145 >>> # "scopes_supported": ["openid", "profile"]
146 >>> # }
147 """
148 if not settings.well_known_enabled:
149 raise HTTPException(status_code=404, detail="Well-known endpoints are disabled")
151 # Return 404 when no server_id to avoid exposing Admin UI SSO configuration
152 if not server_id:
153 raise HTTPException(status_code=404, detail="Not found")
155 server = db.get(DbServer, server_id)
157 if not server:
158 raise HTTPException(status_code=404, detail="Server not found")
160 # Return 404 for disabled servers
161 if not server.enabled:
162 raise HTTPException(status_code=404, detail="Server not found")
164 # Only expose OAuth metadata for public servers to avoid leaking metadata
165 if getattr(server, "visibility", "public") != "public":
166 raise HTTPException(status_code=404, detail="Server not found")
168 if not getattr(server, "oauth_enabled", False):
169 raise HTTPException(status_code=404, detail="OAuth not enabled for this server")
171 oauth_config = getattr(server, "oauth_config", None)
172 if not oauth_config:
173 raise HTTPException(status_code=404, detail="OAuth not configured for this server")
175 # Build RFC 9728 Protected Resource Metadata response
176 # Note: get_base_url_with_protocol uses request.base_url which already includes root_path
177 base_url = get_base_url_with_protocol(request)
178 resource_url = f"{base_url}/servers/{server_id}"
180 # Extract authorization server(s) - support both list and single value
181 authorization_servers = oauth_config.get("authorization_servers", [])
182 if not authorization_servers:
183 auth_server = oauth_config.get("authorization_server")
184 if auth_server: 184 ↛ 187line 184 didn't jump to line 187 because the condition on line 184 was always true
185 authorization_servers = [auth_server]
187 if not authorization_servers: 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true
188 raise HTTPException(status_code=404, detail="OAuth authorization_server not configured")
190 response_data = {
191 "resource": resource_url,
192 "authorization_servers": authorization_servers,
193 "bearer_methods_supported": ["header"],
194 }
196 # Add optional scopes if configured (never echo secrets from oauth_config)
197 scopes = oauth_config.get("scopes_supported") or oauth_config.get("scopes")
198 if scopes:
199 response_data["scopes_supported"] = scopes
201 # Add cache headers
202 headers = {"Cache-Control": f"public, max-age={settings.well_known_cache_max_age}"}
204 logger.debug(f"Returning OAuth protected resource metadata for server {server_id}")
205 return JSONResponse(content=response_data, headers=headers)
208def get_well_known_file_content(filename: str) -> PlainTextResponse:
209 """
210 Get the response for a well-known URI file.
212 This is a shared helper function used by both the root-level and
213 virtual server well-known endpoints.
215 Supports:
216 - robots.txt: Robot exclusion (default: disallow all)
217 - security.txt: Security contact information (if configured)
218 - ai.txt: AI usage policies (if configured)
219 - dnt-policy.txt: Do Not Track policy (if configured)
220 - Custom files: Additional well-known files via configuration
222 Args:
223 filename: The well-known filename requested (without path prefix).
225 Returns:
226 PlainTextResponse with the file content.
228 Raises:
229 HTTPException: 404 if file not found, not configured, or well-known disabled.
231 Examples:
232 >>> from mcpgateway.routers.well_known import get_well_known_file_content
233 >>> callable(get_well_known_file_content)
234 True
235 """
236 if not settings.well_known_enabled:
237 raise HTTPException(status_code=404, detail="Not found")
239 # Normalize filename (remove any leading slashes)
240 filename = filename.strip("/")
242 # Prepare common headers
243 common_headers = {"Cache-Control": f"public, max-age={settings.well_known_cache_max_age}"}
245 # Handle robots.txt
246 if filename == "robots.txt":
247 headers = {**common_headers, "X-Robots-Tag": "noindex, nofollow"}
248 return PlainTextResponse(content=settings.well_known_robots_txt, media_type="text/plain; charset=utf-8", headers=headers)
250 # Handle security.txt
251 elif filename == "security.txt":
252 if not settings.well_known_security_txt_enabled:
253 raise HTTPException(status_code=404, detail="security.txt not configured")
255 content = validate_security_txt(settings.well_known_security_txt)
256 if not content:
257 raise HTTPException(status_code=404, detail="security.txt not configured")
259 return PlainTextResponse(content=content, media_type="text/plain; charset=utf-8", headers=common_headers)
261 # Handle custom files (includes ai.txt, dnt-policy.txt if configured)
262 elif filename in settings.custom_well_known_files:
263 content = settings.custom_well_known_files[filename]
265 # Determine content type
266 content_type = "text/plain; charset=utf-8"
267 if filename in WELL_KNOWN_REGISTRY:
268 content_type = f"{WELL_KNOWN_REGISTRY[filename]['content_type']}; charset=utf-8"
270 return PlainTextResponse(content=content, media_type=content_type, headers=common_headers)
272 # File not found
273 else:
274 # Provide helpful error for known well-known URIs
275 if filename in WELL_KNOWN_REGISTRY:
276 raise HTTPException(status_code=404, detail=f"{filename} is not configured. This is a {WELL_KNOWN_REGISTRY[filename]['description']} file.")
277 else:
278 raise HTTPException(status_code=404, detail="Not found")
281@router.get("/.well-known/{filename:path}", include_in_schema=False)
282async def get_well_known_file(filename: str, response: Response, request: Request):
283 """
284 Serve well-known URI files at the root level.
286 Supports:
287 - robots.txt: Robot exclusion (default: disallow all)
288 - security.txt: Security contact information (if configured)
289 - ai.txt: AI usage policies (if configured)
290 - dnt-policy.txt: Do Not Track policy (if configured)
291 - Custom files: Additional well-known files via configuration
293 Args:
294 filename: The well-known filename requested
295 response: FastAPI response object for headers
296 request: FastAPI request object for logging
298 Returns:
299 Plain text content of the requested file
301 Raises:
302 HTTPException: 404 if file not found or well-known disabled
304 Examples:
305 >>> import asyncio
306 >>> asyncio.iscoroutinefunction(get_well_known_file)
307 True
308 """
309 return get_well_known_file_content(filename)
312@router.get("/admin/well-known", response_model=dict)
313async def get_well_known_status(user: str = Depends(require_auth)):
314 """
315 Get status of well-known URI configuration.
317 Args:
318 user: Authenticated user from dependency injection.
320 Returns:
321 Dict containing well-known configuration status and available files.
322 """
323 configured_files = []
325 # Always available
326 configured_files.append({"path": "/.well-known/robots.txt", "enabled": True, "description": "Robot exclusion standard", "cache_max_age": settings.well_known_cache_max_age})
328 # Conditionally available
329 if settings.well_known_security_txt_enabled:
330 configured_files.append({"path": "/.well-known/security.txt", "enabled": True, "description": "Security contact information", "cache_max_age": settings.well_known_cache_max_age})
332 # Custom files
333 for filename in settings.custom_well_known_files:
334 configured_files.append({"path": f"/.well-known/{filename}", "enabled": True, "description": "Custom well-known file", "cache_max_age": settings.well_known_cache_max_age})
336 return {"enabled": settings.well_known_enabled, "configured_files": configured_files, "supported_files": list(WELL_KNOWN_REGISTRY.keys()), "cache_max_age": settings.well_known_cache_max_age}