Coverage for mcpgateway / common / validators.py: 100%
390 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/common/validators.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti, Madhav Kandukuri
7SecurityValidator for ContextForge
8This module defines the `SecurityValidator` class, which provides centralized, configurable
9validation logic for user-generated content in MCP-based applications.
11The validator enforces strict security and structural rules across common input types such as:
12- Display text (e.g., names, descriptions)
13- Identifiers and tool names
14- URIs and URLs
15- JSON object depth
16- Templates (including limited HTML/Jinja2)
17- MIME types
19Key Features:
20- Pattern-based validation using settings-defined regex for HTML/script safety
21- Configurable max lengths and depth limits
22- Whitelist-based URL scheme and MIME type validation
23- Safe escaping of user-visible text fields
24- Reusable static/class methods for field-level and form-level validation
26Intended to be used with Pydantic or similar schema-driven systems to validate and sanitize
27user input in a consistent, centralized way.
29Dependencies:
30- Standard Library: re, html, logging, urllib.parse
31- First-party: `settings` from `mcpgateway.config`
33Example usage:
34 SecurityValidator.validate_name("my_tool", field_name="Tool Name")
35 SecurityValidator.validate_url("https://example.com")
36 SecurityValidator.validate_json_depth({...})
38Examples:
39 >>> from mcpgateway.common.validators import SecurityValidator
40 >>> SecurityValidator.sanitize_display_text('<b>Test</b>', 'test')
41 'Test'
42 >>> SecurityValidator.validate_name('valid_name-123', 'test')
43 'valid_name-123'
44 >>> SecurityValidator.validate_identifier('my.test.id_123', 'test')
45 'my.test.id_123'
46 >>> SecurityValidator.validate_json_depth({'a': {'b': 1}})
47 >>> SecurityValidator.validate_json_depth({'a': 1})
48"""
50# Standard
51from html.parser import HTMLParser
52import ipaddress
53import logging
54from pathlib import Path
55import re
56import shlex
57import socket
58from typing import Any, Iterable, List, Optional, Pattern
59from urllib.parse import urlparse
60import uuid
62# First-Party
63from mcpgateway.config import settings
65logger = logging.getLogger(__name__)
67# ============================================================================
68# Precompiled regex patterns (compiled once at module load for performance)
69# ============================================================================
70# Note: Settings-based patterns (DANGEROUS_HTML_PATTERN, DANGEROUS_JS_PATTERN,
71# NAME_PATTERN, IDENTIFIER_PATTERN, etc.) are NOT precompiled here because tests
72# override the class attributes at runtime. Only truly static patterns are
73# precompiled at module level.
# Static inline patterns used multiple times

# Any of < > " ' -- checked by the name / identifier / tool-name validators
# after their whitelist pattern has already matched.
_HTML_SPECIAL_CHARS_RE: Pattern[str] = re.compile(r'[<>"\']')  # / removed per SEP-986
# Opening tags that validate_template rejects outright (case-insensitive).
_DANGEROUS_TEMPLATE_TAGS_RE: Pattern[str] = re.compile(r"<(script|iframe|object|embed|link|meta|base|form)\b", re.IGNORECASE)
# Inline event-handler attributes such as ``onclick=`` or ``onload =``.
# NOTE(review): there is no leading word boundary, so this also matches inside
# longer words that contain "on" followed by letters and "=" (e.g. "content=").
_EVENT_HANDLER_RE: Pattern[str] = re.compile(r"on\w+\s*=", re.IGNORECASE)
# ``type/subtype`` followed by zero or more ``; name=value`` parameters, where
# the value is either a bare token or a double-quoted string without CR/LF.
_MIME_TYPE_RE: Pattern[str] = re.compile(  # noqa: DUO138 - no ReDoS: inner groups require literal ; and = delimiters preventing backtrack ambiguity
    r'^[a-zA-Z0-9][a-zA-Z0-9!#$&\-\^_+\.]*\/[a-zA-Z0-9][a-zA-Z0-9!#$&\-\^_+\.]*(?:\s*;\s*[a-zA-Z0-9!#$&\-\^_+\.]+=(?:[a-zA-Z0-9!#$&\-\^_+\.]+|"[^"\r\n]*"))*$'
)
# A leading ``scheme://`` prefix (letter first, then letters/digits/+/-/.).
_URI_SCHEME_RE: Pattern[str] = re.compile(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://")
# Shell metacharacters: separators, pipes, backticks, $, and all bracket kinds.
_SHELL_DANGEROUS_CHARS_RE: Pattern[str] = re.compile(r"[;&|`$(){}\[\]<>]")
# ANSI CSI sequences: ESC [ <digits and semicolons> <letter>.
_ANSI_ESCAPE_RE: Pattern[str] = re.compile(r"\x1B\[[0-9;]*[A-Za-z]")
# C0 control characters except TAB, LF and CR, plus DEL and the C1 range.
_CONTROL_CHARS_RE: Pattern[str] = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]")
# Polyglot attack patterns (precompiled with IGNORECASE)
# sanitize_display_text rejects a value as soon as any one of these matches.
_POLYGLOT_PATTERNS: List[Pattern[str]] = [
    # A quote, a semicolon, then an alert( call later on the same line
    re.compile(r"['\"];.*alert\s*\(", re.IGNORECASE),
    # An HTML comment terminator immediately followed by a tag
    re.compile(r"-->\s*<[^>]+>", re.IGNORECASE),
    # A quote, anything, then // directly followed by another quote
    re.compile(r"['\"].*//['\"]", re.IGNORECASE),
    # "<<" followed by letters and ">"
    re.compile(r"<<[A-Z]+>", re.IGNORECASE),
    # JavaScript character-code string construction
    re.compile(r"String\.fromCharCode", re.IGNORECASE),
    # javascript: pseudo-protocol followed by an opening parenthesis
    re.compile(r"javascript:.*\(", re.IGNORECASE),
]
# SSTI prevention - safe scanning without regex backtracking.
# These tokens are compared with plain ``in`` against the lower-cased,
# whitespace-normalised contents of each template expression, which is why
# every entry here is lower case (including the ``\N{`` escape, stored as "\\n{").
_SSTI_DANGEROUS_SUBSTRINGS: tuple[str, ...] = (
    "__",
    ".",
    "config",
    "self",
    "request",
    "application",
    "globals",
    "builtins",
    "import",
    "getattr",  # Python getattr function
    "|attr",  # Jinja2 attr filter (checked after whitespace normalization)
    "|selectattr",  # Jinja2 selectattr filter (takes attribute name as arg)
    "|sort",  # Jinja2 sort filter with attribute parameter
    "|map",  # Jinja2 map filter with attribute parameter
    "attribute=",  # Jinja2 filters: map(attribute=...), selectattr, sort(attribute=...)
    "\\x",  # Hex escape sequences (e.g., \x5f for underscore)
    "\\u",  # Unicode escape sequences (e.g., \u005f for underscore)
    "\\n{",  # Named unicode escapes (e.g., \N{LOW LINE})
    "\\0",
    "\\1",
    "\\2",
    "\\3",
    "\\4",
    "\\5",
    "\\6",
    "\\7",  # Octal escapes
)
# Operators that enable code execution or dynamic construction
# (compared against the expression text as written, without lower-casing).
_SSTI_DANGEROUS_OPERATORS: tuple[str, ...] = (
    "*",
    "/",
    "+",
    "-",
    "~",  # Jinja2 string concatenation (can build dunder names dynamically)
    "[",  # Bracket notation for dynamic attribute access
    "%",  # Python string formatting (e.g., '%c' % 95 produces '_')
)
# Openers of non-Jinja "simple" template syntaxes; any of these followed by a
# later "}" causes validate_template to reject the template.
_SSTI_SIMPLE_TEMPLATE_PREFIXES: tuple[str, ...] = ("${", "#{", "%{")
139def _iter_template_expressions(value: str, start: str, end: str) -> Iterable[str]:
140 """Yield template expression contents for a start/end delimiter, skipping delimiters inside quotes.
142 Args:
143 value (str): Template text to scan.
144 start (str): Opening delimiter.
145 end (str): Closing delimiter.
147 Yields:
148 str: The template expression contents between delimiters.
150 Raises:
151 ValueError: If an unterminated template expression is found (fail-closed behavior).
152 """
153 start_len = len(start)
154 end_len = len(end)
155 i = 0
156 value_len = len(value)
157 while i <= value_len - start_len:
158 if value.startswith(start, i):
159 j = i + start_len
160 in_quote: Optional[str] = None
161 escaped = False
162 while j <= value_len - end_len:
163 ch = value[j]
164 if escaped:
165 escaped = False
166 elif ch == "\\":
167 escaped = True
168 elif in_quote:
169 if ch == in_quote:
170 in_quote = None
171 else:
172 if ch in ("'", '"'):
173 in_quote = ch
174 elif value.startswith(end, j):
175 yield value[i + start_len : j]
176 i = j + end_len
177 break
178 j += 1
179 else:
180 raise ValueError("Template contains potentially dangerous expressions")
181 else:
182 i += 1
185def _has_simple_template_expression(value: str, start: str) -> bool:
186 """Return True if start is followed by any closing brace.
188 Uses O(n) linear scan by finding last } first, then checking prefixes.
190 Args:
191 value (str): Template text to scan.
192 start (str): Opening delimiter.
194 Returns:
195 bool: True if a closing brace exists after the delimiter.
196 """
197 # Find the last closing brace - if none exists, no expression can be complete
198 last_close = value.rfind("}")
199 if last_close == -1:
200 return False
201 # Check if any prefix exists before the last closing brace - O(n) single find
202 idx = value.find(start)
203 return idx != -1 and idx < last_close
# Dangerous URL protocol patterns (precompiled with IGNORECASE)
# Each pattern is just the scheme name plus ":" and carries no anchor of its own.
_DANGEROUS_URL_PATTERNS: List[Pattern[str]] = [
    re.compile(r"javascript:", re.IGNORECASE),
    re.compile(r"data:", re.IGNORECASE),
    re.compile(r"vbscript:", re.IGNORECASE),
    re.compile(r"about:", re.IGNORECASE),
    re.compile(r"chrome:", re.IGNORECASE),
    re.compile(r"file:", re.IGNORECASE),
    re.compile(r"ftp:", re.IGNORECASE),
    re.compile(r"mailto:", re.IGNORECASE),
]

# SQL injection patterns (precompiled with IGNORECASE)
_SQL_PATTERNS: List[Pattern[str]] = [
    # Single quote, semicolon, double quote or backslash
    re.compile(r"[';\"\\]", re.IGNORECASE),
    # SQL line-comment introducer
    re.compile(r"--", re.IGNORECASE),
    # A complete /* ... */ block comment on one line
    re.compile(r"/\*.*?\*/", re.IGNORECASE),
    # Common statement keywords as whole words
    re.compile(r"\b(union|select|insert|update|delete|drop|exec|execute)\b", re.IGNORECASE),
]
227# ============================================================================
228# HTML Tag Stripper with Character Preservation
229# ============================================================================
230class _TagStripper(HTMLParser):
231 """Strip HTML tags while preserving all text content and special characters.
233 This parser removes HTML tags but keeps the text content exactly as-is,
234 including special characters like &, ", and '. HTML entities are decoded
235 to their literal characters (e.g., & becomes &).
236 """
238 def __init__(self) -> None:
239 super().__init__(convert_charrefs=True)
240 self.reset()
241 self.strict = False
242 self.fed: List[str] = []
244 def handle_data(self, data: str) -> None:
245 """Handle text data between tags.
247 With convert_charrefs=True, HTML entities are automatically decoded
248 (e.g., & → &) and plain text with & passes through unchanged.
250 Args:
251 data: Text content between HTML tags
252 """
253 self.fed.append(data)
255 def get_data(self) -> str:
256 """Return the accumulated text content.
258 Returns:
259 str: Concatenated text content from all handled data
260 """
261 return "".join(self.fed)
def _strip_html_tags(value: str) -> str:
    """Return ``value`` with HTML markup removed and its text left intact.

    The string is run through a ``_TagStripper`` parser and the text chunks it
    collected are joined back together.

    Args:
        value: String that may contain HTML tags

    Returns:
        String with HTML tags removed but text content preserved

    Examples:
        >>> _strip_html_tags('<b>Hello</b> World')
        'Hello World'
        >>> _strip_html_tags('Test & Check')
        'Test & Check'
        >>> _strip_html_tags('Quote: "Hello"')
        'Quote: "Hello"'
        >>> _strip_html_tags('&&&')
        '&&&'
    """
    stripper = _TagStripper()
    stripper.feed(value)
    # close() flushes any text the parser was still buffering.
    stripper.close()
    return "".join(stripper.fed)
289class SecurityValidator:
290 """Configurable validation with MCP-compliant limits"""
292 # Configurable patterns (from settings)
293 DANGEROUS_HTML_PATTERN = (
294 settings.validation_dangerous_html_pattern
295 ) # Default: '<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>'
296 DANGEROUS_JS_PATTERN = settings.validation_dangerous_js_pattern # Default: javascript:|vbscript:|on\w+\s*=|data:.*script
297 ALLOWED_URL_SCHEMES = settings.validation_allowed_url_schemes # Default: ["http://", "https://", "ws://", "wss://"]
299 # Character type patterns
300 NAME_PATTERN = settings.validation_name_pattern # Default: ^[a-zA-Z0-9_.\- ]+$ (literal space, not \s)
301 IDENTIFIER_PATTERN = settings.validation_identifier_pattern # Default: ^[a-zA-Z0-9_\-\.]+$
302 VALIDATION_SAFE_URI_PATTERN = settings.validation_safe_uri_pattern # Default: ^[a-zA-Z0-9_\-.:/?=&%]+$
303 VALIDATION_UNSAFE_URI_PATTERN = settings.validation_unsafe_uri_pattern # Default: [<>"\'\\]
304 TOOL_NAME_PATTERN = settings.validation_tool_name_pattern # Default: ^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$ (SEP-986)
306 # MCP-compliant limits (configurable)
307 MAX_NAME_LENGTH = settings.validation_max_name_length # Default: 255
308 MAX_DESCRIPTION_LENGTH = settings.validation_max_description_length # Default: 8192 (8KB)
309 MAX_TEMPLATE_LENGTH = settings.validation_max_template_length # Default: 65536
310 MAX_CONTENT_LENGTH = settings.validation_max_content_length # Default: 1048576 (1MB)
311 MAX_JSON_DEPTH = settings.validation_max_json_depth # Default: 30
312 MAX_URL_LENGTH = settings.validation_max_url_length # Default: 2048
@classmethod
def sanitize_display_text(cls, value: str, field_name: str) -> str:
    """Reject dangerous display text, then strip any remaining HTML tags.

    The value is screened against the configured dangerous-HTML pattern, the
    configured dangerous-JS pattern and the precompiled polyglot patterns, in
    that order. If none match, HTML tags are removed and the remaining text is
    returned; special characters such as ``&`` and quotes are not converted
    to entities.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated

    Returns:
        str: The tag-stripped text, or the input itself when it is empty/None

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Basic HTML tag stripping:

        >>> SecurityValidator.sanitize_display_text('Hello World', 'test')
        'Hello World'
        >>> SecurityValidator.sanitize_display_text('Hello <b>World</b>', 'test')
        'Hello World'

        Empty/None handling:

        >>> SecurityValidator.sanitize_display_text('', 'test')
        ''
        >>> SecurityValidator.sanitize_display_text(None, 'test')  #doctest: +SKIP

        Dangerous script patterns:

        >>> SecurityValidator.sanitize_display_text('alert();', 'test')
        'alert();'
        >>> SecurityValidator.sanitize_display_text('javascript:alert(1)', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains script patterns that may cause display issues

        Polyglot attack patterns:

        >>> SecurityValidator.sanitize_display_text('"; alert()', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains potentially dangerous character sequences
        >>> SecurityValidator.sanitize_display_text('-->test', 'test')
        '-->test'
        >>> SecurityValidator.sanitize_display_text('--><script>', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains HTML tags that may cause display issues
        >>> SecurityValidator.sanitize_display_text('String.fromCharCode(65)', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains potentially dangerous character sequences

        Special characters (preserved as-is, no HTML entity conversion):

        >>> SecurityValidator.sanitize_display_text('User & Admin', 'test')
        'User & Admin'
        >>> SecurityValidator.sanitize_display_text('Quote: "Hello"', 'test')
        'Quote: "Hello"'
        >>> SecurityValidator.sanitize_display_text("Quote: 'Hello'", 'test')
        "Quote: 'Hello'"
    """
    if not value:
        return value

    # Settings-driven patterns are looked up on the class at call time.
    html_hit = re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE)
    if html_hit is not None:
        raise ValueError(f"{field_name} contains HTML tags that may cause display issues")

    js_hit = re.search(cls.DANGEROUS_JS_PATTERN, value, re.IGNORECASE)
    if js_hit is not None:
        raise ValueError(f"{field_name} contains script patterns that may cause display issues")

    # Precompiled polyglot patterns: a single match is enough to reject.
    if any(polyglot.search(value) for polyglot in _POLYGLOT_PATTERNS):
        raise ValueError(f"{field_name} contains potentially dangerous character sequences")

    return _strip_html_tags(value)
@classmethod
def validate_name(cls, value: str, field_name: str = "Name") -> str:
    """Validate names with strict character requirements.

    Checks, in order: the value is non-empty, it matches ``NAME_PATTERN``,
    it contains none of ``<``, ``>``, ``"``, ``'``, and it is no longer than
    ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: The unchanged value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_name('valid_name')
        'valid_name'
        >>> SecurityValidator.validate_name('valid_name-123')
        'valid_name-123'
        >>> SecurityValidator.validate_name('Test Name')
        'Test Name'
        >>> try:
        ...     SecurityValidator.validate_name('Invalid Name!')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('')
        ... except ValueError as e:
        ...     'cannot be empty' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('name<script>')
        ... except ValueError as e:
        ...     'HTML special characters' in str(e) or 'can only contain' in str(e)
        True

        A trailing newline is rejected even though ``$`` would match before it:

        >>> try:
        ...     SecurityValidator.validate_name('valid_name\\n')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        Test length limit:

        >>> long_name = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_name(long_name)
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True

        Quotes and slashes are rejected by the pattern:

        >>> try:
        ...     SecurityValidator.validate_name('name"test')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('name/test')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Check against allowed pattern. In a regex, "$" also matches just before a
    # trailing newline, so an anchored pattern alone would accept "name\n".
    # Reject such a value unless the match really consumed the final newline.
    matched = re.match(cls.NAME_PATTERN, value)
    if matched is None or (value.endswith("\n") and matched.end() != len(value)):
        raise ValueError(f"{field_name} can only contain letters, numbers, underscore, and hyphen. Special characters like <, >, quotes are not allowed.")

    # Additional check for HTML-like patterns (uses precompiled regex)
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value
@classmethod
def validate_identifier(cls, value: str, field_name: str) -> str:
    """Validate identifiers (IDs) - MCP compliant.

    Checks, in order: the value is non-empty, it matches
    ``IDENTIFIER_PATTERN``, it contains none of ``<``, ``>``, ``"``, ``'``,
    and it is no longer than ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: The unchanged value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_identifier('valid_id', 'ID')
        'valid_id'
        >>> SecurityValidator.validate_identifier('valid.id.123', 'ID')
        'valid.id.123'
        >>> SecurityValidator.validate_identifier('valid-id_test', 'ID')
        'valid-id_test'
        >>> try:
        ...     SecurityValidator.validate_identifier('Invalid/ID', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('', 'ID')
        ... except ValueError as e:
        ...     'cannot be empty' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('id<script>', 'ID')
        ... except ValueError as e:
        ...     'HTML special characters' in str(e) or 'can only contain' in str(e)
        True

        A trailing newline is rejected even though ``$`` would match before it:

        >>> try:
        ...     SecurityValidator.validate_identifier('valid_id\\n', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        Quotes are rejected by the pattern:

        >>> try:
        ...     SecurityValidator.validate_identifier('id"test', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        Test length limit:

        >>> long_id = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_identifier(long_id, 'ID')
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # MCP spec: identifiers should be alphanumeric + limited special chars.
    # In a regex, "$" also matches just before a trailing newline, so an
    # anchored pattern alone would accept "id\n". Reject such a value unless
    # the match really consumed the final newline.
    matched = re.match(cls.IDENTIFIER_PATTERN, value)
    if matched is None or (value.endswith("\n") and matched.end() != len(value)):
        raise ValueError(f"{field_name} can only contain letters, numbers, underscore, hyphen, and dots")

    # Block HTML-like patterns (uses precompiled regex)
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value
@classmethod
def validate_uri(cls, value: str, field_name: str = "URI") -> str:
    """Validate URIs - MCP compliant.

    Checks, in order: the value is non-empty, it contains nothing matched by
    ``VALIDATION_UNSAFE_URI_PATTERN``, it contains no ``..`` sequence, it is
    matched by ``VALIDATION_SAFE_URI_PATTERN``, and it is no longer than
    ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: The unchanged value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_uri('/valid/uri', 'URI')
        '/valid/uri'
        >>> SecurityValidator.validate_uri('..', 'URI')
        Traceback (most recent call last):
            ...
        ValueError: URI cannot contain directory traversal sequences ('..')

        A trailing newline is rejected even though ``$`` would match before it:

        >>> SecurityValidator.validate_uri('/valid/uri\\n', 'URI')
        Traceback (most recent call last):
            ...
        ValueError: URI contains invalid characters
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Block HTML-like patterns
    if re.search(cls.VALIDATION_UNSAFE_URI_PATTERN, value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if ".." in value:
        raise ValueError(f"{field_name} cannot contain directory traversal sequences ('..')")

    # In a regex, "$" also matches just before a trailing newline, so an
    # anchored safe-character pattern alone would accept "/uri\n". Reject such
    # a value unless the match really consumed the final newline.
    safe = re.search(cls.VALIDATION_SAFE_URI_PATTERN, value)
    if safe is None or (value.endswith("\n") and safe.end() != len(value)):
        raise ValueError(f"{field_name} contains invalid characters")

    # NOTE(review): the bound applied here is MAX_NAME_LENGTH, not MAX_URL_LENGTH.
    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value
@classmethod
def validate_tool_name(cls, value: str) -> str:
    """Special validation for MCP tool names.

    Checks, in order: the value is non-empty, it matches
    ``TOOL_NAME_PATTERN``, it contains none of ``<``, ``>``, ``"``, ``'``,
    and it is no longer than ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate

    Returns:
        str: The unchanged value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_tool_name('tool_1')
        'tool_1'
        >>> SecurityValidator.validate_tool_name('_5gpt_query')
        '_5gpt_query'
        >>> SecurityValidator.validate_tool_name('1tool')
        '1tool'

        Test invalid characters (rejected by pattern):

        >>> try:
        ...     SecurityValidator.validate_tool_name('tool<script>')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_tool_name('tool"test')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> # Slashes are allowed per SEP-986
        >>> SecurityValidator.validate_tool_name('tool/test')
        'tool/test'
        >>> SecurityValidator.validate_tool_name('namespace/subtool')
        'namespace/subtool'

        A trailing newline is rejected even though ``$`` would match before it:

        >>> try:
        ...     SecurityValidator.validate_tool_name('tool_1\\n')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True

        Test length limit:

        >>> long_tool_name = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_tool_name(long_tool_name)
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True
    """
    if not value:
        raise ValueError("Tool name cannot be empty")

    # MCP tools have specific naming requirements. In a regex, "$" also
    # matches just before a trailing newline, so an anchored pattern alone
    # would accept "tool\n". Reject such a value unless the match really
    # consumed the final newline.
    matched = re.match(cls.TOOL_NAME_PATTERN, value)
    if matched is None or (value.endswith("\n") and matched.end() != len(value)):
        raise ValueError("Tool name must start with a letter, number, or underscore and contain only letters, numbers, periods, underscores, hyphens, and slashes")

    # Ensure no HTML-like content (uses precompiled regex)
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError("Tool name cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"Tool name exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value
@classmethod
def validate_uuid(cls, value: str, field_name: str = "UUID") -> str:
    """Validate UUID format and return it in normalized form.

    The value is parsed with ``uuid.UUID``; on success the 32-character
    lower-case hexadecimal form (no hyphens) is returned. Empty values are
    returned unchanged.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: Normalized hex UUID, or the input itself when it is empty

    Raises:
        ValueError: When value is not a valid UUID

    Examples:
        >>> SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716-446655440000')
        '550e8400e29b41d4a716446655440000'
        >>> SecurityValidator.validate_uuid('invalid-uuid')
        Traceback (most recent call last):
            ...
        ValueError: UUID must be a valid UUID format

        Test empty UUID:

        >>> SecurityValidator.validate_uuid('')
        ''

        Test normalized UUID format:

        >>> SecurityValidator.validate_uuid('550E8400-E29B-41D4-A716-446655440000')
        '550e8400e29b41d4a716446655440000'
        >>> SecurityValidator.validate_uuid('550e8400e29b41d4a716446655440000')
        '550e8400e29b41d4a716446655440000'

        Test various invalid UUID formats:

        >>> try:
        ...     SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716-446655440000-extra')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('gggggggg-gggg-gggg-gggg-gggggggggggg')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
    """
    if not value:
        return value

    try:
        # Validate UUID format by attempting to parse it
        parsed = uuid.UUID(value)
    except (ValueError, AttributeError, TypeError) as exc:
        # AttributeError/TypeError cover non-str input, which uuid.UUID does
        # not report as ValueError. The rejected value is untrusted, so it is
        # sanitized before being written to the log (CWE-117).
        logger.error("Invalid UUID format for %s: %s", field_name, cls.sanitize_log_message(value))
        raise ValueError(f"{field_name} must be a valid UUID format") from exc

    # .hex is the hyphen-free, lower-case 32-digit representation.
    return parsed.hex
@classmethod
def validate_template(cls, value: str) -> str:
    """Validate a template: allow plain Jinja2 placeholders, block SSTI vectors.

    Checks, in order: dangerous HTML tags, inline event handlers, the contents
    of every ``{{ ... }}`` expression and then every ``{% ... %}`` statement
    (dangerous substrings after lower-casing and whitespace normalisation
    around ``|`` and ``=``, then dangerous operators), and finally the
    ``${``, ``#{`` and ``%{`` simple-template openers.

    Args:
        value (str): Value to validate

    Returns:
        str: The unchanged value if acceptable (including empty/None input)

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Empty template handling:

        >>> SecurityValidator.validate_template('')
        ''
        >>> SecurityValidator.validate_template(None)  #doctest: +SKIP

        Safe Jinja2 templates:

        >>> SecurityValidator.validate_template('Hello {{ name }}')
        'Hello {{ name }}'
        >>> SecurityValidator.validate_template('{% if condition %}text{% endif %}')
        '{% if condition %}text{% endif %}'
        >>> SecurityValidator.validate_template('{{ username }}')
        '{{ username }}'

        Dangerous HTML tags blocked:

        >>> SecurityValidator.validate_template('Hello <script>alert(1)</script>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display
        >>> SecurityValidator.validate_template('<form action="/evil"></form>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display

        Event handlers blocked:

        >>> SecurityValidator.validate_template('<div onclick="evil()">Test</div>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains event handlers that may cause display issues
        >>> SecurityValidator.validate_template('onload = "alert(1)"')
        Traceback (most recent call last):
            ...
        ValueError: Template contains event handlers that may cause display issues

        SSTI prevention patterns:

        >>> SecurityValidator.validate_template('{{ __import__ }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ config }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{% import os %}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 7*7 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 10-5 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions

        Other template injection patterns:

        >>> SecurityValidator.validate_template('${evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('#{evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('%{evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions

        Length limit note: size validation is performed at the service layer
        using configurable limits (ContentSecurityService). This validator
        only checks encoding, dangerous patterns, and SSTI prevention.
    """
    if not value:
        return value

    # Dangerous tags are refused; Jinja2 syntax {{ }} and {% %} is inspected below.
    if _DANGEROUS_TEMPLATE_TAGS_RE.search(value):
        raise ValueError("Template contains HTML tags that may interfere with proper display")

    if _EVENT_HANDLER_RE.search(value):
        raise ValueError("Template contains event handlers that may cause display issues")

    ssti_message = "Template contains potentially dangerous expressions"

    # All {{ ... }} expressions are examined first, then all {% ... %} statements.
    for opener, closer in (("{{", "}}"), ("{%", "%}")):
        for expression in _iter_template_expressions(value, opener, closer):
            # Collapse whitespace around | and = so "x | attr" and "attribute = y"
            # are caught by the same substring tokens as their compact forms.
            normalized = re.sub(r"\s*\|\s*", "|", expression.lower())
            normalized = re.sub(r"\s*=\s*", "=", normalized)
            has_bad_token = any(token in normalized for token in _SSTI_DANGEROUS_SUBSTRINGS)
            if has_bad_token or any(op in expression for op in _SSTI_DANGEROUS_OPERATORS):
                raise ValueError(ssti_message)

    for prefix in _SSTI_SIMPLE_TEMPLATE_PREFIXES:
        if _has_simple_template_expression(value, prefix):
            raise ValueError(ssti_message)

    return value
@classmethod
def sanitize_log_message(cls, message: Optional[Any], max_length: int = 10000) -> str:
    """Sanitize log message to prevent log injection attacks.

    Replaces line breaks with spaces and removes ANSI escapes and control
    characters to prevent log forging and injection attacks (CWE-117).
    Only ``None`` is treated as "no message"; other falsy values such as
    ``0`` or ``False`` are converted with ``str()`` like any other object.

    Args:
        message: Log message to sanitize
        max_length: Maximum length (default: 10000)

    Returns:
        Sanitized message safe for logging

    Examples:
        Basic newline removal:

        >>> SecurityValidator.sanitize_log_message("User\\nFake: admin")
        'User Fake: admin'
        >>> SecurityValidator.sanitize_log_message("Test\\rInjection")
        'Test Injection'

        Unicode line/paragraph separators are neutralised as well:

        >>> SecurityValidator.sanitize_log_message("User\\u2028Fake: admin")
        'User Fake: admin'

        ANSI escape removal:

        >>> SecurityValidator.sanitize_log_message("User: \\x1B[31madmin\\x1B[0m")
        'User: admin'

        Control character removal:

        >>> result = SecurityValidator.sanitize_log_message("User\\x00\\x01\\x02")
        >>> "\\x00" not in result and "\\x01" not in result
        True

        Length truncation:

        >>> long_msg = "A" * 15000
        >>> result = SecurityValidator.sanitize_log_message(long_msg, max_length=10000)
        >>> len(result) <= 10020
        True
        >>> result.endswith("[truncated]")
        True

        Empty input handling:

        >>> SecurityValidator.sanitize_log_message("")
        ''
        >>> SecurityValidator.sanitize_log_message(None)
        ''

        Falsy values other than None are still logged:

        >>> SecurityValidator.sanitize_log_message(0)
        '0'
    """
    if message is None:
        return ""

    text = str(message)

    # Replace line breaks (primary log injection vectors) with a space.
    # U+2028 / U+2029 are included because str.splitlines and some log
    # viewers treat them as line boundaries too.
    for line_break in ("\n", "\r", "\u2028", "\u2029"):
        text = text.replace(line_break, " ")

    # Remove ANSI escape sequences
    text = _ANSI_ESCAPE_RE.sub("", text)

    # Remove control characters
    text = _CONTROL_CHARS_RE.sub("", text)

    # Truncate to prevent log flooding
    if len(text) > max_length:
        text = text[:max_length] + "...[truncated]"

    return text
@classmethod
def validate_url(cls, value: str, field_name: str = "URL") -> str:
    """Validate a URL for allowed schemes, safe structure and safe display.

    Checks run in a fixed order and the first failure determines the error
    message: emptiness, length, scheme allowlist, dangerous protocol
    patterns, square brackets (IPv6), protocol-relative form, line breaks,
    spaces before the query string, then the parsed structure (scheme and
    netloc present, ``0.0.0.0`` host, SSRF rules when enabled, port range,
    embedded credentials) and finally HTML/script patterns.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in messages)

    Returns:
        str: The unmodified value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Valid URLs:

        >>> SecurityValidator.validate_url('https://example.com')
        'https://example.com'
        >>> SecurityValidator.validate_url('http://example.com')
        'http://example.com'
        >>> SecurityValidator.validate_url('ws://example.com')
        'ws://example.com'
        >>> SecurityValidator.validate_url('wss://example.com')
        'wss://example.com'
        >>> SecurityValidator.validate_url('https://example.com:8080/path')
        'https://example.com:8080/path'
        >>> SecurityValidator.validate_url('https://example.com/path?query=value')
        'https://example.com/path?query=value'

        Empty URL handling:

        >>> SecurityValidator.validate_url('')
        Traceback (most recent call last):
            ...
        ValueError: URL cannot be empty

        Length validation:

        >>> long_url = 'https://example.com/' + 'a' * 2100
        >>> SecurityValidator.validate_url(long_url)
        Traceback (most recent call last):
            ...
        ValueError: URL exceeds maximum length of 2048

        Scheme validation:

        >>> SecurityValidator.validate_url('ftp://example.com')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('file:///etc/passwd')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('javascript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('data:text/plain,hello')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('vbscript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('about:blank')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('chrome://settings')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('mailto:test@example.com')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        IPv6 URL blocking:

        >>> SecurityValidator.validate_url('https://[::1]:8080/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains IPv6 address which is not supported
        >>> SecurityValidator.validate_url('https://[2001:db8::1]/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains IPv6 address which is not supported

        Protocol-relative URL blocking:

        >>> SecurityValidator.validate_url('//example.com/path')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        Line break injection:

        >>> SecurityValidator.validate_url('https://example.com\\rHost: evil.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains line breaks which are not allowed
        >>> SecurityValidator.validate_url('https://example.com\\nHost: evil.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains line breaks which are not allowed

        Space validation:

        >>> SecurityValidator.validate_url('https://exam ple.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains spaces which are not allowed in URLs
        >>> SecurityValidator.validate_url('https://example.com/path?query=hello world')
        'https://example.com/path?query=hello world'

        Malformed URLs:

        >>> SecurityValidator.validate_url('https://')
        Traceback (most recent call last):
            ...
        ValueError: URL is not a valid URL
        >>> SecurityValidator.validate_url('not-a-url')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        Restricted IP addresses:

        >>> SecurityValidator.validate_url('https://0.0.0.0/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains invalid IP address (0.0.0.0)
        >>> SecurityValidator.validate_url('https://169.254.169.254/')  # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        ValueError: URL contains IP address blocked by SSRF protection ...

        Invalid port numbers:

        >>> SecurityValidator.validate_url('https://example.com:0/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains invalid port number
        >>> try:
        ...     SecurityValidator.validate_url('https://example.com:65536/')
        ... except ValueError as e:
        ...     'Port out of range' in str(e) or 'invalid port' in str(e)
        True

        Credentials in URL:

        >>> SecurityValidator.validate_url('https://user:pass@example.com/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains credentials which are not allowed
        >>> SecurityValidator.validate_url('https://user@example.com/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains credentials which are not allowed

        XSS patterns in URLs:

        >>> SecurityValidator.validate_url('https://example.com/<script>')
        Traceback (most recent call last):
            ...
        ValueError: URL contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_url('https://example.com?param=javascript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL contains unsupported or potentially dangerous protocol
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Length check
    if len(value) > cls.MAX_URL_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_URL_LENGTH}")

    # Check allowed schemes (case-insensitive prefix match; lowercase once)
    allowed_schemes = cls.ALLOWED_URL_SCHEMES
    value_lower = value.lower()
    if not any(value_lower.startswith(scheme.lower()) for scheme in allowed_schemes):
        raise ValueError(f"{field_name} must start with one of: {', '.join(allowed_schemes)}")

    # Block dangerous URL patterns (uses precompiled regex list)
    for pattern in _DANGEROUS_URL_PATTERNS:
        if pattern.search(value):
            raise ValueError(f"{field_name} contains unsupported or potentially dangerous protocol")

    # Block IPv6 URLs (any square bracket anywhere in the URL)
    if "[" in value or "]" in value:
        raise ValueError(f"{field_name} contains IPv6 address which is not supported")

    # Block protocol-relative URLs (defensive: normally already rejected by
    # the scheme allowlist above)
    if value.startswith("//"):
        raise ValueError(f"{field_name} contains protocol-relative URL which is not supported")

    # Check for CRLF injection
    if "\r" in value or "\n" in value:
        raise ValueError(f"{field_name} contains line breaks which are not allowed")

    # Spaces are rejected everywhere before the query string; the query part
    # itself may contain them.
    if " " in value.split("?", maxsplit=1)[0]:
        raise ValueError(f"{field_name} contains spaces which are not allowed in URLs")

    # Basic URL structure validation
    try:
        result = urlparse(value)
        if not all([result.scheme, result.netloc]):
            raise ValueError(f"{field_name} is not a valid URL")

        # Additional validation: ensure netloc doesn't contain brackets (double-check)
        if "[" in result.netloc or "]" in result.netloc:
            raise ValueError(f"{field_name} contains IPv6 address which is not supported")

        # SSRF Protection: Block dangerous IP addresses and hostnames
        hostname = result.hostname
        if hostname:
            # Always block 0.0.0.0 (all interfaces) regardless of SSRF settings.
            # Trailing dots (FQDN notation) are stripped first so that
            # "0.0.0.0." cannot slip past the comparison.
            if hostname.rstrip(".") == "0.0.0.0":  # nosec B104 - we're blocking this for security
                raise ValueError(f"{field_name} contains invalid IP address (0.0.0.0)")

            # Apply SSRF protection if enabled
            if settings.ssrf_protection_enabled:
                cls._validate_ssrf(hostname, field_name)

        # Validate port number. Accessing .port may itself raise ValueError
        # for out-of-range values; that error propagates unchanged.
        port = result.port
        if port is not None and (port < 1 or port > 65535):
            raise ValueError(f"{field_name} contains invalid port number")

        # Check for credentials in URL
        if result.username or result.password:
            raise ValueError(f"{field_name} contains credentials which are not allowed")

        # Check for XSS patterns in the entire URL
        if re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE):
            raise ValueError(f"{field_name} contains HTML tags that may cause security issues")

        if re.search(cls.DANGEROUS_JS_PATTERN, value, re.IGNORECASE):
            raise ValueError(f"{field_name} contains script patterns that may cause security issues")

    except ValueError:
        # Re-raise ValueError as-is
        raise
    except Exception as exc:
        # Any other parsing failure is reported uniformly, keeping the cause
        raise ValueError(f"{field_name} is not a valid URL") from exc

    return value
@classmethod
def _validate_ssrf(cls, hostname: str, field_name: str) -> None:
    """Validate hostname/IP against SSRF protection rules.

    This method implements configurable SSRF (Server-Side Request Forgery) protection
    to prevent the gateway from being used to access internal resources or cloud
    metadata services.

    The hostname is normalized (lowercased, trailing dots stripped) before it
    is compared with the blocked-host list and before it is parsed as an IP
    literal, so ``169.254.169.254.`` is treated like ``169.254.169.254``.
    Names that are not IP literals are resolved with ``getaddrinfo`` and
    every returned address is checked. IPv4-mapped IPv6 addresses
    (``::ffff:a.b.c.d``) are unwrapped to their IPv4 form so that IPv4 CIDR
    rules apply to them as well.

    Args:
        hostname (str): The hostname or IP address to validate.
        field_name (str): Name of field being validated (for error messages).

    Raises:
        ValueError: If the hostname/IP is blocked by SSRF protection rules.

    Configuration (via settings):
        - ssrf_protection_enabled: Master switch (must be True for this to be called)
        - ssrf_blocked_networks: CIDR ranges always blocked (e.g., cloud metadata)
        - ssrf_blocked_hosts: Hostnames always blocked
        - ssrf_allow_localhost: If False, blocks loopback addresses and localhost
        - ssrf_allow_private_networks: If False, blocks private ranges
        - ssrf_allowed_networks: Optional CIDR allowlist for private ranges
        - ssrf_dns_fail_closed: If True, a failed or empty DNS lookup is rejected

    Examples:
        Cloud metadata (always blocked):

        >>> from unittest.mock import patch, MagicMock
        >>> mock_settings = MagicMock()
        >>> mock_settings.ssrf_protection_enabled = True
        >>> mock_settings.ssrf_blocked_networks = ["169.254.169.254/32"]
        >>> mock_settings.ssrf_blocked_hosts = ["metadata.google.internal"]
        >>> mock_settings.ssrf_allow_localhost = True
        >>> mock_settings.ssrf_allow_private_networks = True
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('169.254.169.254', 'URL')
        ...     except ValueError as e:
        ...         'blocked by SSRF protection' in str(e)
        True

        Localhost (configurable):

        >>> mock_settings.ssrf_allow_localhost = False
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('127.0.0.1', 'URL')
        ...     except ValueError as e:
        ...         'localhost' in str(e).lower()
        True

        Public IPs (always allowed):

        >>> mock_settings.ssrf_allow_localhost = True
        >>> mock_settings.ssrf_allow_private_networks = True
        >>> mock_settings.ssrf_allowed_networks = []
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     SecurityValidator._validate_ssrf('8.8.8.8', 'URL')  # Should not raise
    """
    # Normalize hostname: lowercase, strip trailing dots (DNS FQDN notation)
    hostname_normalized = hostname.lower().rstrip(".")

    # Check blocked hostnames (case-insensitive, normalized)
    for blocked_host in settings.ssrf_blocked_hosts:
        blocked_normalized = blocked_host.lower().rstrip(".")
        if hostname_normalized == blocked_normalized:
            raise ValueError(f"{field_name} contains blocked hostname '{hostname}' (SSRF protection)")

    # Resolve hostname to IP for network-based checks
    ip_addresses: list = []
    try:
        # Parse the *normalized* form so an IP literal written with a trailing
        # dot is still recognized as a literal instead of depending on DNS.
        ip_addresses = [ipaddress.ip_address(hostname_normalized)]
    except ValueError:
        # It's a hostname, resolve ALL addresses (A and AAAA records)
        try:
            addr_info = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
        except (socket.gaierror, socket.herror) as exc:
            # DNS resolution failed
            if settings.ssrf_dns_fail_closed:
                raise ValueError(f"{field_name} DNS resolution failed and SSRF_DNS_FAIL_CLOSED is enabled") from exc
            # Fail open: allow through (hostname blocking above catches known dangerous hostnames)
            return

        for _, _, _, _, sockaddr in addr_info:
            try:
                ip_addresses.append(ipaddress.ip_address(sockaddr[0]))
            except ValueError:
                # Skip entries whose address part is not a parseable IP
                continue

    if not ip_addresses:
        if settings.ssrf_dns_fail_closed:
            raise ValueError(f"{field_name} DNS resolution returned no addresses and SSRF_DNS_FAIL_CLOSED is enabled")
        return

    # Check ALL resolved addresses - if ANY is blocked, reject the request
    for resolved in ip_addresses:
        # An IPv4-mapped IPv6 address reaches the same host as its embedded
        # IPv4 address, so evaluate the rules against the IPv4 form.
        mapped = getattr(resolved, "ipv4_mapped", None)
        ip_addr = mapped if mapped is not None else resolved

        # Check against blocked networks (always blocked regardless of other settings)
        for network_str in settings.ssrf_blocked_networks:
            try:
                network = ipaddress.ip_network(network_str, strict=False)
            except ValueError:
                # Invalid network in config - log and skip
                logger.warning(f"Invalid CIDR in ssrf_blocked_networks: {network_str}")
                continue

            if ip_addr in network:
                raise ValueError(f"{field_name} contains IP address blocked by SSRF protection (network: {network_str})")

        # Check localhost/loopback (if not allowed)
        if not settings.ssrf_allow_localhost:
            if ip_addr.is_loopback or hostname_normalized in ("localhost", "localhost.localdomain"):
                raise ValueError(f"{field_name} contains localhost address which is blocked by SSRF protection")

        # Check private networks (if not allowed); loopback is governed by
        # the localhost setting above, so it is excluded here.
        if not settings.ssrf_allow_private_networks:
            if ip_addr.is_private and not ip_addr.is_loopback:
                allowed_private = False
                allowed_networks = getattr(settings, "ssrf_allowed_networks", []) or []
                for network_str in allowed_networks:
                    try:
                        network = ipaddress.ip_network(network_str, strict=False)
                    except ValueError:
                        logger.warning(f"Invalid CIDR in ssrf_allowed_networks: {network_str}")
                        continue
                    if ip_addr in network:
                        allowed_private = True
                        break

                if not allowed_private:
                    raise ValueError(f"{field_name} contains private network address which is blocked by SSRF protection")
@classmethod
def validate_no_xss(cls, value: str, field_name: str) -> None:
    """
    Ensure a string carries no markup matched by ``DANGEROUS_HTML_PATTERN``.

    The pattern is applied case-insensitively. Falsy input (empty string or
    ``None``) is accepted without scanning.

    Args:
        value (str): Value to validate.
        field_name (str): Name of the field being validated (used in the error).

    Raises:
        ValueError: If the value contains XSS patterns.

    Examples:
        Safe strings pass validation:

        >>> SecurityValidator.validate_no_xss('Hello World', 'test_field')
        >>> SecurityValidator.validate_no_xss('User: admin@example.com', 'email')
        >>> SecurityValidator.validate_no_xss('Price: $10.99', 'price')

        Empty/None strings are considered safe:

        >>> SecurityValidator.validate_no_xss('', 'empty_field')
        >>> SecurityValidator.validate_no_xss(None, 'none_field') #doctest: +SKIP

        Dangerous HTML tags trigger validation errors:

        >>> SecurityValidator.validate_no_xss('<script>alert(1)</script>', 'test_field')
        Traceback (most recent call last):
            ...
        ValueError: test_field contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<iframe src="evil.com"></iframe>', 'content')
        Traceback (most recent call last):
            ...
        ValueError: content contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<object data="malware.swf"></object>', 'data')
        Traceback (most recent call last):
            ...
        ValueError: data contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<embed src="evil.swf">', 'embed')
        Traceback (most recent call last):
            ...
        ValueError: embed contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<link rel="stylesheet" href="evil.css">', 'style')
        Traceback (most recent call last):
            ...
        ValueError: style contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<meta http-equiv="refresh" content="0;url=evil.com">', 'meta')
        Traceback (most recent call last):
            ...
        ValueError: meta contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<base href="http://evil.com">', 'base')
        Traceback (most recent call last):
            ...
        ValueError: base contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<form action="evil.php">', 'form')
        Traceback (most recent call last):
            ...
        ValueError: form contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<img src="x" onerror="alert(1)">', 'image')
        Traceback (most recent call last):
            ...
        ValueError: image contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<svg onload="alert(1)"></svg>', 'svg')
        Traceback (most recent call last):
            ...
        ValueError: svg contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<video src="x" onerror="alert(1)"></video>', 'video')
        Traceback (most recent call last):
            ...
        ValueError: video contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<audio src="x" onerror="alert(1)"></audio>', 'audio')
        Traceback (most recent call last):
            ...
        ValueError: audio contains HTML tags that may cause security issues
    """
    # Nothing to scan for falsy input; otherwise a single case-insensitive
    # search decides the outcome.
    if value and re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE) is not None:
        raise ValueError(f"{field_name} contains HTML tags that may cause security issues")
1413 @classmethod
1414 def validate_json_depth(
1415 cls,
1416 obj: object,
1417 max_depth: int | None = None,
1418 current_depth: int = 0,
1419 ) -> None:
1420 """Validate that a JSON‑like structure does not exceed a depth limit.
1422 A *depth* is counted **only** when we enter a container (`dict` or
1423 `list`). Primitive values (`str`, `int`, `bool`, `None`, etc.) do not
1424 increase the depth, but an *empty* container still counts as one level.
1426 Args:
1427 obj: Any Python object to inspect recursively.
1428 max_depth: Maximum allowed depth (defaults to
1429 :pyattr:`SecurityValidator.MAX_JSON_DEPTH`).
1430 current_depth: Internal recursion counter. **Do not** set this
1431 from user code.
1433 Raises:
1434 ValueError: If the nesting level exceeds *max_depth*.
1436 Examples:
1437 Simple flat dictionary – depth 1: ::
1439 >>> SecurityValidator.validate_json_depth({'name': 'Alice'})
1441 Nested dict – depth 2: ::
1443 >>> SecurityValidator.validate_json_depth(
1444 ... {'user': {'name': 'Alice'}}
1445 ... )
1447 Mixed dict/list – depth 3: ::
1449 >>> SecurityValidator.validate_json_depth(
1450 ... {'users': [{'name': 'Alice', 'meta': {'age': 30}}]}
1451 ... )
1453 At 10 levels of nesting – allowed: ::
1455 >>> deep_10 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8':
1456 ... {'9': {'10': 'end'}}}}}}}}}}
1457 >>> SecurityValidator.validate_json_depth(deep_10)
1459 At new default limit (30) – allowed: ::
1461 >>> deep_30 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8':
1462 ... {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16':
1463 ... {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24':
1464 ... {'25': {'26': {'27': {'28': {'29': {'30': 'end'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
1465 >>> SecurityValidator.validate_json_depth(deep_30)
1467 One level deeper – rejected: ::
1469 >>> deep_31 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8':
1470 ... {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16':
1471 ... {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24':
1472 ... {'25': {'26': {'27': {'28': {'29': {'30': {'31': 'end'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
1473 >>> SecurityValidator.validate_json_depth(deep_31)
1474 Traceback (most recent call last):
1475 ...
1476 ValueError: JSON structure exceeds maximum depth of 30
1477 """
1478 if max_depth is None:
1479 max_depth = cls.MAX_JSON_DEPTH
1481 # Only containers count toward depth; primitives are ignored
1482 if not isinstance(obj, (dict, list)):
1483 return
1485 next_depth = current_depth + 1
1486 if next_depth > max_depth:
1487 raise ValueError(f"JSON structure exceeds maximum depth of {max_depth}")
1489 if isinstance(obj, dict):
1490 for value in obj.values():
1491 cls.validate_json_depth(value, max_depth, next_depth)
1492 else: # obj is a list
1493 for item in obj:
1494 cls.validate_json_depth(item, max_depth, next_depth)
@classmethod
def validate_mime_type(cls, value: str) -> str:
    """Validate a MIME type's format and check it against the allowlist.

    A falsy value is returned unchanged. Otherwise the value must match the
    precompiled MIME pattern, and either the full value or its base type
    (the part before any ``;`` parameters) must appear in
    ``settings.validation_allowed_mime_types``. Base types starting with
    ``application/x-`` or ``text/x-``, or containing a ``+`` suffix, are
    accepted even when they are not listed.

    Args:
        value (str): Value to validate

    Returns:
        str: Value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Empty/None handling:

        >>> SecurityValidator.validate_mime_type('')
        ''
        >>> SecurityValidator.validate_mime_type(None) #doctest: +SKIP

        Valid standard MIME types:

        >>> SecurityValidator.validate_mime_type('text/plain')
        'text/plain'
        >>> SecurityValidator.validate_mime_type('application/json')
        'application/json'
        >>> SecurityValidator.validate_mime_type('image/jpeg')
        'image/jpeg'
        >>> SecurityValidator.validate_mime_type('text/html')
        'text/html'
        >>> SecurityValidator.validate_mime_type('application/pdf')
        'application/pdf'

        Valid vendor-specific MIME types:

        >>> SecurityValidator.validate_mime_type('application/x-custom')
        'application/x-custom'
        >>> SecurityValidator.validate_mime_type('text/x-log')
        'text/x-log'

        Valid MIME types with suffixes:

        >>> SecurityValidator.validate_mime_type('application/vnd.api+json')
        'application/vnd.api+json'
        >>> SecurityValidator.validate_mime_type('image/svg+xml')
        'image/svg+xml'

        Valid MIME types with parameters:

        >>> SecurityValidator.validate_mime_type('application/json; charset=utf-8')
        'application/json; charset=utf-8'
        >>> SecurityValidator.validate_mime_type('text/plain; charset=utf-8')
        'text/plain; charset=utf-8'

        Invalid MIME type formats:

        >>> SecurityValidator.validate_mime_type('invalid')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text/')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('/plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text//plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text/plain/extra')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('<text/plain>')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format

        Disallowed MIME types (not in the allowlist):

        >>> try:
        ...     SecurityValidator.validate_mime_type('application/evil')
        ... except ValueError as e:
        ...     'not in the allowed list' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_mime_type('text/evil')
        ... except ValueError as e:
        ...     'not in the allowed list' in str(e)
        True

        Disallowed MIME type with parameters:

        >>> try:
        ...     SecurityValidator.validate_mime_type('application/evil; charset=utf-8')
        ... except ValueError as e:
        ...     'not in the allowed list' in str(e)
        True
    """
    if not value:
        return value

    # Structural check first (uses precompiled regex)
    if _MIME_TYPE_RE.match(value) is None:
        raise ValueError("Invalid MIME type format")

    allowlist = settings.validation_allowed_mime_types
    base_type = value.split(";", 1)[0].strip()

    # Listed either verbatim or by base type: accept immediately
    if value in allowlist or base_type in allowlist:
        return value

    # Unlisted types are tolerated only as x- vendor types or "+suffix" types
    vendor_or_suffixed = base_type.startswith(("application/x-", "text/x-")) or "+" in base_type
    if not vendor_or_suffixed:
        raise ValueError(f"MIME type '{value}' is not in the allowed list")

    return value
@classmethod
def validate_shell_parameter(cls, value: str) -> str:
    """Guard a shell parameter against command injection.

    Values without shell metacharacters are returned untouched. When
    metacharacters are found, strict mode (``settings.validation_strict``,
    treated as ``True`` if the setting is absent) rejects the value, while
    non-strict mode returns it quoted with ``shlex.quote``.

    Args:
        value (str): Shell parameter to validate

    Returns:
        str: Validated/escaped parameter

    Raises:
        ValueError: If the value is not a string, or contains dangerous
            characters in strict mode

    Examples:
        >>> SecurityValidator.validate_shell_parameter('safe_param')
        'safe_param'
        >>> SecurityValidator.validate_shell_parameter('param with spaces')
        'param with spaces'
    """
    if not isinstance(value, str):
        raise ValueError("Parameter must be string")

    # Fast path: nothing dangerous found (uses precompiled regex)
    if _SHELL_DANGEROUS_CHARS_RE.search(value) is None:
        return value

    # Metacharacters present: reject in strict mode, otherwise quote
    if getattr(settings, "validation_strict", True):
        raise ValueError("Parameter contains shell metacharacters")
    return shlex.quote(value)
@classmethod
def validate_path(cls, path: str, allowed_roots: Optional[List[str]] = None) -> str:
    """Validate and normalize file paths to prevent directory traversal.

    Strings that look like URIs (matched by the precompiled scheme regex)
    are returned unchanged. Other paths are rejected if any component is
    ``..``, then resolved. When ``allowed_roots`` is given, the resolved
    path must be one of the resolved roots or lie beneath one of them.

    Containment is decided per path component, not by string prefix, so a
    root of ``/safe`` does not admit ``/safe_evil/file``.

    Args:
        path (str): File path to validate
        allowed_roots (Optional[List[str]]): List of allowed root directories

    Returns:
        str: Validated and normalized path

    Raises:
        ValueError: If path is not a string, contains traversal attempts,
            cannot be resolved, or is outside allowed roots

    Examples:
        >>> SecurityValidator.validate_path('/safe/path')
        '/safe/path'
        >>> SecurityValidator.validate_path('http://example.com/file')
        'http://example.com/file'
    """
    if not isinstance(path, str):
        raise ValueError("Path must be string")

    # Skip validation for URI schemes (http://, plugin://, etc.) (uses precompiled regex)
    if _URI_SCHEME_RE.match(path):
        return path

    try:
        candidate = Path(path)
        # Check for path traversal
        if ".." in candidate.parts:
            raise ValueError("Path traversal detected")

        resolved_path = candidate.resolve()

        # Check against allowed roots using component-wise containment;
        # a plain string-prefix test would let "/safe_evil" pass for "/safe".
        if allowed_roots:
            resolved_roots = (Path(root).resolve() for root in allowed_roots)
            if not any(resolved_path.is_relative_to(root) for root in resolved_roots):
                raise ValueError("Path outside allowed roots")

        return str(resolved_path)
    except (OSError, ValueError) as e:
        # Uniform error type for callers; the original cause stays chained
        raise ValueError(f"Invalid path: {e}") from e
@classmethod
def validate_sql_parameter(cls, value: str) -> str:
    """Validate SQL parameters to prevent SQL injection attacks.

    Non-string values are returned unchanged. A string matching any of the
    precompiled SQL patterns is rejected in strict mode
    (``settings.validation_strict``, treated as ``True`` if absent). In
    non-strict mode its single and double quotes are doubled exactly once,
    no matter how many patterns matched.

    Args:
        value (str): SQL parameter to validate

    Returns:
        str: Validated/escaped parameter

    Raises:
        ValueError: If parameter contains SQL injection patterns in strict mode

    Examples:
        >>> SecurityValidator.validate_sql_parameter('safe_value')
        'safe_value'
        >>> SecurityValidator.validate_sql_parameter('123')
        '123'
    """
    if not isinstance(value, str):
        return value

    # Check for SQL injection patterns (uses precompiled regex list)
    if not any(pattern.search(value) for pattern in _SQL_PATTERNS):
        return value

    if getattr(settings, "validation_strict", True):
        raise ValueError("Parameter contains SQL injection patterns")

    # Basic escaping, applied once. Escaping inside the pattern loop would
    # double the quotes again for every additional pattern that matches.
    # NOTE(review): quote doubling is not a substitute for parameterized
    # queries; callers should still bind values rather than interpolate.
    return value.replace("'", "''").replace('"', '""')
@classmethod
def validate_parameter_length(cls, value: str, max_length: Optional[int] = None) -> str:
    """Enforce an upper bound on a parameter's length.

    When ``max_length`` is falsy (``None`` or ``0``) the limit is taken from
    ``settings.max_param_length``, falling back to 10000 if that setting is
    absent.

    Args:
        value (str): Parameter to validate
        max_length (int): Maximum allowed length

    Returns:
        str: Parameter if within length limits

    Raises:
        ValueError: If parameter exceeds maximum length

    Examples:
        >>> SecurityValidator.validate_parameter_length('short', 10)
        'short'
    """
    # Explicit limit wins; the configured default is consulted only without one
    limit = max_length if max_length else getattr(settings, "max_param_length", 10000)
    if len(value) <= limit:
        return value
    raise ValueError(f"Parameter exceeds maximum length of {limit}")
@classmethod
def sanitize_text(cls, text: str) -> str:
    """Strip ANSI escape sequences and control characters from text.

    Non-string input is returned unchanged. ANSI sequences are removed
    first, then whatever the precompiled control-character pattern matches.

    Args:
        text (str): Text to sanitize

    Returns:
        str: Sanitized text with control characters removed

    Examples:
        >>> SecurityValidator.sanitize_text('Hello World')
        'Hello World'
        >>> SecurityValidator.sanitize_text('Text\x1b[31mwith\x1b[0mcolors')
        'Textwithcolors'
    """
    if not isinstance(text, str):
        return text

    # Inner call drops ANSI sequences, outer call drops control characters.
    # NOTE(review): the original comment says newlines and tabs are kept;
    # that depends on _CONTROL_CHARS_RE, defined elsewhere - confirm there.
    return _CONTROL_CHARS_RE.sub("", _ANSI_ESCAPE_RE.sub("", text))
@classmethod
def sanitize_json_response(cls, data: Any) -> Any:
    """Recursively sanitize the strings inside a JSON-like structure.

    Strings are passed through :meth:`sanitize_text`. Dict values and list
    items are processed recursively; dict keys are kept as they are. Any
    other value is returned unchanged.

    Args:
        data (Any): JSON data structure to sanitize

    Returns:
        Any: Sanitized data structure with same type as input

    Examples:
        >>> SecurityValidator.sanitize_json_response('clean text')
        'clean text'
        >>> SecurityValidator.sanitize_json_response({'key': 'value'})
        {'key': 'value'}
        >>> SecurityValidator.sanitize_json_response(['item1', 'item2'])
        ['item1', 'item2']
    """
    if isinstance(data, dict):
        cleaned = {}
        for key, item in data.items():
            cleaned[key] = cls.sanitize_json_response(item)
        return cleaned

    if isinstance(data, list):
        return list(map(cls.sanitize_json_response, data))

    if isinstance(data, str):
        return cls.sanitize_text(data)

    # Numbers, booleans, None and any other type pass through unchanged
    return data
def validate_core_url(value: str, field_name: str = "URL") -> str:
    """Validate a URL through the core ContextForge validator.

    Thin wrapper that delegates to ``SecurityValidator.validate_url`` so the
    core processing path has an explicit entry point that does not depend on
    plugin-framework validators.

    Args:
        value: The URL string to validate.
        field_name: Descriptive name for error messages.

    Returns:
        The validated URL string.
    """
    validated = SecurityValidator.validate_url(value, field_name)
    return validated