Coverage for mcpgateway / middleware / security_headers.py: 100%
62 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/middleware/security_headers.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Security Headers Middleware for ContextForge.
9This module implements essential security headers to prevent common attacks including
10XSS, clickjacking, MIME sniffing, and cross-origin attacks.
11"""
13# Third-Party
14from starlette.middleware.base import BaseHTTPMiddleware
15from starlette.requests import Request
16from starlette.responses import Response
18# First-Party
19from mcpgateway.config import settings
22class SecurityHeadersMiddleware(BaseHTTPMiddleware):
23 """
24 Security headers middleware that adds essential security headers to all responses.
26 This middleware implements security best practices by adding headers that help
27 prevent various types of attacks and security vulnerabilities.
29 Security headers added:
30 - X-Content-Type-Options: Prevents MIME type sniffing
31 - X-Frame-Options: Prevents clickjacking attacks
32 - X-XSS-Protection: Disables legacy XSS protection (modern browsers use CSP)
33 - Referrer-Policy: Controls referrer information sent with requests
34 - Content-Security-Policy: Prevents XSS and other code injection attacks
35 - Strict-Transport-Security: Forces HTTPS connections (when appropriate)
37 Sensitive headers removed:
38 - X-Powered-By: Removes server technology disclosure
39 - Server: Removes server version information
41 Examples:
42 >>> middleware = SecurityHeadersMiddleware(None)
43 >>> isinstance(middleware, SecurityHeadersMiddleware)
44 True
45 >>> # Test CSP directive construction
46 >>> csp_directives = [
47 ... "default-src 'self'",
48 ... "script-src 'self' 'unsafe-inline'",
49 ... "style-src 'self' 'unsafe-inline'"
50 ... ]
51 >>> csp = "; ".join(csp_directives) + ";"
52 >>> "default-src 'self'" in csp
53 True
54 >>> csp.endswith(";")
55 True
56 >>> # Test HSTS value construction
57 >>> hsts_max_age = 31536000
58 >>> hsts_value = f"max-age={hsts_max_age}"
59 >>> include_subdomains = True
60 >>> if include_subdomains:
61 ... hsts_value += "; includeSubDomains"
62 >>> "max-age=31536000" in hsts_value
63 True
64 >>> "includeSubDomains" in hsts_value
65 True
66 >>> # Test CORS origin validation logic
67 >>> allowed_origins = ["https://example.com", "https://app.example.com"]
68 >>> origin = "https://example.com"
69 >>> origin in allowed_origins
70 True
71 >>> "https://malicious.com" in allowed_origins
72 False
73 >>> # Test Vary header construction
74 >>> existing_vary = "Accept-Encoding"
75 >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
76 >>> vary_val
77 'Accept-Encoding, Origin'
78 """
80 async def dispatch(self, request: Request, call_next) -> Response:
81 """
82 Process the request and add security headers to the response.
84 Args:
85 request: The incoming HTTP request
86 call_next: The next middleware or endpoint handler
88 Returns:
89 Response with security headers added
91 Examples:
92 Test middleware instantiation:
93 >>> from mcpgateway.middleware.security_headers import SecurityHeadersMiddleware
94 >>> middleware = SecurityHeadersMiddleware(app=None)
95 >>> isinstance(middleware, SecurityHeadersMiddleware)
96 True
98 Test security header values:
99 >>> # X-Content-Type-Options
100 >>> x_content_type = "nosniff"
101 >>> x_content_type == "nosniff"
102 True
104 >>> # X-XSS-Protection modern value
105 >>> x_xss_protection = "0" # Modern browsers use CSP
106 >>> x_xss_protection == "0"
107 True
109 >>> # X-Download-Options for IE
110 >>> x_download_options = "noopen"
111 >>> x_download_options == "noopen"
112 True
114 >>> # Referrer-Policy value
115 >>> referrer_policy = "strict-origin-when-cross-origin"
116 >>> "strict-origin" in referrer_policy
117 True
119 Test CSP directive construction:
120 >>> csp_directives = [
121 ... "default-src 'self'",
122 ... "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdnjs.cloudflare.com",
123 ... "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com",
124 ... "img-src 'self' data: https:",
125 ... "font-src 'self' data: https://cdnjs.cloudflare.com",
126 ... "connect-src 'self' ws: wss: https:",
127 ... "frame-ancestors 'self'", # Example for SAMEORIGIN
128 ... ]
129 >>> csp_header = "; ".join(csp_directives) + ";"
130 >>> "default-src 'self'" in csp_header
131 True
132 >>> "frame-ancestors 'self'" in csp_header
133 True
134 >>> csp_header.endswith(";")
135 True
137 Test HSTS header construction:
138 >>> hsts_max_age = 31536000 # 1 year
139 >>> hsts_value = f"max-age={hsts_max_age}"
140 >>> hsts_include_subdomains = True
141 >>> if hsts_include_subdomains:
142 ... hsts_value += "; includeSubDomains"
143 >>> "max-age=31536000" in hsts_value
144 True
145 >>> "includeSubDomains" in hsts_value
146 True
148 Test CORS origin validation logic:
149 >>> # Test allowed origins check
150 >>> allowed_origins = ["https://example.com", "https://app.example.com"]
151 >>> test_origin = "https://example.com"
152 >>> test_origin in allowed_origins
153 True
154 >>> "https://malicious.com" in allowed_origins
155 False
157 >>> # Test CORS credentials header
158 >>> cors_allow_credentials = True
159 >>> credentials_header = "true" if cors_allow_credentials else "false"
160 >>> credentials_header == "true"
161 True
163 Test Vary header construction:
164 >>> # Test with no existing Vary header
165 >>> existing_vary = None
166 >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
167 >>> vary_val
168 'Origin'
170 >>> # Test with existing Vary header
171 >>> existing_vary = "Accept-Encoding"
172 >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
173 >>> vary_val
174 'Accept-Encoding, Origin'
176 Test Access-Control-Expose-Headers:
177 >>> exposed_headers = ["Content-Length", "X-Request-ID"]
178 >>> expose_header_value = ", ".join(exposed_headers)
179 >>> "Content-Length" in expose_header_value
180 True
181 >>> "X-Request-ID" in expose_header_value
182 True
184 Test server header removal logic:
185 >>> # Headers that should be removed
186 >>> sensitive_headers = ["X-Powered-By", "Server"]
187 >>> "X-Powered-By" in sensitive_headers
188 True
189 >>> "Server" in sensitive_headers
190 True
192 Test environment-based CORS logic:
193 >>> # Production environment requires explicit allowlist
194 >>> environment = "production"
195 >>> origin = "https://example.com"
196 >>> allowed_origins = ["https://example.com"]
197 >>> allow = origin in allowed_origins if environment == "production" else True
198 >>> allow
199 True
201 >>> # Non-production with empty allowed_origins allows all
202 >>> environment = "development"
203 >>> allowed_origins = []
204 >>> allow = (not allowed_origins) if environment != "production" else False
205 >>> allow
206 True
208 Execute middleware end-to-end with a dummy call_next:
209 >>> import asyncio
210 >>> from unittest.mock import patch
211 >>> from starlette.requests import Request
212 >>> from starlette.responses import Response
213 >>> async def call_next(req):
214 ... return Response("ok")
215 >>> scope = {
216 ... 'type': 'http', 'method': 'GET', 'path': '/', 'scheme': 'https',
217 ... 'headers': [(b'origin', b'https://example.com'), (b'x-forwarded-proto', b'https')]
218 ... }
219 >>> request = Request(scope)
220 >>> mw = SecurityHeadersMiddleware(app=None)
221 >>> with patch('mcpgateway.middleware.security_headers.settings') as s:
222 ... s.security_headers_enabled = True
223 ... s.x_content_type_options_enabled = True
224 ... s.x_frame_options = 'DENY'
225 ... s.x_xss_protection_enabled = True
226 ... s.x_download_options_enabled = True
227 ... s.hsts_enabled = True
228 ... s.hsts_max_age = 31536000
229 ... s.hsts_include_subdomains = True
230 ... s.remove_server_headers = True
231 ... s.environment = 'production'
232 ... s.allowed_origins = ['https://example.com']
233 ... s.cors_allow_credentials = True
234 ... resp = asyncio.run(mw.dispatch(request, call_next))
235 >>> resp.headers['X-Content-Type-Options']
236 'nosniff'
237 >>> resp.headers['X-Frame-Options']
238 'DENY'
239 >>> 'Content-Security-Policy' in resp.headers
240 True
241 >>> resp.headers['Strict-Transport-Security'].startswith('max-age=')
242 True
243 >>> resp.headers['Access-Control-Allow-Origin']
244 'https://example.com'
245 >>> 'Vary' in resp.headers and 'Origin' in resp.headers['Vary']
246 True
247 """
248 response = await call_next(request)
250 # Only apply security headers if enabled
251 if not settings.security_headers_enabled:
252 return response
254 # Essential security headers (configurable)
255 if settings.x_content_type_options_enabled:
256 response.headers["X-Content-Type-Options"] = "nosniff"
258 # Handle X-Frame-Options: None/empty = don't set header (allow embedding), other values = set header
259 # Note: config validator normalizes ""/"null"/"none" to None, but we guard here too for safety
260 x_frame = settings.x_frame_options
261 if isinstance(x_frame, str) and not x_frame.strip():
262 x_frame = None
263 if x_frame is not None:
264 response.headers["X-Frame-Options"] = x_frame
266 if settings.x_xss_protection_enabled:
267 response.headers["X-XSS-Protection"] = "0" # Modern browsers use CSP instead
269 if settings.x_download_options_enabled:
270 response.headers["X-Download-Options"] = "noopen" # Prevent IE from executing downloads
272 response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
274 # Content Security Policy
275 # This CSP is designed to work with the Admin UI while providing security
276 # Dynamically set frame-ancestors based on X_FRAME_OPTIONS setting to stay consistent
277 csp_directives = [
278 "default-src 'self'",
279 "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdnjs.cloudflare.com https://cdn.tailwindcss.com https://cdn.jsdelivr.net https://unpkg.com",
280 "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com https://cdn.jsdelivr.net",
281 "img-src 'self' data: https:",
282 "font-src 'self' data: https://cdnjs.cloudflare.com",
283 "connect-src 'self' ws: wss: https:",
284 ]
286 # Only add frame-ancestors if x_frame is set (None/empty = allow all embedding)
287 if x_frame is not None:
288 x_frame_upper = x_frame.upper()
290 if x_frame_upper == "DENY":
291 frame_ancestors = "'none'"
292 elif x_frame_upper == "SAMEORIGIN":
293 frame_ancestors = "'self'"
294 elif x_frame_upper.startswith("ALLOW-FROM"):
295 allowed_uri = x_frame.split(" ", 1)[1] if " " in x_frame else "'none'"
296 frame_ancestors = allowed_uri
297 elif x_frame_upper == "ALLOW-ALL":
298 frame_ancestors = "* file: http: https:"
299 else:
300 # Default to none for unknown values (matches DENY default)
301 frame_ancestors = "'none'"
303 csp_directives.append(f"frame-ancestors {frame_ancestors}")
304 response.headers["Content-Security-Policy"] = "; ".join(csp_directives) + ";"
306 # HSTS for HTTPS connections (configurable)
307 if settings.hsts_enabled and (request.url.scheme == "https" or request.headers.get("X-Forwarded-Proto") == "https"):
308 hsts_value = f"max-age={settings.hsts_max_age}"
309 if settings.hsts_include_subdomains:
310 hsts_value += "; includeSubDomains"
311 response.headers["Strict-Transport-Security"] = hsts_value
313 # Remove sensitive headers that might disclose server information (configurable)
314 if settings.remove_server_headers:
315 if "X-Powered-By" in response.headers:
316 del response.headers["X-Powered-By"]
317 if "Server" in response.headers:
318 del response.headers["Server"]
320 # Lightweight dynamic CORS reflection based on current settings
321 origin = request.headers.get("Origin")
322 if origin:
323 allow = False
324 if settings.environment != "production":
325 # In non-production, honor allowed_origins dynamically
326 allow = (not settings.allowed_origins) or (origin in settings.allowed_origins)
327 else:
328 # In production, require explicit allow-list
329 allow = origin in settings.allowed_origins
330 if allow:
331 response.headers["Access-Control-Allow-Origin"] = origin
332 # Standard CORS helpers
333 if settings.cors_allow_credentials:
334 response.headers["Access-Control-Allow-Credentials"] = "true"
335 # Expose common headers for clients
336 exposed = ["Content-Length", "X-Request-ID"]
337 response.headers["Access-Control-Expose-Headers"] = ", ".join(exposed)
338 # Ensure caches vary on Origin
339 existing_vary = response.headers.get("Vary")
340 vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
341 response.headers["Vary"] = vary_val
343 return response