Coverage for mcpgateway / middleware / security_headers.py: 100%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/security_headers.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Security Headers Middleware for MCP Gateway. 

8 

9This module implements essential security headers to prevent common attacks including 

10XSS, clickjacking, MIME sniffing, and cross-origin attacks. 

11""" 

12 

13# Third-Party 

14from starlette.middleware.base import BaseHTTPMiddleware 

15from starlette.requests import Request 

16from starlette.responses import Response 

17 

18# First-Party 

19from mcpgateway.config import settings 

20 

21 

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """
    Security headers middleware that adds essential security headers to all responses.

    This middleware implements security best practices by adding headers that help
    prevent various types of attacks and security vulnerabilities.

    Security headers added:
    - X-Content-Type-Options: Prevents MIME type sniffing
    - X-Frame-Options: Prevents clickjacking attacks
    - X-XSS-Protection: Disables legacy XSS protection (modern browsers use CSP)
    - X-Download-Options: Prevents IE from executing downloads in the site's context
    - Referrer-Policy: Controls referrer information sent with requests
    - Content-Security-Policy: Prevents XSS and other code injection attacks
    - Strict-Transport-Security: Forces HTTPS connections (when appropriate)

    Sensitive headers removed:
    - X-Powered-By: Removes server technology disclosure
    - Server: Removes server version information

    CORS headers (only when the request carries an ``Origin`` header that is allowed):
    - Access-Control-Allow-Origin: Reflects the request origin
    - Access-Control-Allow-Credentials: Set to "true" when credentials are enabled
    - Access-Control-Expose-Headers: Exposes Content-Length and X-Request-ID
    - Vary: Gains an ``Origin`` token (without duplicating an existing one)

    Examples:
        >>> middleware = SecurityHeadersMiddleware(None)
        >>> isinstance(middleware, SecurityHeadersMiddleware)
        True
        >>> # Test CSP directive construction
        >>> csp_directives = [
        ...     "default-src 'self'",
        ...     "script-src 'self' 'unsafe-inline'",
        ...     "style-src 'self' 'unsafe-inline'"
        ... ]
        >>> csp = "; ".join(csp_directives) + ";"
        >>> "default-src 'self'" in csp
        True
        >>> csp.endswith(";")
        True
        >>> # Test HSTS value construction
        >>> hsts_max_age = 31536000
        >>> hsts_value = f"max-age={hsts_max_age}"
        >>> include_subdomains = True
        >>> if include_subdomains:
        ...     hsts_value += "; includeSubDomains"
        >>> "max-age=31536000" in hsts_value
        True
        >>> "includeSubDomains" in hsts_value
        True
        >>> # Test CORS origin validation logic
        >>> allowed_origins = ["https://example.com", "https://app.example.com"]
        >>> origin = "https://example.com"
        >>> origin in allowed_origins
        True
        >>> "https://malicious.com" in allowed_origins
        False
        >>> # Test Vary header construction
        >>> existing_vary = "Accept-Encoding"
        >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
        >>> vary_val
        'Accept-Encoding, Origin'
    """

    @staticmethod
    def _frame_ancestors_source(x_frame_options) -> str:
        """
        Translate an X-Frame-Options setting into a CSP ``frame-ancestors`` source list.

        Args:
            x_frame_options: The configured X-Frame-Options value (any non-None value;
                it is converted with ``str()`` before matching, case-insensitively)

        Returns:
            The source list to place after ``frame-ancestors`` in the CSP header

        Examples:
            >>> SecurityHeadersMiddleware._frame_ancestors_source("DENY")
            "'none'"
            >>> SecurityHeadersMiddleware._frame_ancestors_source("sameorigin")
            "'self'"
            >>> SecurityHeadersMiddleware._frame_ancestors_source("ALLOW-FROM https://example.com")
            'https://example.com'
            >>> SecurityHeadersMiddleware._frame_ancestors_source("ALLOW-FROM ")
            "'none'"
            >>> SecurityHeadersMiddleware._frame_ancestors_source("ALLOW-ALL")
            '* file: http: https:'
            >>> SecurityHeadersMiddleware._frame_ancestors_source("something-else")
            "'none'"
        """
        x_frame = str(x_frame_options)
        x_frame_upper = x_frame.upper()

        if x_frame_upper == "DENY":
            return "'none'"
        if x_frame_upper == "SAMEORIGIN":
            return "'self'"
        if x_frame_upper.startswith("ALLOW-FROM"):
            # Everything after the first space is the allowed URI. A missing or
            # blank URI falls back to 'none' instead of emitting an empty directive.
            _, _, allowed_uri = x_frame.partition(" ")
            return allowed_uri.strip() or "'none'"
        if x_frame_upper == "ALLOW-ALL":
            return "* file: http: https:"
        # Default to none for unknown values (matches DENY default)
        return "'none'"

    @staticmethod
    def _vary_with_origin(existing_vary) -> str:
        """
        Return a Vary header value that includes ``Origin`` exactly once.

        Args:
            existing_vary: The current Vary header value, or None/empty if unset

        Returns:
            The Vary value to set on the response

        Examples:
            >>> SecurityHeadersMiddleware._vary_with_origin(None)
            'Origin'
            >>> SecurityHeadersMiddleware._vary_with_origin("Accept-Encoding")
            'Accept-Encoding, Origin'
            >>> SecurityHeadersMiddleware._vary_with_origin("Accept-Encoding, origin")
            'Accept-Encoding, origin'
            >>> SecurityHeadersMiddleware._vary_with_origin("Origin")
            'Origin'
        """
        if not existing_vary:
            return "Origin"
        # Header field names are case-insensitive; do not append a duplicate token
        # when another layer has already added Origin.
        present = {token.strip().lower() for token in existing_vary.split(",")}
        if "origin" in present:
            return existing_vary
        return existing_vary + ", Origin"

    async def dispatch(self, request: Request, call_next) -> Response:
        """
        Process the request and add security headers to the response.

        The downstream handler is always invoked first. When
        ``settings.security_headers_enabled`` is false the response is returned
        untouched (no security, HSTS, or CORS headers are added).

        Args:
            request: The incoming HTTP request
            call_next: The next middleware or endpoint handler

        Returns:
            Response with security headers added

        Examples:
            Test middleware instantiation:
            >>> from mcpgateway.middleware.security_headers import SecurityHeadersMiddleware
            >>> middleware = SecurityHeadersMiddleware(app=None)
            >>> isinstance(middleware, SecurityHeadersMiddleware)
            True

            Test security header values:
            >>> # X-Content-Type-Options
            >>> x_content_type = "nosniff"
            >>> x_content_type == "nosniff"
            True

            >>> # X-XSS-Protection modern value
            >>> x_xss_protection = "0"  # Modern browsers use CSP
            >>> x_xss_protection == "0"
            True

            >>> # X-Download-Options for IE
            >>> x_download_options = "noopen"
            >>> x_download_options == "noopen"
            True

            >>> # Referrer-Policy value
            >>> referrer_policy = "strict-origin-when-cross-origin"
            >>> "strict-origin" in referrer_policy
            True

            Test CSP directive construction:
            >>> csp_directives = [
            ...     "default-src 'self'",
            ...     "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdnjs.cloudflare.com",
            ...     "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com",
            ...     "img-src 'self' data: https:",
            ...     "font-src 'self' data: https://cdnjs.cloudflare.com",
            ...     "connect-src 'self' ws: wss: https:",
            ...     "frame-ancestors 'self'",  # Example for SAMEORIGIN
            ... ]
            >>> csp_header = "; ".join(csp_directives) + ";"
            >>> "default-src 'self'" in csp_header
            True
            >>> "frame-ancestors 'self'" in csp_header
            True
            >>> csp_header.endswith(";")
            True

            Test HSTS header construction:
            >>> hsts_max_age = 31536000  # 1 year
            >>> hsts_value = f"max-age={hsts_max_age}"
            >>> hsts_include_subdomains = True
            >>> if hsts_include_subdomains:
            ...     hsts_value += "; includeSubDomains"
            >>> "max-age=31536000" in hsts_value
            True
            >>> "includeSubDomains" in hsts_value
            True

            Test CORS origin validation logic:
            >>> # Test allowed origins check
            >>> allowed_origins = ["https://example.com", "https://app.example.com"]
            >>> test_origin = "https://example.com"
            >>> test_origin in allowed_origins
            True
            >>> "https://malicious.com" in allowed_origins
            False

            >>> # Test CORS credentials header
            >>> cors_allow_credentials = True
            >>> credentials_header = "true" if cors_allow_credentials else "false"
            >>> credentials_header == "true"
            True

            Test Vary header construction:
            >>> # Test with no existing Vary header
            >>> existing_vary = None
            >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
            >>> vary_val
            'Origin'

            >>> # Test with existing Vary header
            >>> existing_vary = "Accept-Encoding"
            >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
            >>> vary_val
            'Accept-Encoding, Origin'

            Test Access-Control-Expose-Headers:
            >>> exposed_headers = ["Content-Length", "X-Request-ID"]
            >>> expose_header_value = ", ".join(exposed_headers)
            >>> "Content-Length" in expose_header_value
            True
            >>> "X-Request-ID" in expose_header_value
            True

            Test server header removal logic:
            >>> # Headers that should be removed
            >>> sensitive_headers = ["X-Powered-By", "Server"]
            >>> "X-Powered-By" in sensitive_headers
            True
            >>> "Server" in sensitive_headers
            True

            Test environment-based CORS logic:
            >>> # Production environment requires explicit allowlist
            >>> environment = "production"
            >>> origin = "https://example.com"
            >>> allowed_origins = ["https://example.com"]
            >>> allow = origin in allowed_origins if environment == "production" else True
            >>> allow
            True

            >>> # Non-production with empty allowed_origins allows all
            >>> environment = "development"
            >>> allowed_origins = []
            >>> allow = (not allowed_origins) if environment != "production" else False
            >>> allow
            True

            Execute middleware end-to-end with a dummy call_next:
            >>> import asyncio
            >>> from unittest.mock import patch
            >>> from starlette.requests import Request
            >>> from starlette.responses import Response
            >>> async def call_next(req):
            ...     return Response("ok")
            >>> scope = {
            ...     'type': 'http', 'method': 'GET', 'path': '/', 'scheme': 'https',
            ...     'headers': [(b'origin', b'https://example.com'), (b'x-forwarded-proto', b'https')]
            ... }
            >>> request = Request(scope)
            >>> mw = SecurityHeadersMiddleware(app=None)
            >>> with patch('mcpgateway.middleware.security_headers.settings') as s:
            ...     s.security_headers_enabled = True
            ...     s.x_content_type_options_enabled = True
            ...     s.x_frame_options = 'DENY'
            ...     s.x_xss_protection_enabled = True
            ...     s.x_download_options_enabled = True
            ...     s.hsts_enabled = True
            ...     s.hsts_max_age = 31536000
            ...     s.hsts_include_subdomains = True
            ...     s.remove_server_headers = True
            ...     s.environment = 'production'
            ...     s.allowed_origins = ['https://example.com']
            ...     s.cors_allow_credentials = True
            ...     resp = asyncio.run(mw.dispatch(request, call_next))
            >>> resp.headers['X-Content-Type-Options']
            'nosniff'
            >>> resp.headers['X-Frame-Options']
            'DENY'
            >>> 'Content-Security-Policy' in resp.headers
            True
            >>> resp.headers['Strict-Transport-Security'].startswith('max-age=')
            True
            >>> resp.headers['Access-Control-Allow-Origin']
            'https://example.com'
            >>> 'Vary' in resp.headers and 'Origin' in resp.headers['Vary']
            True
        """
        response = await call_next(request)

        # Only apply security headers if enabled
        if not settings.security_headers_enabled:
            return response

        # Essential security headers (configurable)
        if settings.x_content_type_options_enabled:
            response.headers["X-Content-Type-Options"] = "nosniff"

        # X-Frame-Options is only emitted for a non-empty setting; both None and
        # the empty string leave the header off the response.
        if settings.x_frame_options:
            response.headers["X-Frame-Options"] = settings.x_frame_options

        if settings.x_xss_protection_enabled:
            response.headers["X-XSS-Protection"] = "0"  # Modern browsers use CSP instead

        if settings.x_download_options_enabled:
            response.headers["X-Download-Options"] = "noopen"  # Prevent IE from executing downloads

        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # Content Security Policy
        # This CSP is designed to work with the Admin UI while providing security
        # Dynamically set frame-ancestors based on X_FRAME_OPTIONS setting to stay consistent
        csp_directives = [
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdnjs.cloudflare.com https://cdn.tailwindcss.com https://cdn.jsdelivr.net https://unpkg.com",
            "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com https://cdn.jsdelivr.net",
            "img-src 'self' data: https:",
            "font-src 'self' data: https://cdnjs.cloudflare.com",
            "connect-src 'self' ws: wss: https:",
        ]

        # Only add frame-ancestors if x_frame_options is not None.
        # When None, iframe restrictions are left out of the CSP entirely.
        # NOTE(review): an empty-string setting omits X-Frame-Options above but still
        # yields "frame-ancestors 'none'" here (unknown-value fallback) - confirm that
        # this is the intended behavior for the empty string.
        if settings.x_frame_options is not None:
            frame_ancestors = self._frame_ancestors_source(settings.x_frame_options)
            csp_directives.append(f"frame-ancestors {frame_ancestors}")

        response.headers["Content-Security-Policy"] = "; ".join(csp_directives) + ";"

        # HSTS for HTTPS connections (configurable); a proxy-terminated TLS
        # connection is recognized through the X-Forwarded-Proto request header.
        if settings.hsts_enabled and (request.url.scheme == "https" or request.headers.get("X-Forwarded-Proto") == "https"):
            hsts_value = f"max-age={settings.hsts_max_age}"
            if settings.hsts_include_subdomains:
                hsts_value += "; includeSubDomains"
            response.headers["Strict-Transport-Security"] = hsts_value

        # Remove sensitive headers that might disclose server information (configurable)
        if settings.remove_server_headers:
            if "X-Powered-By" in response.headers:
                del response.headers["X-Powered-By"]
            if "Server" in response.headers:
                del response.headers["Server"]

        # Lightweight dynamic CORS reflection based on current settings
        origin = request.headers.get("Origin")
        if origin:
            if settings.environment != "production":
                # In non-production, an empty allow-list admits every origin
                allow = (not settings.allowed_origins) or (origin in settings.allowed_origins)
            else:
                # In production, require explicit allow-list
                allow = origin in settings.allowed_origins

            if allow:
                response.headers["Access-Control-Allow-Origin"] = origin
                # Standard CORS helpers
                if settings.cors_allow_credentials:
                    response.headers["Access-Control-Allow-Credentials"] = "true"
                # Expose common headers for clients
                exposed = ["Content-Length", "X-Request-ID"]
                response.headers["Access-Control-Expose-Headers"] = ", ".join(exposed)
                # Ensure caches vary on Origin (without duplicating an existing token)
                response.headers["Vary"] = self._vary_with_origin(response.headers.get("Vary"))

        return response