Coverage for mcpgateway / plugins / framework / validators.py: 100%
64 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/plugins/framework/validators.py
Copyright 2026
SPDX-License-Identifier: Apache-2.0
Authors: Fred Araujo

Self-contained security validation for the plugin framework.

Contains only the validation methods actually used by framework models
(MCPClientConfig), with hardcoded defaults to avoid any dependency on
mcpgateway.config.settings.

Examples:
    >>> SecurityValidator.validate_url("https://example.com")
    'https://example.com'
"""
18# Standard
19import ipaddress
20import logging
21import re
22from re import Pattern
23from urllib.parse import urlparse
25# First-Party
26from mcpgateway.plugins.framework.settings import get_ssrf_settings
logger = logging.getLogger(__name__)

# Hardcoded defaults that mirror the gateway's SecurityValidator in
# mcpgateway/common/validators.py and must be kept in sync with it.
# test_transport_type_enum_parity guards the enum; these constants are
# covered by test_security_validator_url_scheme_parity.
_ALLOWED_URL_SCHEMES = ("http://", "https://", "ws://", "wss://")
_MAX_URL_LENGTH = 2048

# Protocol markers that are rejected wherever they appear in a URL (same set
# as the gateway's _DANGEROUS_URL_PATTERNS). Each one is compiled
# case-insensitively as "<protocol>:".
_DANGEROUS_URL_PATTERNS: list[Pattern[str]] = [
    re.compile(f"{protocol}:", re.IGNORECASE)
    for protocol in (
        "javascript",
        "data",
        "vbscript",
        "about",
        "chrome",
        "file",
        "ftp",
        "mailto",
    )
]

# HTML / script XSS detectors (counterparts of the gateway's
# DANGEROUS_HTML_PATTERN and DANGEROUS_JS_PATTERN). Keep them aligned with
# validation_dangerous_html_pattern / validation_dangerous_js_pattern in
# mcpgateway/config.py.
_DANGEROUS_HTML_PATTERN = re.compile(
    r"<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b"
    r"|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>",
    flags=re.IGNORECASE,
)
_DANGEROUS_JS_PATTERN = re.compile(
    r"(?:^|\s|[\"'`<>=])(javascript:|vbscript:|data:\s*[^,]*[;\s]*(javascript|vbscript)|\bon[a-z]+\s*=|<\s*script\b)",
    flags=re.IGNORECASE,
)

# Private / reserved IPv4 ranges that SSRF protection refuses to contact.
_BLOCKED_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "127.0.0.0/8",
        "169.254.0.0/16",  # Link-local / cloud metadata
    )
]


class SecurityValidator:
    """Security validator for the plugin framework.

    Mirrors the SSRF-hardening checks from the gateway's SecurityValidator
    without depending on mcpgateway.config.settings.

    Examples:
        >>> SecurityValidator.validate_url("https://example.com")
        'https://example.com'
    """

    @staticmethod
    def validate_url(value: str, field_name: str = "URL") -> str:
        """Validate URLs for allowed schemes, SSRF protection, and safe structure.

        Credentials, IPv6, dangerous protocols, CRLF injection, spaces in
        domain, and port range are always enforced. SSRF IP-range blocking
        (private/reserved networks) is gated by the ``ssrf_protection_enabled``
        plugin setting.

        Every rejection is reported as a ``ValueError`` whose message starts
        with ``field_name``; errors raised by ``urllib.parse`` itself (an
        unparseable netloc, a non-numeric or out-of-range port) are translated
        into those messages rather than leaking the standard-library text.

        Args:
            value: URL string to validate.
            field_name: Name of the field being validated (for error messages).

        Returns:
            The validated URL string, unchanged.

        Raises:
            ValueError: If the URL is empty, too long, uses a disallowed
                scheme, contains credentials, has an invalid port, targets a
                blocked IP (when SSRF protection is enabled), or is
                structurally invalid.

        Examples:
            >>> SecurityValidator.validate_url("https://example.com")
            'https://example.com'
            >>> SecurityValidator.validate_url("https://example.com:9000/sse")
            'https://example.com:9000/sse'
            >>> SecurityValidator.validate_url("")
            Traceback (most recent call last):
                ...
            ValueError: URL cannot be empty
            >>> SecurityValidator.validate_url("ftp://example.com")
            Traceback (most recent call last):
                ...
            ValueError: URL must start with one of: http://, https://, ws://, wss://
            >>> SecurityValidator.validate_url("https://user:pass@example.com/")
            Traceback (most recent call last):
                ...
            ValueError: URL contains credentials which are not allowed
            >>> SecurityValidator.validate_url("https://example.com:99999/")
            Traceback (most recent call last):
                ...
            ValueError: URL contains invalid port number
            >>> SecurityValidator.validate_url("https://[::1]:8080/")
            Traceback (most recent call last):
                ...
            ValueError: URL contains IPv6 address which is not supported
            >>> SecurityValidator.validate_url("https://0.0.0.0/")
            Traceback (most recent call last):
                ...
            ValueError: URL contains invalid IP address (0.0.0.0)
            >>> SecurityValidator.validate_url("https://example.com/<script>alert(1)</script>")
            Traceback (most recent call last):
                ...
            ValueError: URL contains HTML tags that may cause security issues
        """
        if not value:
            raise ValueError(f"{field_name} cannot be empty")

        if len(value) > _MAX_URL_LENGTH:
            raise ValueError(f"{field_name} exceeds maximum length of {_MAX_URL_LENGTH}")

        if not value.lower().startswith(tuple(_ALLOWED_URL_SCHEMES)):
            raise ValueError(f"{field_name} must start with one of: {', '.join(_ALLOWED_URL_SCHEMES)}")

        # Block dangerous URL patterns
        if any(pattern.search(value) for pattern in _DANGEROUS_URL_PATTERNS):
            raise ValueError(f"{field_name} contains unsupported or potentially dangerous protocol")

        # Block IPv6 URLs
        if "[" in value or "]" in value:
            raise ValueError(f"{field_name} contains IPv6 address which is not supported")

        # Block CRLF injection
        if "\r" in value or "\n" in value:
            raise ValueError(f"{field_name} contains line breaks which are not allowed")

        # Block spaces in domain (but allow in query string)
        if " " in value.split("?", maxsplit=1)[0]:
            raise ValueError(f"{field_name} contains spaces which are not allowed in URLs")

        try:
            # urlparse can itself raise ValueError (e.g. a netloc that fails
            # its normalization check); report that with our own message.
            try:
                result = urlparse(value)
            except ValueError as parse_err:
                raise ValueError(f"{field_name} is not a valid URL") from parse_err

            if not all([result.scheme, result.netloc]):
                raise ValueError(f"{field_name} is not a valid URL")

            # Block credentials in URL
            if result.username or result.password:
                raise ValueError(f"{field_name} contains credentials which are not allowed")

            # Validate port number. Accessing ``.port`` raises ValueError for
            # non-numeric or out-of-range ports, so translate that as well.
            try:
                port = result.port
            except ValueError as port_err:
                raise ValueError(f"{field_name} contains invalid port number") from port_err
            if port is not None and not 1 <= port <= 65535:
                raise ValueError(f"{field_name} contains invalid port number")

            # SSRF protection: block dangerous IP addresses (always block 0.0.0.0)
            hostname = result.hostname
            if hostname:
                if hostname == "0.0.0.0":  # nosec B104
                    raise ValueError(f"{field_name} contains invalid IP address (0.0.0.0)")

                # Gate private/reserved IP blocking on plugin-specific settings.
                if get_ssrf_settings().ssrf_protection_enabled:
                    try:
                        addr = ipaddress.ip_address(hostname)
                    except ValueError:
                        # Not an IP literal -- it's a hostname, which is fine.
                        addr = None
                    if addr is not None and any(addr in network for network in _BLOCKED_NETWORKS):
                        raise ValueError(f"{field_name} contains IP address blocked by SSRF protection ({hostname})")

            # Block HTML tags and script/event-handler patterns in URL
            if _DANGEROUS_HTML_PATTERN.search(value):
                raise ValueError(f"{field_name} contains HTML tags that may cause security issues")
            if _DANGEROUS_JS_PATTERN.search(value):
                raise ValueError(f"{field_name} contains script patterns that may cause security issues")
        except ValueError:
            raise
        except Exception as exc:
            # Callers only expect ValueError; keep the original cause attached.
            raise ValueError(f"{field_name} is not a valid URL") from exc

        return value


def validate_plugin_url(value: str, field_name: str = "URL") -> str:
    """Validate a URL on behalf of the plugin framework.

    Thin module-level wrapper that forwards both arguments to
    ``SecurityValidator.validate_url`` and hands back its result.

    Args:
        value: The URL string to validate.
        field_name: Descriptive name used in error messages.

    Returns:
        The validated URL string.
    """
    return SecurityValidator.validate_url(value, field_name=field_name)