Coverage for mcpgateway / services / content_security.py: 100%
98 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/content_security.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0

Content Security Service for ContextForge.
Provides validation for user-submitted content including size limits,
MIME type restrictions, and malicious pattern detection.

This module implements Content Size Limits and MIME Type Restrictions (US-2)
from issue #538.
"""
14# Standard
15import hashlib
16import logging
17import threading
18from typing import List, Optional, Union
20# First-Party
21from mcpgateway.config import settings
# Import metrics with error handling for test environments
try:
    # First-Party
    from mcpgateway.services.metrics import content_size_violations_counter, content_type_violations_counter
except ImportError:
    # Metrics not available in test environment - create no-op counters
    class NoOpCounter:
        """No-op counter for test environments where metrics are unavailable.

        Stands in for the real violation counters so call sites of the form
        ``counter.labels(...).inc()`` keep working when the metrics module
        cannot be imported. Both methods accept and ignore any arguments so
        that the stub never raises on a call the real counter would accept.
        """

        def labels(self, *_args, **_kwargs):
            """Return self to allow method chaining.

            Args:
                *_args: Arbitrary positional label values (ignored)
                **_kwargs: Arbitrary keyword arguments (ignored)

            Returns:
                self: Returns self for method chaining
            """
            return self

        def inc(self, _amount=1, **_kwargs):
            """No-op increment method.

            Args:
                _amount: Increment amount (ignored)
                **_kwargs: Arbitrary keyword arguments, e.g. ``amount=`` (ignored)
            """

    content_size_violations_counter = NoOpCounter()
    content_type_violations_counter = NoOpCounter()

logger = logging.getLogger(__name__)
def _sanitize_pii_for_logging(user_email: Optional[str] = None, ip_address: Optional[str] = None) -> dict:
    """Sanitize PII data for secure logging.

    Args:
        user_email: User email to sanitize (returns first 8 chars of SHA256 hash)
        ip_address: IP address to sanitize (masks last octet for IPv4, last
            segment for IPv6; a value that contains neither ``.`` nor ``:`` is
            masked entirely)

    Returns:
        Dictionary with sanitized values suitable for logging

    Examples:
        >>> result = _sanitize_pii_for_logging("user@example.com", "192.168.1.100")
        >>> 'user_hash' in result and 'ip_subnet' in result
        True
        >>> result['ip_subnet']
        '192.168.1.xxx'
        >>> result = _sanitize_pii_for_logging(None, None)
        >>> result
        {'user_hash': None, 'ip_subnet': None}
        >>> _sanitize_pii_for_logging(None, "localhost")['ip_subnet']
        'xxx'
    """
    user_hash = None
    if user_email:
        user_hash = hashlib.sha256(user_email.encode()).hexdigest()[:8]

    ip_subnet = None
    if ip_address:
        if ":" in ip_address:  # IPv6: drop everything after the last colon
            prefix, _, _ = ip_address.rpartition(":")
            ip_subnet = prefix + ":xxxx"
        elif "." in ip_address:  # IPv4: drop the last octet
            prefix, _, _ = ip_address.rpartition(".")
            ip_subnet = prefix + ".xxx"
        else:
            # Not shaped like an IP address (e.g. a hostname). There is no
            # separator to mask after, so appending a suffix would log the raw
            # value unchanged - mask it completely instead.
            ip_subnet = "xxx"

    return {"user_hash": user_hash, "ip_subnet": ip_subnet}
def _format_bytes(bytes_val: int) -> str:
    """Format bytes as human-readable size.

    Values below 1024 are shown as whole bytes; larger values are shown with
    one decimal place in KB, MB or GB (binary multiples of 1024). A value that
    would round up to ``1024.0`` of a unit is shown as ``1.0`` of the next
    unit instead.

    Args:
        bytes_val: Size in bytes

    Returns:
        Human-readable size string (e.g., "195.3 KB")

    Examples:
        >>> _format_bytes(1024)
        '1.0 KB'
        >>> _format_bytes(1536)
        '1.5 KB'
        >>> _format_bytes(1048576)
        '1.0 MB'
        >>> _format_bytes(500)
        '500 B'
        >>> _format_bytes(1048575)
        '1.0 MB'
    """
    if bytes_val < 1024:
        return f"{bytes_val} B"

    size = bytes_val / 1024.0
    for unit in ("KB", "MB"):
        rendered = f"{size:.1f}"
        # Check the rendered text as well as the raw value: 1023.96 is below
        # 1024 but prints as "1024.0", which belongs to the next unit.
        if size < 1024 and rendered != "1024.0":
            return f"{rendered} {unit}"
        size /= 1024.0

    # GB is the largest unit; anything bigger is still expressed in GB.
    return f"{size:.1f} GB"
class ContentSizeError(Exception):
    """Raised when content exceeds size limits.

    The size details are kept as attributes (``content_type``,
    ``actual_size``, ``max_size``) and the exception message contains both
    sizes in human-readable form.
    """

    def __init__(self, content_type: str, actual_size: int, max_size: int):
        """Initialize ContentSizeError with size details.

        Args:
            content_type: Type of content (e.g., "Resource content", "Prompt template")
            actual_size: Actual size of the content in bytes
            max_size: Maximum allowed size in bytes
        """
        self.content_type = content_type
        self.actual_size = actual_size
        self.max_size = max_size

        # Format sizes for human readability
        actual_formatted = _format_bytes(actual_size)
        max_formatted = _format_bytes(max_size)

        super().__init__(f"{content_type} size ({actual_formatted}) exceeds maximum allowed size ({max_formatted})")

    def __reduce__(self):
        """Support ``copy`` and ``pickle`` by rebuilding from the constructor arguments.

        The default exception reduction re-calls the class with ``self.args``,
        which here is the single formatted message and does not match the
        three-argument ``__init__``.

        Returns:
            tuple: The class and the original constructor arguments
        """
        return (self.__class__, (self.content_type, self.actual_size, self.max_size))
class ContentTypeError(Exception):
    """Raised when a resource MIME type is not in the allowed list.

    The rejected type and the allowlist are kept as attributes
    (``mime_type``, ``allowed_types``); the message lists at most five of the
    allowed types.
    """

    def __init__(self, mime_type: str, allowed_types: List[str]):
        """Initialize ContentTypeError with MIME type details.

        Args:
            mime_type: The disallowed MIME type that was submitted
            allowed_types: List of allowed MIME types from configuration

        Examples:
            >>> err = ContentTypeError("application/evil", ["text/plain", "text/markdown"])
            >>> err.mime_type
            'application/evil'
            >>> err.allowed_types
            ['text/plain', 'text/markdown']
            >>> "application/evil" in str(err)
            True
        """
        self.mime_type = mime_type
        self.allowed_types = allowed_types

        # Show up to 5 allowed types in the message for readability
        display = ", ".join(allowed_types[:5])
        if len(allowed_types) > 5:
            display += f", ... ({len(allowed_types)} total)"

        super().__init__(f"MIME type '{mime_type}' is not allowed. Allowed types: {display}")

    def __reduce__(self):
        """Support ``copy`` and ``pickle`` by rebuilding from the constructor arguments.

        The default exception reduction re-calls the class with ``self.args``,
        which here is the single formatted message and does not match the
        two-argument ``__init__``.

        Returns:
            tuple: The class and the original constructor arguments
        """
        return (self.__class__, (self.mime_type, self.allowed_types))
class ContentSecurityService:
    """Service for validating content security constraints.

    This service provides validation for:
    - Content size limits (US-1)
    - MIME type restrictions (US-2)
    - Malicious pattern detection (US-3, future)
    - Template syntax validation (US-4, future)

    The size limits are read from ``settings`` once, when the service is
    constructed. The MIME allowlist and the strict-mode flag are read from
    ``settings`` on every call to :meth:`validate_resource_mime_type`.

    Examples:
        >>> service = ContentSecurityService()
        >>> service.validate_resource_size("x" * 50000)  # 50KB - OK
        >>> try:
        ...     service.validate_resource_size("x" * 200000)  # 200KB - Too large
        ... except ContentSizeError as e:
        ...     print(f"Error: {e.actual_size} > {e.max_size}")
        Error: 200000 > 102400
    """

    def __init__(self):
        """Initialize the content security service."""
        self.max_resource_size = settings.content_max_resource_size
        self.max_prompt_size = settings.content_max_prompt_size
        logger.info(
            "ContentSecurityService initialized",
            extra={
                "max_resource_size": self.max_resource_size,
                "max_prompt_size": self.max_prompt_size,
                "strict_mime_validation": settings.content_strict_mime_validation,
                "allowed_resource_mimetypes_count": len(settings.content_allowed_resource_mimetypes),
            },
        )

    def validate_resource_size(self, content: Union[str, bytes], uri: Optional[str] = None, user_email: Optional[str] = None, ip_address: Optional[str] = None) -> None:
        """Validate resource content size.

        String content is measured by its UTF-8 encoded length, bytes content
        by its length as given.

        Args:
            content: The resource content to validate (string or bytes)
            uri: Optional resource URI for logging (only its presence is logged)
            user_email: Optional user email for logging
            ip_address: Optional IP address for logging

        Raises:
            ContentSizeError: If content exceeds maximum size

        Examples:
            >>> service = ContentSecurityService()
            >>> service.validate_resource_size("small content")  # OK
            >>> try:
            ...     service.validate_resource_size("x" * 200000)
            ... except ContentSizeError:
            ...     print("Too large")
            Too large
        """
        content_bytes = content.encode("utf-8") if isinstance(content, str) else content
        actual_size = len(content_bytes)

        if actual_size > self.max_resource_size:
            # Increment Prometheus metric
            content_size_violations_counter.labels(content_type="resource").inc()

            # Log security violation with sanitized PII
            sanitized = _sanitize_pii_for_logging(user_email, ip_address)
            logger.warning(
                "Resource size limit exceeded",
                extra={
                    "actual_size": actual_size,
                    "max_size": self.max_resource_size,
                    "content_type": "resource",
                    "uri_provided": uri is not None,
                    **sanitized,
                },
            )
            raise ContentSizeError("Resource content", actual_size, self.max_resource_size)

        # Lazy %-formatting: the message is only built if DEBUG is enabled
        logger.debug("Resource size validation passed: %d bytes", actual_size)

    def validate_prompt_size(self, template: str, name: Optional[str] = None, user_email: Optional[str] = None, ip_address: Optional[str] = None) -> None:
        """Validate prompt template size.

        A string template is measured by its UTF-8 encoded length; a
        non-string value is measured with ``len()`` as given.

        Args:
            template: The prompt template to validate
            name: Optional prompt name for logging (only its presence is logged)
            user_email: Optional user email for logging
            ip_address: Optional IP address for logging

        Raises:
            ContentSizeError: If template exceeds maximum size

        Examples:
            >>> service = ContentSecurityService()
            >>> service.validate_prompt_size("Hello {{user}}")  # OK
            >>> try:
            ...     service.validate_prompt_size("x" * 20000)
            ... except ContentSizeError:
            ...     print("Too large")
            Too large
        """
        template_bytes = template.encode("utf-8") if isinstance(template, str) else template
        actual_size = len(template_bytes)

        if actual_size > self.max_prompt_size:
            # Increment Prometheus metric
            content_size_violations_counter.labels(content_type="prompt").inc()

            # Log security violation with sanitized PII
            sanitized = _sanitize_pii_for_logging(user_email, ip_address)
            logger.warning(
                "Prompt size limit exceeded",
                extra={
                    "actual_size": actual_size,
                    "max_size": self.max_prompt_size,
                    "content_type": "prompt",
                    "name_provided": name is not None,
                    **sanitized,
                },
            )
            raise ContentSizeError("Prompt template", actual_size, self.max_prompt_size)

        # Lazy %-formatting: the message is only built if DEBUG is enabled
        logger.debug("Prompt size validation passed: %d bytes", actual_size)

    def validate_resource_mime_type(
        self,
        mime_type: Optional[str],
        uri: Optional[str] = None,
        user_email: Optional[str] = None,
        ip_address: Optional[str] = None,
    ) -> None:
        """Validate a resource MIME type against the configured allowlist.

        When :attr:`~mcpgateway.config.Settings.content_strict_mime_validation`
        is ``True``, only MIME types explicitly listed in the allowlist are accepted.
        This includes vendor types (``application/x-*``, ``text/x-*``) and
        structured-syntax suffix types (e.g. ``application/vnd.api+json``) which
        must be explicitly added to the allowlist if needed.

        When :attr:`~mcpgateway.config.Settings.content_strict_mime_validation`
        is ``False`` the method logs a warning but does **not** raise, enabling
        a log-only migration mode.

        Parameters (``; charset=...``) are stripped before comparison, and the
        type/subtype is compared case-insensitively, so ``Text/Plain`` matches
        an allowlist entry of ``text/plain``.

        Args:
            mime_type: The MIME type declared by the caller. ``None`` or empty
                string is accepted without validation.
            uri: Optional resource URI; only whether one was provided is logged.
            user_email: Optional user e-mail for PII-safe audit logging.
            ip_address: Optional client IP for PII-safe audit logging.

        Raises:
            ContentTypeError: If ``mime_type`` is not in the allowlist and
                ``content_strict_mime_validation`` is ``True``.

        Examples:
            >>> service = ContentSecurityService()
            >>> service.validate_resource_mime_type("text/plain")  # OK if in allowlist
            >>> service.validate_resource_mime_type(None)  # OK - no type declared
            >>> from unittest.mock import patch
            >>> with patch("mcpgateway.services.content_security.settings") as mock_settings:
            ...     mock_settings.content_strict_mime_validation = True
            ...     mock_settings.content_allowed_resource_mimetypes = ["text/plain"]
            ...     try:
            ...         service.validate_resource_mime_type("application/evil")
            ...     except ContentTypeError as e:
            ...         print("blocked:", e.mime_type)
            blocked: application/evil
            >>> # Vendor types must be explicitly in allowlist
            >>> with patch("mcpgateway.services.content_security.settings") as mock_settings:
            ...     mock_settings.content_strict_mime_validation = True
            ...     mock_settings.content_allowed_resource_mimetypes = ["text/plain"]
            ...     try:
            ...         service.validate_resource_mime_type("application/x-custom")
            ...     except ContentTypeError as e:
            ...         print("vendor type blocked:", e.mime_type)
            vendor type blocked: application/x-custom
        """
        # Allow absent MIME types - callers may omit the field legitimately
        if not mime_type:
            return

        allowed_types: List[str] = settings.content_allowed_resource_mimetypes
        strict = settings.content_strict_mime_validation

        # Strip parameters from MIME type for comparison (e.g., "text/plain; charset=utf-8" -> "text/plain")
        base_mime_type = mime_type.split(";", 1)[0].strip()

        # Fast path: exact match in allowlist (check both full and base MIME type)
        is_allowed = mime_type in allowed_types or base_mime_type in allowed_types
        if not is_allowed:
            # Media type and subtype names are case-insensitive, so a
            # differently-cased spelling of an allowed type is the same type.
            wanted = base_mime_type.lower()
            is_allowed = any(wanted == str(entry).strip().lower() for entry in allowed_types)

        if is_allowed:
            logger.debug("Resource MIME type validation passed: %s", mime_type)
            return

        # Violation detected — always increment metric and log regardless of mode.
        # In strict mode, also raise to block the request.
        content_type_violations_counter.labels(content_type="resource").inc()

        sanitized = _sanitize_pii_for_logging(user_email, ip_address)
        logger.warning(
            "Resource MIME type not in allowlist%s",
            " (log-only mode, not blocking)" if not strict else "",
            extra={
                "mime_type": mime_type,
                "allowed_count": len(allowed_types),
                "uri_provided": uri is not None,
                "strict": strict,
                **sanitized,
            },
        )

        if strict:
            raise ContentTypeError(mime_type, allowed_types)
# Module-wide singleton and the lock that guards its one-time creation
_content_security_service: Optional[ContentSecurityService] = None
_content_security_service_lock = threading.Lock()


def get_content_security_service() -> ContentSecurityService:
    """Return the shared ContentSecurityService, creating it on first use.

    The instance is checked once without the lock (cheap path for every call
    after the first) and once more while holding the lock, so that only one
    thread ever constructs the service (double-checked locking, CWE-362).

    Returns:
        ContentSecurityService: The singleton instance

    Examples:
        >>> service1 = get_content_security_service()
        >>> service2 = get_content_security_service()
        >>> service1 is service2
        True
    """
    global _content_security_service  # pylint: disable=global-statement

    # Unlocked check: already built, nothing to synchronize
    if _content_security_service is not None:
        return _content_security_service

    with _content_security_service_lock:
        # Locked re-check: another thread may have built it while we waited
        if _content_security_service is None:
            _content_security_service = ContentSecurityService()

    return _content_security_service