Coverage for mcpgateway / services / content_security.py: 100%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/content_security.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Content Security Service for ContextForge. 

7Provides validation for user-submitted content including size limits, 

8MIME type restrictions, and (planned) malicious pattern detection. 

9 

10This module implements Content Size Limits (US-1) and MIME Type Restrictions (US-2) 

11from issue #538. 

12""" 

13 

14# Standard 

15import hashlib 

16import logging 

17import threading 

18from typing import List, Optional, Union 

19 

20# First-Party 

21from mcpgateway.config import settings 

22 

# Import metrics with error handling for test environments
try:
    # First-Party
    from mcpgateway.services.metrics import content_size_violations_counter, content_type_violations_counter
except ImportError:
    # The metrics module could not be imported: fall back to inert stand-ins
    # so callers can still write ``counter.labels(...).inc()``.
    class NoOpCounter:
        """Inert replacement for a metrics counter; every operation does nothing."""

        def labels(self, **_kwargs):
            """Accept any label keywords and discard them.

            Args:
                **_kwargs: Arbitrary label keyword arguments (ignored)

            Returns:
                self: The same instance, so ``.labels(...).inc()`` can be chained
            """
            return self

        def inc(self, _amount=1):
            """Ignore the increment request.

            Args:
                _amount: Increment amount (ignored)
            """
            return None

    content_size_violations_counter, content_type_violations_counter = NoOpCounter(), NoOpCounter()

48 

49logger = logging.getLogger(__name__) 

50 

51 

def _sanitize_pii_for_logging(user_email: Optional[str] = None, ip_address: Optional[str] = None) -> dict:
    """Reduce an e-mail address and an IP address to log-safe surrogates.

    Args:
        user_email: E-mail address; replaced by the first 8 hex characters of
            its SHA-256 digest. A falsy value yields ``None``.
        ip_address: Client address; the part after the last ``:`` (when the
            value contains a colon) or after the last ``.`` (otherwise) is
            masked. A falsy value yields ``None``.

    Returns:
        Dictionary with the keys ``user_hash`` and ``ip_subnet``

    Examples:
        >>> result = _sanitize_pii_for_logging("user@example.com", "192.168.1.100")
        >>> 'user_hash' in result and 'ip_subnet' in result
        True
        >>> result['ip_subnet']
        '192.168.1.xxx'
        >>> _sanitize_pii_for_logging(None, None)
        {'user_hash': None, 'ip_subnet': None}
    """
    if user_email:
        digest = hashlib.sha256(user_email.encode()).hexdigest()
        user_hash = digest[:8]
    else:
        user_hash = None

    if not ip_address:
        ip_subnet = None
    elif ":" in ip_address:
        # Colon-separated (IPv6-style): keep everything before the last colon.
        leading, _, _ = ip_address.rpartition(":")
        ip_subnet = leading + ":xxxx"
    else:
        # Dot-separated (IPv4-style): keep everything before the last dot.
        ip_subnet = ip_address.rsplit(".", 1)[0] + ".xxx"

    return {"user_hash": user_hash, "ip_subnet": ip_subnet}

84 

85 

def _format_bytes(bytes_val: int) -> str:
    """Format bytes as human-readable size.

    The unit is chosen from the value *as it will be displayed*: a size whose
    one-decimal rendering would read ``1024.0`` is promoted to the next unit,
    so ``1048575`` bytes is reported as ``1.0 MB`` rather than ``1024.0 KB``.
    GB is the largest unit used.

    Args:
        bytes_val: Size in bytes

    Returns:
        Human-readable size string (e.g., "195.3 KB")

    Examples:
        >>> _format_bytes(1024)
        '1.0 KB'
        >>> _format_bytes(1536)
        '1.5 KB'
        >>> _format_bytes(1048576)
        '1.0 MB'
        >>> _format_bytes(500)
        '500 B'
        >>> _format_bytes(1048575)
        '1.0 MB'
    """
    if bytes_val < 1024:
        return f"{bytes_val} B"

    size = bytes_val / 1024.0
    for unit in ("KB", "MB"):
        rendered = f"{size:.1f}"
        # Compare the rounded figure, not the raw one, so rounding can never
        # produce a "1024.0 <unit>" string.
        if float(rendered) < 1024:
            return f"{rendered} {unit}"
        size /= 1024.0

    return f"{size:.1f} GB"

118 

119 

class ContentSizeError(Exception):
    """Signals that submitted content is larger than the configured limit."""

    def __init__(self, content_type: str, actual_size: int, max_size: int):
        """Record the size details and build the human-readable message.

        Args:
            content_type: Type of content (e.g., "Resource content", "Prompt template")
            actual_size: Actual size of the content in bytes
            max_size: Maximum allowed size in bytes
        """
        self.content_type = content_type
        self.actual_size = actual_size
        self.max_size = max_size

        # Both figures are rendered with units so the message is readable.
        observed = _format_bytes(actual_size)
        limit = _format_bytes(max_size)
        message = f"{content_type} size ({observed}) exceeds maximum allowed size ({limit})"

        super().__init__(message)

140 

141 

class ContentTypeError(Exception):
    """Signals that a resource MIME type is absent from the allowed list."""

    def __init__(self, mime_type: str, allowed_types: List[str]):
        """Record the rejected MIME type and build the error message.

        Args:
            mime_type: The disallowed MIME type that was submitted
            allowed_types: List of allowed MIME types from configuration

        Examples:
            >>> err = ContentTypeError("application/evil", ["text/plain", "text/markdown"])
            >>> err.mime_type
            'application/evil'
            >>> err.allowed_types
            ['text/plain', 'text/markdown']
            >>> "application/evil" in str(err)
            True
        """
        self.mime_type = mime_type
        self.allowed_types = allowed_types

        # Only the first five entries are spelled out; longer lists get a total.
        preview_limit = 5
        total = len(allowed_types)
        display = ", ".join(allowed_types[:preview_limit])
        if total > preview_limit:
            display = f"{display}, ... ({total} total)"

        super().__init__(f"MIME type '{mime_type}' is not allowed. Allowed types: {display}")

170 

171 

class ContentSecurityService:
    """Service for validating content security constraints.

    This service provides validation for:
    - Content size limits (US-1)
    - MIME type restrictions (US-2)
    - Malicious pattern detection (US-3, future)
    - Template syntax validation (US-4, future)

    The size limits are read from ``settings`` once, when the instance is
    created; the MIME allowlist and strict-mode flag are read from
    ``settings`` on every MIME validation call.

    Examples:
        >>> service = ContentSecurityService()
        >>> service.validate_resource_size("x" * 50000)  # 50KB - OK
        >>> try:
        ...     service.validate_resource_size("x" * 200000)  # 200KB - Too large
        ... except ContentSizeError as e:
        ...     print(f"Error: {e.actual_size} > {e.max_size}")
        Error: 200000 > 102400
    """

    def __init__(self):
        """Initialize the content security service."""
        self.max_resource_size = settings.content_max_resource_size
        self.max_prompt_size = settings.content_max_prompt_size
        logger.info(
            "ContentSecurityService initialized",
            extra={
                "max_resource_size": self.max_resource_size,
                "max_prompt_size": self.max_prompt_size,
                "strict_mime_validation": settings.content_strict_mime_validation,
                "allowed_resource_mimetypes_count": len(settings.content_allowed_resource_mimetypes),
            },
        )

    @staticmethod
    def _byte_length(payload: Union[str, bytes]) -> int:
        """Return the size of *payload* in bytes; strings are measured as UTF-8.

        Args:
            payload: Text or raw bytes to measure

        Returns:
            int: Number of bytes
        """
        return len(payload.encode("utf-8") if isinstance(payload, str) else payload)

    def validate_resource_size(self, content: Union[str, bytes], uri: Optional[str] = None, user_email: Optional[str] = None, ip_address: Optional[str] = None) -> None:
        """Validate resource content size.

        Args:
            content: The resource content to validate (string or bytes)
            uri: Optional resource URI; only its presence is logged
            user_email: Optional user email for logging (logged as a hash)
            ip_address: Optional IP address for logging (logged masked)

        Raises:
            ContentSizeError: If content exceeds maximum size

        Examples:
            >>> service = ContentSecurityService()
            >>> service.validate_resource_size("small content")  # OK
            >>> try:
            ...     service.validate_resource_size("x" * 200000)
            ... except ContentSizeError:
            ...     print("Too large")
            Too large
        """
        actual_size = self._byte_length(content)

        if actual_size > self.max_resource_size:
            # Increment Prometheus metric
            content_size_violations_counter.labels(content_type="resource").inc()

            # Log security violation with sanitized PII
            sanitized = _sanitize_pii_for_logging(user_email, ip_address)
            logger.warning(
                "Resource size limit exceeded",
                extra={"actual_size": actual_size, "max_size": self.max_resource_size, "content_type": "resource", "uri_provided": uri is not None, **sanitized},
            )
            raise ContentSizeError("Resource content", actual_size, self.max_resource_size)

        # Lazy %-style arguments: the message is only built if DEBUG is enabled.
        logger.debug("Resource size validation passed: %s bytes", actual_size)

    def validate_prompt_size(self, template: str, name: Optional[str] = None, user_email: Optional[str] = None, ip_address: Optional[str] = None) -> None:
        """Validate prompt template size.

        Args:
            template: The prompt template to validate
            name: Optional prompt name; only its presence is logged
            user_email: Optional user email for logging (logged as a hash)
            ip_address: Optional IP address for logging (logged masked)

        Raises:
            ContentSizeError: If template exceeds maximum size

        Examples:
            >>> service = ContentSecurityService()
            >>> service.validate_prompt_size("Hello {{user}}")  # OK
            >>> try:
            ...     service.validate_prompt_size("x" * 20000)
            ... except ContentSizeError:
            ...     print("Too large")
            Too large
        """
        actual_size = self._byte_length(template)

        if actual_size > self.max_prompt_size:
            # Increment Prometheus metric
            content_size_violations_counter.labels(content_type="prompt").inc()

            # Log security violation with sanitized PII
            sanitized = _sanitize_pii_for_logging(user_email, ip_address)
            logger.warning("Prompt size limit exceeded", extra={"actual_size": actual_size, "max_size": self.max_prompt_size, "content_type": "prompt", "name_provided": name is not None, **sanitized})
            raise ContentSizeError("Prompt template", actual_size, self.max_prompt_size)

        # Lazy %-style arguments: the message is only built if DEBUG is enabled.
        logger.debug("Prompt size validation passed: %s bytes", actual_size)

    def validate_resource_mime_type(
        self,
        mime_type: Optional[str],
        uri: Optional[str] = None,
        user_email: Optional[str] = None,
        ip_address: Optional[str] = None,
    ) -> None:
        """Validate a resource MIME type against the configured allowlist.

        When :attr:`~mcpgateway.config.Settings.content_strict_mime_validation`
        is ``True``, only MIME types explicitly listed in the allowlist are accepted.
        This includes vendor types (``application/x-*``, ``text/x-*``) and
        structured-syntax suffix types (e.g. ``application/vnd.api+json``) which
        must be explicitly added to the allowlist if needed.

        When :attr:`~mcpgateway.config.Settings.content_strict_mime_validation`
        is ``False`` the method logs a warning but does **not** raise, enabling
        a log-only migration mode.

        Parameters (``; charset=...``) are ignored for the comparison, and the
        ``type/subtype`` part is compared case-insensitively, because media
        type and subtype names are not case sensitive.

        Args:
            mime_type: The MIME type declared by the caller. ``None`` or empty
                string is accepted without validation.
            uri: Optional resource URI; only its presence is logged (not logged raw).
            user_email: Optional user e-mail for PII-safe audit logging.
            ip_address: Optional client IP for PII-safe audit logging.

        Raises:
            ContentTypeError: If ``mime_type`` is not in the allowlist and
                ``content_strict_mime_validation`` is ``True``.

        Examples:
            >>> service = ContentSecurityService()
            >>> service.validate_resource_mime_type("text/plain")  # OK if in allowlist
            >>> service.validate_resource_mime_type(None)  # OK - no type declared
            >>> from unittest.mock import patch
            >>> with patch("mcpgateway.services.content_security.settings") as mock_settings:
            ...     mock_settings.content_strict_mime_validation = True
            ...     mock_settings.content_allowed_resource_mimetypes = ["text/plain"]
            ...     try:
            ...         service.validate_resource_mime_type("application/evil")
            ...     except ContentTypeError as e:
            ...         print("blocked:", e.mime_type)
            blocked: application/evil
            >>> # Vendor types must be explicitly in allowlist
            >>> with patch("mcpgateway.services.content_security.settings") as mock_settings:
            ...     mock_settings.content_strict_mime_validation = True
            ...     mock_settings.content_allowed_resource_mimetypes = ["text/plain"]
            ...     try:
            ...         service.validate_resource_mime_type("application/x-custom")
            ...     except ContentTypeError as e:
            ...         print("vendor type blocked:", e.mime_type)
            vendor type blocked: application/x-custom
            >>> # Type/subtype matching ignores letter case and parameters
            >>> with patch("mcpgateway.services.content_security.settings") as mock_settings:
            ...     mock_settings.content_strict_mime_validation = True
            ...     mock_settings.content_allowed_resource_mimetypes = ["text/plain"]
            ...     service.validate_resource_mime_type("Text/Plain; charset=UTF-8")
        """
        # Allow absent MIME types - callers may omit the field legitimately
        if not mime_type:
            return

        allowed_types: List[str] = settings.content_allowed_resource_mimetypes
        strict = settings.content_strict_mime_validation

        # Strip parameters from MIME type for comparison (e.g., "text/plain; charset=utf-8" -> "text/plain")
        # and fold case, since "Text/Plain" and "text/plain" name the same media type.
        base_mime_type = mime_type.split(";")[0].strip().lower()

        # Accept an exact match of the full declared value, or a
        # case-insensitive match of the parameter-free type/subtype.
        if mime_type in allowed_types or any(base_mime_type == entry.lower() for entry in allowed_types):
            logger.debug("Resource MIME type validation passed: %s", mime_type)
            return

        # Violation detected — always increment metric and log regardless of mode.
        # In strict mode, also raise to block the request.
        content_type_violations_counter.labels(content_type="resource").inc()

        sanitized = _sanitize_pii_for_logging(user_email, ip_address)
        logger.warning(
            "Resource MIME type not in allowlist%s",
            " (log-only mode, not blocking)" if not strict else "",
            extra={
                "mime_type": mime_type,
                "allowed_count": len(allowed_types),
                "uri_provided": uri is not None,
                "strict": strict,
                **sanitized,
            },
        )

        if strict:
            raise ContentTypeError(mime_type, allowed_types)

364 

365 

# Module-wide shared instance, created on first request; the lock serializes creation
_content_security_service: Optional[ContentSecurityService] = None
_content_security_service_lock = threading.Lock()


def get_content_security_service() -> ContentSecurityService:
    """Return the shared ContentSecurityService, creating it on first use.

    The instance is checked once without the lock and, if still missing,
    checked again while holding the lock before it is constructed, so
    concurrent first callers end up with a single instance (CWE-362).

    Returns:
        ContentSecurityService: The singleton instance

    Examples:
        >>> service1 = get_content_security_service()
        >>> service2 = get_content_security_service()
        >>> service1 is service2
        True
    """
    global _content_security_service  # pylint: disable=global-statement

    # Common case: already built, no locking needed.
    if _content_security_service is not None:
        return _content_security_service

    with _content_security_service_lock:
        # Re-check under the lock: another thread may have built it meanwhile.
        if _content_security_service is None:
            _content_security_service = ContentSecurityService()
        return _content_security_service
396 return _content_security_service