Coverage for mcpgateway / middleware / security_headers.py: 100%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/security_headers.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Security Headers Middleware for ContextForge. 

8 

9This module implements essential security headers to prevent common attacks including 

10XSS, clickjacking, MIME sniffing, and cross-origin attacks. 

11""" 

12 

13# Third-Party 

14from starlette.middleware.base import BaseHTTPMiddleware 

15from starlette.requests import Request 

16from starlette.responses import Response 

17 

18# First-Party 

19from mcpgateway.config import settings 

20 

21 

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """
    Security headers middleware that adds essential security headers to all responses.

    This middleware implements security best practices by adding headers that help
    prevent various types of attacks and security vulnerabilities.

    Security headers added:
    - X-Content-Type-Options: Prevents MIME type sniffing
    - X-Frame-Options: Prevents clickjacking attacks
    - X-XSS-Protection: Disables legacy XSS protection (modern browsers use CSP)
    - Referrer-Policy: Controls referrer information sent with requests
    - Content-Security-Policy: Prevents XSS and other code injection attacks
    - Strict-Transport-Security: Forces HTTPS connections (when appropriate)

    Sensitive headers removed:
    - X-Powered-By: Removes server technology disclosure
    - Server: Removes server version information

    Examples:
        >>> middleware = SecurityHeadersMiddleware(None)
        >>> isinstance(middleware, SecurityHeadersMiddleware)
        True
        >>> # Test CSP directive construction
        >>> csp_directives = [
        ...     "default-src 'self'",
        ...     "script-src 'self' 'unsafe-inline'",
        ...     "style-src 'self' 'unsafe-inline'"
        ... ]
        >>> csp = "; ".join(csp_directives) + ";"
        >>> "default-src 'self'" in csp
        True
        >>> csp.endswith(";")
        True
        >>> # Test HSTS value construction
        >>> hsts_max_age = 31536000
        >>> hsts_value = f"max-age={hsts_max_age}"
        >>> include_subdomains = True
        >>> if include_subdomains:
        ...     hsts_value += "; includeSubDomains"
        >>> "max-age=31536000" in hsts_value
        True
        >>> "includeSubDomains" in hsts_value
        True
        >>> # Test CORS origin validation logic
        >>> allowed_origins = ["https://example.com", "https://app.example.com"]
        >>> origin = "https://example.com"
        >>> origin in allowed_origins
        True
        >>> "https://malicious.com" in allowed_origins
        False
        >>> # Test Vary header construction
        >>> existing_vary = "Accept-Encoding"
        >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
        >>> vary_val
        'Accept-Encoding, Origin'
    """

    @staticmethod
    def _frame_ancestors_for(x_frame: str) -> str:
        """
        Map an X-Frame-Options value to the matching CSP ``frame-ancestors`` source list.

        Args:
            x_frame: The configured X-Frame-Options value (compared case-insensitively)

        Returns:
            The source list to place after ``frame-ancestors`` in the CSP header

        Examples:
            >>> SecurityHeadersMiddleware._frame_ancestors_for("DENY")
            "'none'"
            >>> SecurityHeadersMiddleware._frame_ancestors_for("sameorigin")
            "'self'"
            >>> SecurityHeadersMiddleware._frame_ancestors_for("ALLOW-FROM https://example.com")
            'https://example.com'
            >>> SecurityHeadersMiddleware._frame_ancestors_for("ALLOW-FROM")
            "'none'"
            >>> SecurityHeadersMiddleware._frame_ancestors_for("ALLOW-ALL")
            '* file: http: https:'
            >>> SecurityHeadersMiddleware._frame_ancestors_for("bogus")
            "'none'"
        """
        x_frame_upper = x_frame.upper()

        if x_frame_upper == "DENY":
            return "'none'"
        if x_frame_upper == "SAMEORIGIN":
            return "'self'"
        if x_frame_upper.startswith("ALLOW-FROM"):
            if " " in x_frame:
                # Everything after the first space is the allowed URI; an empty
                # remainder falls back to 'none' rather than emitting a bare directive.
                return x_frame.split(" ", 1)[1].strip() or "'none'"
            return "'none'"
        if x_frame_upper == "ALLOW-ALL":
            return "* file: http: https:"

        # Default to none for unknown values (matches DENY default)
        return "'none'"

    @staticmethod
    def _merge_vary(existing, token: str) -> str:
        """
        Add a field name to a Vary header value without duplicating it.

        Args:
            existing: Current Vary header value, or None/empty when the header is absent
            token: Header field name to add (for example ``"Origin"``)

        Returns:
            The Vary value to send; unchanged when it already lists the token or is ``*``

        Examples:
            >>> SecurityHeadersMiddleware._merge_vary(None, "Origin")
            'Origin'
            >>> SecurityHeadersMiddleware._merge_vary("Accept-Encoding", "Origin")
            'Accept-Encoding, Origin'
            >>> SecurityHeadersMiddleware._merge_vary("Accept-Encoding, origin", "Origin")
            'Accept-Encoding, origin'
            >>> SecurityHeadersMiddleware._merge_vary("*", "Origin")
            '*'
        """
        if not existing:
            return token

        # Field names are case-insensitive; "*" already covers every header.
        present = {part.strip().lower() for part in existing.split(",")}
        if "*" in present or token.lower() in present:
            return existing
        return f"{existing}, {token}"

    async def dispatch(self, request: Request, call_next) -> Response:
        """
        Process the request and add security headers to the response.

        Args:
            request: The incoming HTTP request
            call_next: The next middleware or endpoint handler

        Returns:
            Response with security headers added

        Examples:
            Test middleware instantiation:
            >>> from mcpgateway.middleware.security_headers import SecurityHeadersMiddleware
            >>> middleware = SecurityHeadersMiddleware(app=None)
            >>> isinstance(middleware, SecurityHeadersMiddleware)
            True

            Test security header values:
            >>> # X-Content-Type-Options
            >>> x_content_type = "nosniff"
            >>> x_content_type == "nosniff"
            True

            >>> # X-XSS-Protection modern value
            >>> x_xss_protection = "0"  # Modern browsers use CSP
            >>> x_xss_protection == "0"
            True

            >>> # X-Download-Options for IE
            >>> x_download_options = "noopen"
            >>> x_download_options == "noopen"
            True

            >>> # Referrer-Policy value
            >>> referrer_policy = "strict-origin-when-cross-origin"
            >>> "strict-origin" in referrer_policy
            True

            Test CSP directive construction:
            >>> csp_directives = [
            ...     "default-src 'self'",
            ...     "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdnjs.cloudflare.com",
            ...     "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com",
            ...     "img-src 'self' data: https:",
            ...     "font-src 'self' data: https://cdnjs.cloudflare.com",
            ...     "connect-src 'self' ws: wss: https:",
            ...     "frame-ancestors 'self'",  # Example for SAMEORIGIN
            ... ]
            >>> csp_header = "; ".join(csp_directives) + ";"
            >>> "default-src 'self'" in csp_header
            True
            >>> "frame-ancestors 'self'" in csp_header
            True
            >>> csp_header.endswith(";")
            True

            Test HSTS header construction:
            >>> hsts_max_age = 31536000  # 1 year
            >>> hsts_value = f"max-age={hsts_max_age}"
            >>> hsts_include_subdomains = True
            >>> if hsts_include_subdomains:
            ...     hsts_value += "; includeSubDomains"
            >>> "max-age=31536000" in hsts_value
            True
            >>> "includeSubDomains" in hsts_value
            True

            Test CORS origin validation logic:
            >>> # Test allowed origins check
            >>> allowed_origins = ["https://example.com", "https://app.example.com"]
            >>> test_origin = "https://example.com"
            >>> test_origin in allowed_origins
            True
            >>> "https://malicious.com" in allowed_origins
            False

            >>> # Test CORS credentials header
            >>> cors_allow_credentials = True
            >>> credentials_header = "true" if cors_allow_credentials else "false"
            >>> credentials_header == "true"
            True

            Test Vary header construction:
            >>> # Test with no existing Vary header
            >>> existing_vary = None
            >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
            >>> vary_val
            'Origin'

            >>> # Test with existing Vary header
            >>> existing_vary = "Accept-Encoding"
            >>> vary_val = "Origin" if not existing_vary else (existing_vary + ", Origin")
            >>> vary_val
            'Accept-Encoding, Origin'

            Test Access-Control-Expose-Headers:
            >>> exposed_headers = ["Content-Length", "X-Request-ID"]
            >>> expose_header_value = ", ".join(exposed_headers)
            >>> "Content-Length" in expose_header_value
            True
            >>> "X-Request-ID" in expose_header_value
            True

            Test server header removal logic:
            >>> # Headers that should be removed
            >>> sensitive_headers = ["X-Powered-By", "Server"]
            >>> "X-Powered-By" in sensitive_headers
            True
            >>> "Server" in sensitive_headers
            True

            Test environment-based CORS logic:
            >>> # Production environment requires explicit allowlist
            >>> environment = "production"
            >>> origin = "https://example.com"
            >>> allowed_origins = ["https://example.com"]
            >>> allow = origin in allowed_origins if environment == "production" else True
            >>> allow
            True

            >>> # Non-production with empty allowed_origins allows all
            >>> environment = "development"
            >>> allowed_origins = []
            >>> allow = (not allowed_origins) if environment != "production" else False
            >>> allow
            True

            Execute middleware end-to-end with a dummy call_next:
            >>> import asyncio
            >>> from unittest.mock import patch
            >>> from starlette.requests import Request
            >>> from starlette.responses import Response
            >>> async def call_next(req):
            ...     return Response("ok")
            >>> scope = {
            ...     'type': 'http', 'method': 'GET', 'path': '/', 'scheme': 'https',
            ...     'headers': [(b'origin', b'https://example.com'), (b'x-forwarded-proto', b'https')]
            ... }
            >>> request = Request(scope)
            >>> mw = SecurityHeadersMiddleware(app=None)
            >>> with patch('mcpgateway.middleware.security_headers.settings') as s:
            ...     s.security_headers_enabled = True
            ...     s.x_content_type_options_enabled = True
            ...     s.x_frame_options = 'DENY'
            ...     s.x_xss_protection_enabled = True
            ...     s.x_download_options_enabled = True
            ...     s.hsts_enabled = True
            ...     s.hsts_max_age = 31536000
            ...     s.hsts_include_subdomains = True
            ...     s.remove_server_headers = True
            ...     s.environment = 'production'
            ...     s.allowed_origins = ['https://example.com']
            ...     s.cors_allow_credentials = True
            ...     resp = asyncio.run(mw.dispatch(request, call_next))
            >>> resp.headers['X-Content-Type-Options']
            'nosniff'
            >>> resp.headers['X-Frame-Options']
            'DENY'
            >>> 'Content-Security-Policy' in resp.headers
            True
            >>> resp.headers['Strict-Transport-Security'].startswith('max-age=')
            True
            >>> resp.headers['Access-Control-Allow-Origin']
            'https://example.com'
            >>> 'Vary' in resp.headers and 'Origin' in resp.headers['Vary']
            True
        """
        response = await call_next(request)

        # Only apply security headers if enabled
        if not settings.security_headers_enabled:
            return response

        # Essential security headers (configurable)
        if settings.x_content_type_options_enabled:
            response.headers["X-Content-Type-Options"] = "nosniff"

        # Handle X-Frame-Options: None/empty = don't set header (allow embedding), other values = set header
        # Note: config validator normalizes ""/"null"/"none" to None, but we guard here too for safety.
        # Surrounding whitespace is stripped so the header and the CSP mapping below see the same value.
        x_frame = settings.x_frame_options
        if isinstance(x_frame, str):
            x_frame = x_frame.strip() or None
        if x_frame is not None:
            response.headers["X-Frame-Options"] = x_frame

        if settings.x_xss_protection_enabled:
            response.headers["X-XSS-Protection"] = "0"  # Modern browsers use CSP instead

        if settings.x_download_options_enabled:
            response.headers["X-Download-Options"] = "noopen"  # Prevent IE from executing downloads

        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # Content Security Policy
        # This CSP is designed to work with the Admin UI while providing security
        # Dynamically set frame-ancestors based on X_FRAME_OPTIONS setting to stay consistent
        csp_directives = [
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdnjs.cloudflare.com https://cdn.tailwindcss.com https://cdn.jsdelivr.net https://unpkg.com",
            "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com https://cdn.jsdelivr.net",
            "img-src 'self' data: https:",
            "font-src 'self' data: https://cdnjs.cloudflare.com",
            "connect-src 'self' ws: wss: https:",
        ]

        # Only add frame-ancestors if x_frame is set (None/empty = allow all embedding)
        if x_frame is not None:
            frame_ancestors = SecurityHeadersMiddleware._frame_ancestors_for(x_frame)
            csp_directives.append(f"frame-ancestors {frame_ancestors}")
        response.headers["Content-Security-Policy"] = "; ".join(csp_directives) + ";"

        # HSTS for HTTPS connections (configurable)
        if settings.hsts_enabled and (request.url.scheme == "https" or request.headers.get("X-Forwarded-Proto") == "https"):
            hsts_value = f"max-age={settings.hsts_max_age}"
            if settings.hsts_include_subdomains:
                hsts_value += "; includeSubDomains"
            response.headers["Strict-Transport-Security"] = hsts_value

        # Remove sensitive headers that might disclose server information (configurable)
        if settings.remove_server_headers:
            if "X-Powered-By" in response.headers:
                del response.headers["X-Powered-By"]
            if "Server" in response.headers:
                del response.headers["Server"]

        # Lightweight dynamic CORS reflection based on current settings
        origin = request.headers.get("Origin")
        if origin:
            if settings.environment != "production":
                # In non-production, honor allowed_origins dynamically
                allow = (not settings.allowed_origins) or (origin in settings.allowed_origins)
            else:
                # In production, require explicit allow-list
                allow = origin in settings.allowed_origins
            if allow:
                response.headers["Access-Control-Allow-Origin"] = origin
                # Standard CORS helpers
                if settings.cors_allow_credentials:
                    response.headers["Access-Control-Allow-Credentials"] = "true"
                # Expose common headers for clients
                exposed = ["Content-Length", "X-Request-ID"]
                response.headers["Access-Control-Expose-Headers"] = ", ".join(exposed)
                # Ensure caches vary on Origin, without duplicating a token that an
                # upstream layer (e.g. a CORS middleware) may already have added
                response.headers["Vary"] = SecurityHeadersMiddleware._merge_vary(response.headers.get("Vary"), "Origin")

        return response