Coverage for mcpgateway / plugins / framework / validators.py: 100%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/validators.py 

3Copyright 2026 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Fred Araujo 

6 

7Self-contained security validation for the plugin framework. 

8 

9Contains only the validation methods actually used by framework models 

10(MCPClientConfig), with hardcoded defaults to avoid any dependency on 

11mcpgateway.config.settings. 

12 

13Examples: 

14 >>> SecurityValidator.validate_url("https://example.com") 

15 'https://example.com' 

16""" 

17 

18# Standard 

19import ipaddress 

20import logging 

21import re 

22from re import Pattern 

23from urllib.parse import urlparse 

24 

25# First-Party 

26from mcpgateway.plugins.framework.settings import get_ssrf_settings 

27 

logger = logging.getLogger(__name__)

# These defaults mirror the gateway's SecurityValidator in
# mcpgateway/common/validators.py and must be kept in sync with it.
# test_transport_type_enum_parity guards the enum, while these constants are
# verified by test_security_validator_url_scheme_parity.
_ALLOWED_URL_SCHEMES = ("http://", "https://", "ws://", "wss://")
_MAX_URL_LENGTH = 2048

# Protocol markers that must never appear in a URL (matches the gateway's
# _DANGEROUS_URL_PATTERNS). Each one is compiled case-insensitively and is
# applied with ``search``, i.e. it matches anywhere in the value.
_DANGEROUS_URL_PATTERNS: list[Pattern[str]] = [
    re.compile(expression, re.IGNORECASE)
    for expression in (
        r"javascript:",
        r"data:",
        r"vbscript:",
        r"about:",
        r"chrome:",
        r"file:",
        r"ftp:",
        r"mailto:",
    )
]

# HTML/script XSS patterns (matches the gateway's DANGEROUS_HTML_PATTERN /
# DANGEROUS_JS_PATTERN). Keep in sync with mcpgateway/config.py
# validation_dangerous_html_pattern / validation_dangerous_js_pattern.
_DANGEROUS_HTML_PATTERN = re.compile(
    r"<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b"
    r"|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>",
    flags=re.IGNORECASE,
)
_DANGEROUS_JS_PATTERN = re.compile(
    r"(?:^|\s|[\"'`<>=])(javascript:|vbscript:|data:\s*[^,]*[;\s]*(javascript|vbscript)|\bon[a-z]+\s*=|<\s*script\b)",
    flags=re.IGNORECASE,
)

# Private/reserved IPv4 networks blocked for SSRF protection.
_BLOCKED_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "127.0.0.0/8",
        "169.254.0.0/16",  # Link-local / cloud metadata
    )
]

68 

69 

class SecurityValidator:
    """Security validator for the plugin framework.

    Mirrors the SSRF-hardening checks from the gateway's SecurityValidator
    without depending on mcpgateway.config.settings.

    Examples:
        >>> SecurityValidator.validate_url("https://example.com")
        'https://example.com'
    """

    @staticmethod
    def validate_url(value: str, field_name: str = "URL") -> str:
        """Validate URLs for allowed schemes, SSRF protection, and safe structure.

        Credentials, IPv6, dangerous protocols, CRLF injection, spaces in
        domain, and port range are always enforced. SSRF IP-range blocking
        (private/reserved networks) is gated by the ``ssrf_protection_enabled``
        plugin setting.

        The checks run in a fixed order and the first failing check determines
        the error message.

        Args:
            value: URL string to validate.
            field_name: Name of the field being validated (for error messages).

        Returns:
            The validated URL string, unchanged.

        Raises:
            ValueError: If the URL is empty, too long, uses a disallowed
                scheme, contains credentials, has a port that is non-numeric
                or outside 1-65535, targets a blocked IP (when SSRF protection
                is enabled), or is structurally invalid.

        Examples:
            >>> SecurityValidator.validate_url("https://example.com")
            'https://example.com'
            >>> SecurityValidator.validate_url("https://example.com:9000/sse")
            'https://example.com:9000/sse'
            >>> SecurityValidator.validate_url("")
            Traceback (most recent call last):
                ...
            ValueError: URL cannot be empty
            >>> SecurityValidator.validate_url("ftp://example.com")
            Traceback (most recent call last):
                ...
            ValueError: URL must start with one of: http://, https://, ws://, wss://
            >>> SecurityValidator.validate_url("https://user:pass@example.com/")
            Traceback (most recent call last):
                ...
            ValueError: URL contains credentials which are not allowed
            >>> SecurityValidator.validate_url("https://example.com:99999/")
            Traceback (most recent call last):
                ...
            ValueError: URL contains invalid port number
            >>> SecurityValidator.validate_url("https://[::1]:8080/")
            Traceback (most recent call last):
                ...
            ValueError: URL contains IPv6 address which is not supported
            >>> SecurityValidator.validate_url("https://0.0.0.0/")
            Traceback (most recent call last):
                ...
            ValueError: URL contains invalid IP address (0.0.0.0)
            >>> SecurityValidator.validate_url("https://example.com/<script>alert(1)</script>")
            Traceback (most recent call last):
                ...
            ValueError: URL contains HTML tags that may cause security issues
        """
        if not value:
            raise ValueError(f"{field_name} cannot be empty")

        if len(value) > _MAX_URL_LENGTH:
            raise ValueError(f"{field_name} exceeds maximum length of {_MAX_URL_LENGTH}")

        # str.startswith accepts a tuple of prefixes; the comparison is
        # case-insensitive because the value is lower-cased first.
        if not value.lower().startswith(_ALLOWED_URL_SCHEMES):
            raise ValueError(f"{field_name} must start with one of: {', '.join(_ALLOWED_URL_SCHEMES)}")

        # Block dangerous URL patterns (searched anywhere in the value).
        if any(pattern.search(value) for pattern in _DANGEROUS_URL_PATTERNS):
            raise ValueError(f"{field_name} contains unsupported or potentially dangerous protocol")

        # Block IPv6 URLs: any bracket anywhere in the value is rejected.
        if "[" in value or "]" in value:
            raise ValueError(f"{field_name} contains IPv6 address which is not supported")

        # Block CRLF injection.
        if "\r" in value or "\n" in value:
            raise ValueError(f"{field_name} contains line breaks which are not allowed")

        # Block spaces before the first "?" (spaces in the query string are allowed).
        if " " in value.split("?", maxsplit=1)[0]:
            raise ValueError(f"{field_name} contains spaces which are not allowed in URLs")

        try:
            result = urlparse(value)
            if not (result.scheme and result.netloc):
                raise ValueError(f"{field_name} is not a valid URL")

            # Block credentials in URL.
            if result.username or result.password:
                raise ValueError(f"{field_name} contains credentials which are not allowed")

            # ``result.port`` itself raises ValueError for a non-numeric or
            # out-of-range port; translate that into the same field-aware
            # message used for the explicit range check below.
            try:
                port = result.port
            except ValueError as port_err:
                raise ValueError(f"{field_name} contains invalid port number") from port_err
            if port is not None and not 1 <= port <= 65535:
                raise ValueError(f"{field_name} contains invalid port number")

            # SSRF protection: block dangerous IP addresses (always block 0.0.0.0).
            hostname = result.hostname
            if hostname:
                if hostname == "0.0.0.0":  # nosec B104
                    raise ValueError(f"{field_name} contains invalid IP address (0.0.0.0)")

                # Gate private/reserved IP blocking on plugin-specific settings.
                if get_ssrf_settings().ssrf_protection_enabled:
                    # Parse first, then check membership outside the try, so a
                    # parse failure can never be confused with a block decision.
                    # NOTE(review): only literals accepted by
                    # ipaddress.ip_address are recognised; anything else (for
                    # example "127.1") is treated as a hostname and allowed.
                    try:
                        addr = ipaddress.ip_address(hostname)
                    except ValueError:
                        addr = None  # Not an IP literal: it's a hostname, which is fine.
                    if addr is not None and any(addr in network for network in _BLOCKED_NETWORKS):
                        raise ValueError(f"{field_name} contains IP address blocked by SSRF protection ({hostname})")

            # Block HTML tags and script/event-handler patterns in URL.
            if _DANGEROUS_HTML_PATTERN.search(value):
                raise ValueError(f"{field_name} contains HTML tags that may cause security issues")
            if _DANGEROUS_JS_PATTERN.search(value):
                raise ValueError(f"{field_name} contains script patterns that may cause security issues")

        except ValueError:
            # Validation failures (ours or urlparse's) propagate unchanged.
            raise
        except Exception as exc:
            # Any other failure is reported as an invalid URL; keep the
            # original exception chained so the root cause stays visible.
            raise ValueError(f"{field_name} is not a valid URL") from exc

        return value

202 

203 

def validate_plugin_url(value: str, field_name: str = "URL") -> str:
    """Validate a URL on behalf of the plugin framework.

    Thin module-level wrapper that forwards both arguments to
    ``SecurityValidator.validate_url`` and returns its result.

    Args:
        value: The URL string to validate.
        field_name: Descriptive name for error messages.

    Returns:
        The validated URL string.
    """
    validated = SecurityValidator.validate_url(value, field_name=field_name)
    return validated