Coverage for mcpgateway / common / validators.py: 100%

351 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/common/validators.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti, Madhav Kandukuri 

6 

7SecurityValidator for MCP Gateway 

8This module defines the `SecurityValidator` class, which provides centralized, configurable 

9validation logic for user-generated content in MCP-based applications. 

10 

11The validator enforces strict security and structural rules across common input types such as: 

12- Display text (e.g., names, descriptions) 

13- Identifiers and tool names 

14- URIs and URLs 

15- JSON object depth 

16- Templates (including limited HTML/Jinja2) 

17- MIME types 

18 

19Key Features: 

20- Pattern-based validation using settings-defined regex for HTML/script safety 

21- Configurable max lengths and depth limits 

22- Whitelist-based URL scheme and MIME type validation 

23- Safe escaping of user-visible text fields 

24- Reusable static/class methods for field-level and form-level validation 

25 

26Intended to be used with Pydantic or similar schema-driven systems to validate and sanitize 

27user input in a consistent, centralized way. 

28 

29Dependencies: 

30- Standard Library: html, ipaddress, logging, pathlib, re, shlex, socket, typing, urllib.parse, uuid 

31- First-party: `settings` from `mcpgateway.config` 

32 

33Example usage: 

34 SecurityValidator.validate_name("my_tool", field_name="Tool Name") 

35 SecurityValidator.validate_url("https://example.com") 

36 SecurityValidator.validate_json_depth({...}) 

37 

38Examples: 

39 >>> from mcpgateway.common.validators import SecurityValidator 

40 >>> SecurityValidator.sanitize_display_text('<b>Test</b>', 'test') 

41 '&lt;b&gt;Test&lt;/b&gt;' 

42 >>> SecurityValidator.validate_name('valid_name-123', 'test') 

43 'valid_name-123' 

44 >>> SecurityValidator.validate_identifier('my.test.id_123', 'test') 

45 'my.test.id_123' 

46 >>> SecurityValidator.validate_json_depth({'a': {'b': 1}}) 

47 >>> SecurityValidator.validate_json_depth({'a': 1}) 

48""" 

49 

50# Standard 

51import html 

52import ipaddress 

53import logging 

54from pathlib import Path 

55import re 

56import shlex 

57import socket 

58from typing import Any, Iterable, List, Optional, Pattern 

59from urllib.parse import urlparse 

60import uuid 

61 

62# First-Party 

63from mcpgateway.config import settings 

64 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ============================================================================
# Precompiled regex patterns (compiled once at module load for performance)
# ============================================================================
# Note: Settings-based patterns (DANGEROUS_HTML_PATTERN, DANGEROUS_JS_PATTERN,
# NAME_PATTERN, IDENTIFIER_PATTERN, etc.) are NOT precompiled here because tests
# override the class attributes at runtime. Only truly static patterns are
# precompiled at module level.

# Static inline patterns used multiple times
# Any one of the characters < > " ' (the forward slash is deliberately not included).
_HTML_SPECIAL_CHARS_RE: Pattern[str] = re.compile(r'[<>"\']')  # / removed per SEP-986
# Opening of a script/iframe/object/embed/link/meta/base/form tag, case-insensitive.
_DANGEROUS_TEMPLATE_TAGS_RE: Pattern[str] = re.compile(r"<(script|iframe|object|embed|link|meta|base|form)\b", re.IGNORECASE)
# "on" + word characters, optional whitespace, then "=" (e.g. onclick=), case-insensitive.
_EVENT_HANDLER_RE: Pattern[str] = re.compile(r"on\w+\s*=", re.IGNORECASE)
# Whole-string "type/subtype"; each part starts alphanumeric and continues with
# alphanumerics or one of ! # $ & - ^ _ + .
_MIME_TYPE_RE: Pattern[str] = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9!#$&\-\^_+\.]*\/[a-zA-Z0-9][a-zA-Z0-9!#$&\-\^_+\.]*$")
# Leading "scheme://" where the scheme starts with a letter.
_URI_SCHEME_RE: Pattern[str] = re.compile(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://")
# Any one of the shell metacharacters ; & | ` $ ( ) { } [ ] < >
_SHELL_DANGEROUS_CHARS_RE: Pattern[str] = re.compile(r"[;&|`$(){}\[\]<>]")
# ESC [ followed by digits/semicolons and a terminating ASCII letter.
_ANSI_ESCAPE_RE: Pattern[str] = re.compile(r"\x1B\[[0-9;]*[A-Za-z]")
# Code points 0x00-0x1f and 0x7f-0x9f, except tab (0x09), LF (0x0a) and CR (0x0d).
_CONTROL_CHARS_RE: Pattern[str] = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]")

84 

# Polyglot attack patterns (each source is compiled once at import, case-insensitively)
_POLYGLOT_PATTERNS: List[Pattern[str]] = [
    re.compile(polyglot_source, re.IGNORECASE)
    for polyglot_source in (
        r"['\"];.*alert\s*\(",  # quote + semicolon, later followed by alert(
        r"-->\s*<[^>]+>",  # HTML comment terminator followed by a tag
        r"['\"].*//['\"]",  # quote ... "//" directly followed by a quote
        r"<<[A-Z]+>",  # doubled "<" in front of a letters-only tag name
        r"String\.fromCharCode",  # character-code based string construction
        r"javascript:.*\(",  # javascript: URL that contains a call
    )
]

94 

# SSTI prevention - plain substring scanning, so no regex backtracking is involved.
_SSTI_DANGEROUS_SUBSTRINGS: tuple[str, ...] = (
    # Dunder / attribute access and sensitive names
    "__",
    ".",
    "config",
    "self",
    "request",
    "application",
    "globals",
    "builtins",
    "import",
    "getattr",  # Python getattr function
    # Jinja2 filters (checked after whitespace normalization)
    "|attr",  # attr filter
    "|selectattr",  # selectattr filter (takes attribute name as arg)
    "|sort",  # sort filter with attribute parameter
    "|map",  # map filter with attribute parameter
    "attribute=",  # map(attribute=...), selectattr, sort(attribute=...)
    # Escape sequences that can spell a character indirectly
    "\\x",  # Hex escape sequences (e.g., \x5f for underscore)
    "\\u",  # Unicode escape sequences (e.g., \u005f for underscore)
    "\\n{",  # Named unicode escapes (e.g., \N{LOW LINE})
    *(f"\\{octal_digit}" for octal_digit in "01234567"),  # Octal escapes \0 .. \7
)
# Operators that enable code execution or dynamic construction:
#   * / + -   arithmetic
#   ~         Jinja2 string concatenation (can build dunder names dynamically)
#   [         bracket notation for dynamic attribute access
#   %         Python string formatting (e.g., '%c' % 95 produces '_')
_SSTI_DANGEROUS_OPERATORS: tuple[str, ...] = tuple("*/+-~[%")
# Simple "<sigil>{" expression openers: ${  #{  %{
_SSTI_SIMPLE_TEMPLATE_PREFIXES: tuple[str, ...] = tuple(f"{sigil}{{" for sigil in "$#%")

135 

136 

def _find_expression_end(value: str, scan_from: int, end: str) -> int:
    """Locate the first unquoted, unescaped ``end`` delimiter at or after ``scan_from``.

    Args:
        value (str): Template text to scan.
        scan_from (int): Index of the first character after the opening delimiter.
        end (str): Closing delimiter.

    Returns:
        int: Index where the closing delimiter starts, or -1 when none is found.
    """
    open_quote: Optional[str] = None
    skip_next = False
    # The delimiter cannot start later than len(value) - len(end).
    for pos in range(scan_from, len(value) - len(end) + 1):
        char = value[pos]
        if skip_next:
            # Character following a backslash is never interpreted.
            skip_next = False
            continue
        if char == "\\":
            skip_next = True
            continue
        if open_quote:
            # Inside a quoted string only the matching quote matters.
            if char == open_quote:
                open_quote = None
            continue
        if char in ("'", '"'):
            open_quote = char
        elif value.startswith(end, pos):
            return pos
    return -1


def _iter_template_expressions(value: str, start: str, end: str) -> Iterable[str]:
    """Generate the text found between each ``start``/``end`` delimiter pair.

    Closing delimiters that sit inside single- or double-quoted strings, or that
    directly follow a backslash, do not terminate an expression. Scanning resumes
    right after each closing delimiter.

    Args:
        value (str): Template text to scan.
        start (str): Opening delimiter.
        end (str): Closing delimiter.

    Yields:
        str: The template expression contents between delimiters.

    Raises:
        ValueError: If an unterminated template expression is found (fail-closed behavior).
    """
    open_len = len(start)
    close_len = len(end)
    last_possible_open = len(value) - open_len
    cursor = 0
    while cursor <= last_possible_open:
        if not value.startswith(start, cursor):
            cursor += 1
            continue
        body_begin = cursor + open_len
        body_end = _find_expression_end(value, body_begin, end)
        if body_end < 0:
            # Opening delimiter without a usable closing delimiter: fail closed.
            raise ValueError("Template contains potentially dangerous expressions")
        yield value[body_begin:body_end]
        cursor = body_end + close_len

181 

182 

def _has_simple_template_expression(value: str, start: str) -> bool:
    """Tell whether some ``start`` occurrence is later followed by a ``}``.

    Only the first occurrence of ``start`` and the last ``}`` need comparing:
    if any opener precedes any closing brace, the earliest opener precedes the
    latest closing brace. Two linear searches, no regex.

    Args:
        value (str): Template text to scan.
        start (str): Opening delimiter.

    Returns:
        bool: True if a closing brace exists after the delimiter.
    """
    first_open = value.find(start)
    if first_open < 0:
        # No opener at all, so nothing can be a complete expression.
        return False
    # rfind yields -1 when there is no closing brace, which never exceeds first_open.
    return first_open < value.rfind("}")

202 

203 

# Dangerous URL protocol patterns (each "<scheme>:" is compiled once, case-insensitively)
_DANGEROUS_URL_PATTERNS: List[Pattern[str]] = [
    re.compile(scheme_source, re.IGNORECASE)
    for scheme_source in (
        r"javascript:",
        r"data:",
        r"vbscript:",
        r"about:",
        r"chrome:",
        r"file:",
        r"ftp:",
        r"mailto:",
    )
]

215 

# SQL injection patterns (each source is compiled once, case-insensitively)
_SQL_PATTERNS: List[Pattern[str]] = [
    re.compile(sql_source, re.IGNORECASE)
    for sql_source in (
        r"[';\"\\]",  # quote, semicolon or backslash
        r"--",  # line-comment marker
        r"/\*.*?\*/",  # /* ... */ comment on a single line
        r"\b(union|select|insert|update|delete|drop|exec|execute)\b",  # SQL keywords as whole words
    )
]

223 

224 

225class SecurityValidator: 

226 """Configurable validation with MCP-compliant limits""" 

227 

# Configurable patterns (from settings)
# Every attribute in this section is read from `settings` when the class body
# is evaluated; the trailing comments record the documented defaults.
DANGEROUS_HTML_PATTERN = (
    settings.validation_dangerous_html_pattern
)  # Default: '<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>'
DANGEROUS_JS_PATTERN = settings.validation_dangerous_js_pattern  # Default: javascript:|vbscript:|on\w+\s*=|data:.*script
ALLOWED_URL_SCHEMES = settings.validation_allowed_url_schemes  # Default: ["http://", "https://", "ws://", "wss://"]

# Character type patterns
# These are regex source strings, applied with the `re` module functions by the validate_* methods.
NAME_PATTERN = settings.validation_name_pattern  # Default: ^[a-zA-Z0-9_\-\s]+$
IDENTIFIER_PATTERN = settings.validation_identifier_pattern  # Default: ^[a-zA-Z0-9_\-\.]+$
VALIDATION_SAFE_URI_PATTERN = settings.validation_safe_uri_pattern  # Default: ^[a-zA-Z0-9_\-.:/?=&%]+$
VALIDATION_UNSAFE_URI_PATTERN = settings.validation_unsafe_uri_pattern  # Default: [<>"\'\\]
TOOL_NAME_PATTERN = settings.validation_tool_name_pattern  # Default: ^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$ (SEP-986)

# MCP-compliant limits (configurable)
# Upper bounds compared against len(value) / nesting depth by the validators.
MAX_NAME_LENGTH = settings.validation_max_name_length  # Default: 255
MAX_DESCRIPTION_LENGTH = settings.validation_max_description_length  # Default: 8192 (8KB)
MAX_TEMPLATE_LENGTH = settings.validation_max_template_length  # Default: 65536
MAX_CONTENT_LENGTH = settings.validation_max_content_length  # Default: 1048576 (1MB)
MAX_JSON_DEPTH = settings.validation_max_json_depth  # Default: 30
MAX_URL_LENGTH = settings.validation_max_url_length  # Default: 2048

249 

250 @classmethod 

def sanitize_display_text(cls, value: str, field_name: str) -> str:
    """Screen display text for risky markup, then HTML-escape it.

    Falsy input is handed back untouched. Otherwise the text is checked against
    the configured dangerous-HTML pattern, the configured dangerous-JS pattern
    and the module's polyglot patterns, in that order; the first hit raises.
    Text that passes every check is returned HTML-escaped (quotes included).

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated

    Returns:
        str: The escaped value, or ``value`` itself when it is falsy

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Basic HTML escaping:

        >>> SecurityValidator.sanitize_display_text('Hello World', 'test')
        'Hello World'
        >>> SecurityValidator.sanitize_display_text('Hello <b>World</b>', 'test')
        'Hello &lt;b&gt;World&lt;/b&gt;'

        Empty/None handling:

        >>> SecurityValidator.sanitize_display_text('', 'test')
        ''
        >>> SecurityValidator.sanitize_display_text(None, 'test') #doctest: +SKIP

        Dangerous script patterns:

        >>> SecurityValidator.sanitize_display_text('alert();', 'test')
        'alert();'
        >>> SecurityValidator.sanitize_display_text('javascript:alert(1)', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains script patterns that may cause display issues

        Polyglot attack patterns:

        >>> SecurityValidator.sanitize_display_text('"; alert()', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains potentially dangerous character sequences
        >>> SecurityValidator.sanitize_display_text('-->test', 'test')
        '--&gt;test'
        >>> SecurityValidator.sanitize_display_text('--><script>', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains HTML tags that may cause display issues
        >>> SecurityValidator.sanitize_display_text('String.fromCharCode(65)', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains potentially dangerous character sequences

        Safe character escaping:

        >>> SecurityValidator.sanitize_display_text('User & Admin', 'test')
        'User &amp; Admin'
        >>> SecurityValidator.sanitize_display_text('Quote: "Hello"', 'test')
        'Quote: &quot;Hello&quot;'
        >>> SecurityValidator.sanitize_display_text("Quote: 'Hello'", 'test')
        'Quote: &#x27;Hello&#x27;'
    """
    if not value:
        return value

    # Configured (settings-driven) patterns are applied case-insensitively.
    html_hit = re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE)
    if html_hit is not None:
        raise ValueError(f"{field_name} contains HTML tags that may cause display issues")

    js_hit = re.search(cls.DANGEROUS_JS_PATTERN, value, re.IGNORECASE)
    if js_hit is not None:
        raise ValueError(f"{field_name} contains script patterns that may cause display issues")

    # Module-level polyglot patterns are already compiled with IGNORECASE.
    if any(polyglot.search(value) for polyglot in _POLYGLOT_PATTERNS):
        raise ValueError(f"{field_name} contains potentially dangerous character sequences")

    # Nothing suspicious found: escape &, <, > and both quote characters.
    escaped_text = html.escape(value, quote=True)
    return escaped_text

330 

331 @classmethod 

def validate_name(cls, value: str, field_name: str = "Name") -> str:
    """Check a display name against the configured name rules.

    The checks run in this order: non-empty, configured ``NAME_PATTERN``,
    no HTML special characters, length within ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated

    Returns:
        str: Value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_name('valid_name')
        'valid_name'
        >>> SecurityValidator.validate_name('valid_name-123')
        'valid_name-123'
        >>> SecurityValidator.validate_name('valid_name_test')
        'valid_name_test'
        >>> SecurityValidator.validate_name('Test Name')
        'Test Name'
        >>> try:
        ...     SecurityValidator.validate_name('Invalid Name!')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('')
        ... except ValueError as e:
        ...     'cannot be empty' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('name<script>')
        ... except ValueError as e:
        ...     'HTML special characters' in str(e) or 'can only contain' in str(e)
        True

        Length limit:

        >>> long_name = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_name(long_name)
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True

        Quotes and slashes are rejected by the pattern:

        >>> try:
        ...     SecurityValidator.validate_name('name"test')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name("name'test")
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('name/test')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Allow-list check using the settings-provided pattern.
    matches_allowed = re.match(cls.NAME_PATTERN, value) is not None
    if not matches_allowed:
        raise ValueError(f"{field_name} can only contain letters, numbers, underscore, and hyphen. Special characters like <, >, quotes are not allowed.")

    # Second line of defence in case the configured pattern is permissive.
    if _HTML_SPECIAL_CHARS_RE.search(value) is not None:
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    too_long = len(value) > cls.MAX_NAME_LENGTH
    if too_long:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value

412 

413 @classmethod 

def validate_identifier(cls, value: str, field_name: str) -> str:
    """Validate identifiers (IDs) - MCP compliant

    The checks run in this order: non-empty, configured ``IDENTIFIER_PATTERN``
    (must match the whole value), no HTML special characters, length within
    ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated

    Returns:
        str: Value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_identifier('valid_id', 'ID')
        'valid_id'
        >>> SecurityValidator.validate_identifier('valid.id.123', 'ID')
        'valid.id.123'
        >>> SecurityValidator.validate_identifier('valid-id_test', 'ID')
        'valid-id_test'
        >>> SecurityValidator.validate_identifier('test123', 'ID')
        'test123'
        >>> try:
        ...     SecurityValidator.validate_identifier('Invalid/ID', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('', 'ID')
        ... except ValueError as e:
        ...     'cannot be empty' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('id<script>', 'ID')
        ... except ValueError as e:
        ...     'HTML special characters' in str(e) or 'can only contain' in str(e)
        True

        Quotes and slashes are rejected by the pattern:

        >>> try:
        ...     SecurityValidator.validate_identifier('id"test', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier("id'test", 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('id/test', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        A trailing newline is rejected as well:

        >>> try:
        ...     SecurityValidator.validate_identifier('valid_id' + chr(10), 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        Length limit:

        >>> long_id = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_identifier(long_id, 'ID')
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # MCP spec: identifiers should be alphanumeric + limited special chars.
    # fullmatch (not match) is required: in a "^...$" pattern, "$" also matches
    # just before a trailing newline, so re.match would accept "id\n".
    if not re.fullmatch(cls.IDENTIFIER_PATTERN, value):
        raise ValueError(f"{field_name} can only contain letters, numbers, underscore, hyphen, and dots")

    # Block HTML-like patterns (uses precompiled regex)
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value

494 

495 @classmethod 

def validate_uri(cls, value: str, field_name: str = "URI") -> str:
    """Validate URIs - MCP compliant

    The checks run in this order: non-empty, no character matching
    ``VALIDATION_UNSAFE_URI_PATTERN``, no ``..`` sequence, whole value matches
    ``VALIDATION_SAFE_URI_PATTERN``, length within ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated

    Returns:
        str: Value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_uri('/valid/uri', 'URI')
        '/valid/uri'
        >>> SecurityValidator.validate_uri('..', 'URI')
        Traceback (most recent call last):
            ...
        ValueError: URI cannot contain directory traversal sequences ('..')

        A trailing newline is rejected:

        >>> try:
        ...     SecurityValidator.validate_uri('/valid/uri' + chr(10), 'URI')
        ... except ValueError as e:
        ...     'invalid characters' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Block HTML-like patterns ("contains" semantics, hence search)
    if re.search(cls.VALIDATION_UNSAFE_URI_PATTERN, value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if ".." in value:
        raise ValueError(f"{field_name} cannot contain directory traversal sequences ('..')")

    # Allow-list check must cover the whole value. With search/match, the "$" in
    # a "^...$" pattern also matches just before a trailing newline, which would
    # let "/path\n" through; fullmatch closes that gap.
    if not re.fullmatch(cls.VALIDATION_SAFE_URI_PATTERN, value):
        raise ValueError(f"{field_name} contains invalid characters")

    # NOTE(review): the limit applied here is MAX_NAME_LENGTH, not MAX_URL_LENGTH - confirm this is intentional.
    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value

534 

535 @classmethod 

def validate_tool_name(cls, value: str) -> str:
    """Special validation for MCP tool names

    The checks run in this order: non-empty, configured ``TOOL_NAME_PATTERN``
    (must match the whole value), no HTML special characters, length within
    ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate

    Returns:
        str: Value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_tool_name('tool_1')
        'tool_1'
        >>> SecurityValidator.validate_tool_name('_5gpt_query')
        '_5gpt_query'
        >>> SecurityValidator.validate_tool_name('1tool')
        '1tool'

        Invalid characters (rejected by pattern):

        >>> try:
        ...     SecurityValidator.validate_tool_name('tool<script>')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_tool_name('tool"test')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_tool_name("tool'test")
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> # Slashes are allowed per SEP-986
        >>> SecurityValidator.validate_tool_name('tool/test')
        'tool/test'
        >>> SecurityValidator.validate_tool_name('namespace/subtool')
        'namespace/subtool'

        A trailing newline is rejected as well:

        >>> try:
        ...     SecurityValidator.validate_tool_name('tool_1' + chr(10))
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True

        Length limit:

        >>> long_tool_name = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_tool_name(long_tool_name)
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True
    """
    if not value:
        raise ValueError("Tool name cannot be empty")

    # MCP tools have specific naming requirements.
    # fullmatch (not match) is required: in a "^...$" pattern, "$" also matches
    # just before a trailing newline, so re.match would accept "tool\n".
    if not re.fullmatch(cls.TOOL_NAME_PATTERN, value):
        raise ValueError("Tool name must start with a letter, number, or underscore and contain only letters, numbers, periods, underscores, hyphens, and slashes")

    # Ensure no HTML-like content (uses precompiled regex)
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError("Tool name cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"Tool name exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value

603 

604 @classmethod 

def validate_uuid(cls, value: str, field_name: str = "UUID") -> str:
    """Validate UUID format and return its normalized hex form

    Falsy input is returned unchanged. Anything else is parsed with
    ``uuid.UUID``; on success the 32-character lower-case hex string (no
    hyphens) is returned.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated

    Returns:
        str: Normalized hex UUID, or ``value`` unchanged when it is falsy

    Raises:
        ValueError: When value is not a valid UUID (the parsing error is
            attached as ``__cause__``)

    Examples:
        >>> SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716-446655440000')
        '550e8400e29b41d4a716446655440000'
        >>> SecurityValidator.validate_uuid('invalid-uuid')
        Traceback (most recent call last):
            ...
        ValueError: UUID must be a valid UUID format

        Empty UUID:

        >>> SecurityValidator.validate_uuid('')
        ''

        Normalized UUID format:

        >>> SecurityValidator.validate_uuid('550E8400-E29B-41D4-A716-446655440000')
        '550e8400e29b41d4a716446655440000'
        >>> SecurityValidator.validate_uuid('550e8400e29b41d4a716446655440000')
        '550e8400e29b41d4a716446655440000'

        Various invalid UUID formats:

        >>> try:
        ...     SecurityValidator.validate_uuid('not-a-uuid')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716-446655440000-extra')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('gggggggg-gggg-gggg-gggg-gggggggggggg')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
    """
    if not value:
        return value

    try:
        # Validate UUID format by attempting to parse it
        parsed = uuid.UUID(value)
    except (ValueError, AttributeError, TypeError) as exc:
        # uuid.UUID raises ValueError for malformed strings and AttributeError /
        # TypeError for non-str input; all are reported uniformly as ValueError.
        # Lazy %-style arguments avoid formatting when the record is filtered out.
        logger.error("Invalid UUID format for %s: %s", field_name, value)
        raise ValueError(f"{field_name} must be a valid UUID format") from exc

    # .hex is the 32-digit lower-case form, i.e. str(parsed) without hyphens.
    return parsed.hex

672 

673 @classmethod 

def validate_template(cls, value: str) -> str:
    """Validate a template: permit plain Jinja2 expressions, block SSTI vectors

    Falsy input is returned unchanged. Otherwise the checks run in this order:
    length limit, dangerous HTML tags, inline event handlers, the contents of
    every ``{{ }}`` expression, the contents of every ``{% %}`` block, and
    finally ``${``/``#{``/``%{`` style expressions.

    Args:
        value (str): Value to validate

    Returns:
        str: Value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Empty template handling:

        >>> SecurityValidator.validate_template('')
        ''
        >>> SecurityValidator.validate_template(None) #doctest: +SKIP

        Safe Jinja2 templates:

        >>> SecurityValidator.validate_template('Hello {{ name }}')
        'Hello {{ name }}'
        >>> SecurityValidator.validate_template('{% if condition %}text{% endif %}')
        '{% if condition %}text{% endif %}'
        >>> SecurityValidator.validate_template('{{ username }}')
        '{{ username }}'

        Dangerous HTML tags blocked:

        >>> SecurityValidator.validate_template('Hello <script>alert(1)</script>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display
        >>> SecurityValidator.validate_template('Test <iframe src="evil.com"></iframe>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display
        >>> SecurityValidator.validate_template('<form action="/evil"></form>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display

        Event handlers blocked:

        >>> SecurityValidator.validate_template('<div onclick="evil()">Test</div>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains event handlers that may cause display issues
        >>> SecurityValidator.validate_template('onload = "alert(1)"')
        Traceback (most recent call last):
            ...
        ValueError: Template contains event handlers that may cause display issues

        SSTI prevention patterns:

        >>> SecurityValidator.validate_template('{{ __import__ }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ config }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{% import os %}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 7*7 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 10/2 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 5+5 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 10-5 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions

        Other template injection patterns:

        >>> SecurityValidator.validate_template('${evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('#{evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('%{evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions

        Length limit testing:

        >>> long_template = 'a' * 65537
        >>> SecurityValidator.validate_template(long_template)
        Traceback (most recent call last):
            ...
        ValueError: Template exceeds maximum length of 65536
    """
    if not value:
        return value

    if len(value) > cls.MAX_TEMPLATE_LENGTH:
        raise ValueError(f"Template exceeds maximum length of {cls.MAX_TEMPLATE_LENGTH}")

    # Dangerous tags are refused; Jinja2 {{ }} / {% %} syntax itself is allowed.
    if _DANGEROUS_TEMPLATE_TAGS_RE.search(value) is not None:
        raise ValueError("Template contains HTML tags that may interfere with proper display")

    # Inline event handlers such as onclick=...
    if _EVENT_HANDLER_RE.search(value) is not None:
        raise ValueError("Template contains event handlers that may cause display issues")

    # SSTI prevention: inspect the inside of every {{ }} expression first, then
    # every {% %} block, using substring tests instead of backtracking regexes.
    for opener, closer in (("{{", "}}"), ("{%", "%}")):
        for expression in _iter_template_expressions(value, opener, closer):
            # Lower-case, then drop whitespace around "|" and "=" so spacing
            # variants such as "x | attr" or "attribute = y" are still caught.
            squeezed = re.sub(r"\s*\|\s*", "|", expression.lower())
            squeezed = re.sub(r"\s*=\s*", "=", squeezed)
            for token in _SSTI_DANGEROUS_SUBSTRINGS:
                if token in squeezed:
                    raise ValueError("Template contains potentially dangerous expressions")
            # Operators are looked up in the raw (un-normalized) expression.
            for operator in _SSTI_DANGEROUS_OPERATORS:
                if operator in expression:
                    raise ValueError("Template contains potentially dangerous expressions")

    # ${...}, #{...} and %{...} style expressions are refused outright.
    for prefix in _SSTI_SIMPLE_TEMPLATE_PREFIXES:
        if _has_simple_template_expression(value, prefix):
            raise ValueError("Template contains potentially dangerous expressions")

    return value

821 

@classmethod
def validate_url(cls, value: str, field_name: str = "URL") -> str:
    """Check that a URL is well-formed, uses an allowed scheme and is safe to display.

    The checks run in a fixed order and the first failure wins, so the
    error message always names the earliest rule the URL violates:
    emptiness, length, scheme prefix, dangerous protocol patterns, square
    brackets (IPv6), protocol-relative form, line breaks, spaces before the
    query string, URL structure, restricted hosts (SSRF), port range,
    embedded credentials and finally HTML/script patterns.

    Args:
        value (str): URL to check
        field_name (str): Label used at the start of every error message

    Returns:
        str: The URL, unchanged, when every check passes

    Raises:
        ValueError: When the URL fails any of the checks

    Examples:
        Valid URLs:

        >>> SecurityValidator.validate_url('https://example.com')
        'https://example.com'
        >>> SecurityValidator.validate_url('http://example.com')
        'http://example.com'
        >>> SecurityValidator.validate_url('ws://example.com')
        'ws://example.com'
        >>> SecurityValidator.validate_url('wss://example.com')
        'wss://example.com'
        >>> SecurityValidator.validate_url('https://example.com:8080/path')
        'https://example.com:8080/path'
        >>> SecurityValidator.validate_url('https://example.com/path?query=value')
        'https://example.com/path?query=value'

        Empty URL handling:

        >>> SecurityValidator.validate_url('')
        Traceback (most recent call last):
            ...
        ValueError: URL cannot be empty

        Length validation:

        >>> long_url = 'https://example.com/' + 'a' * 2100
        >>> SecurityValidator.validate_url(long_url)
        Traceback (most recent call last):
            ...
        ValueError: URL exceeds maximum length of 2048

        Scheme validation:

        >>> SecurityValidator.validate_url('ftp://example.com')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('file:///etc/passwd')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('javascript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('data:text/plain,hello')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('vbscript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('about:blank')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('chrome://settings')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('mailto:test@example.com')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        IPv6 URL blocking:

        >>> SecurityValidator.validate_url('https://[::1]:8080/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains IPv6 address which is not supported
        >>> SecurityValidator.validate_url('https://[2001:db8::1]/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains IPv6 address which is not supported

        Protocol-relative URL blocking:

        >>> SecurityValidator.validate_url('//example.com/path')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        Line break injection:

        >>> SecurityValidator.validate_url('https://example.com\\rHost: evil.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains line breaks which are not allowed
        >>> SecurityValidator.validate_url('https://example.com\\nHost: evil.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains line breaks which are not allowed

        Space validation:

        >>> SecurityValidator.validate_url('https://exam ple.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains spaces which are not allowed in URLs
        >>> SecurityValidator.validate_url('https://example.com/path?query=hello world')
        'https://example.com/path?query=hello world'

        Malformed URLs:

        >>> SecurityValidator.validate_url('https://')
        Traceback (most recent call last):
            ...
        ValueError: URL is not a valid URL
        >>> SecurityValidator.validate_url('not-a-url')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        Restricted IP addresses:

        >>> SecurityValidator.validate_url('https://0.0.0.0/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains invalid IP address (0.0.0.0)
        >>> SecurityValidator.validate_url('https://169.254.169.254/')  # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        ValueError: URL contains IP address blocked by SSRF protection ...

        Invalid port numbers:

        >>> SecurityValidator.validate_url('https://example.com:0/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains invalid port number
        >>> try:
        ...     SecurityValidator.validate_url('https://example.com:65536/')
        ... except ValueError as e:
        ...     'Port out of range' in str(e) or 'invalid port' in str(e)
        True

        Credentials in URL:

        >>> SecurityValidator.validate_url('https://user:pass@example.com/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains credentials which are not allowed
        >>> SecurityValidator.validate_url('https://user@example.com/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains credentials which are not allowed

        XSS patterns in URLs:

        >>> SecurityValidator.validate_url('https://example.com/<script>')
        Traceback (most recent call last):
            ...
        ValueError: URL contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_url('https://example.com?param=javascript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL contains unsupported or potentially dangerous protocol
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Reject oversized input before doing any pattern work on it
    if len(value) > cls.MAX_URL_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_URL_LENGTH}")

    # Scheme allow-list, compared case-insensitively
    schemes = cls.ALLOWED_URL_SCHEMES
    accepted_prefixes = tuple(scheme.lower() for scheme in schemes)
    if not value.lower().startswith(accepted_prefixes):
        raise ValueError(f"{field_name} must start with one of: {', '.join(schemes)}")

    # Dangerous protocols hidden anywhere in the URL (precompiled regex list)
    if any(pattern.search(value) for pattern in _DANGEROUS_URL_PATTERNS):
        raise ValueError(f"{field_name} contains unsupported or potentially dangerous protocol")

    # Square brackets only appear in IPv6 literals, which are not supported
    if any(bracket in value for bracket in "[]"):
        raise ValueError(f"{field_name} contains IPv6 address which is not supported")

    # Protocol-relative form
    if value[:2] == "//":
        raise ValueError(f"{field_name} contains protocol-relative URL which is not supported")

    # CR/LF would allow header injection
    if any(line_break in value for line_break in "\r\n"):
        raise ValueError(f"{field_name} contains line breaks which are not allowed")

    # Spaces are tolerated in the query string only
    before_query = value.partition("?")[0]
    if " " in before_query:
        raise ValueError(f"{field_name} contains spaces which are not allowed in URLs")

    # Structural checks on the parsed URL
    try:
        parts = urlparse(value)
        if not (parts.scheme and parts.netloc):
            raise ValueError(f"{field_name} is not a valid URL")

        # Second look at the parsed authority for brackets
        if any(bracket in parts.netloc for bracket in "[]"):
            raise ValueError(f"{field_name} contains IPv6 address which is not supported")

        # SSRF protection: restricted IP addresses and hostnames
        host = parts.hostname
        if host:
            # 0.0.0.0 (all interfaces) is rejected even when SSRF protection is off
            if host == "0.0.0.0":  # nosec B104 - we're blocking this for security
                raise ValueError(f"{field_name} contains invalid IP address (0.0.0.0)")

            if settings.ssrf_protection_enabled:
                cls._validate_ssrf(host, field_name)

        # Reading .port may itself raise ValueError for an out-of-range port
        port = parts.port
        if port is not None and not 1 <= port <= 65535:
            raise ValueError(f"{field_name} contains invalid port number")

        # user:password@host is not allowed
        if parts.username or parts.password:
            raise ValueError(f"{field_name} contains credentials which are not allowed")

        # XSS patterns anywhere in the URL
        if re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE):
            raise ValueError(f"{field_name} contains HTML tags that may cause security issues")

        if re.search(cls.DANGEROUS_JS_PATTERN, value, re.IGNORECASE):
            raise ValueError(f"{field_name} contains script patterns that may cause security issues")

    except ValueError:
        # Validation errors keep their specific message
        raise
    except Exception:
        raise ValueError(f"{field_name} is not a valid URL")

    return value

1073 

@classmethod
def _validate_ssrf(cls, hostname: str, field_name: str) -> None:
    """Validate hostname/IP against SSRF protection rules.

    This method implements configurable SSRF (Server-Side Request Forgery) protection
    to prevent the gateway from being used to access internal resources or cloud
    metadata services.

    The hostname is first compared with the blocked-host list, then turned
    into one or more IP addresses (parsed directly when it is an IP literal,
    otherwise resolved with ``socket.getaddrinfo``). Every address is
    checked; a single blocked address rejects the whole hostname.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are unwrapped before the
    checks. Without this, such an address never matches an IPv4 CIDR entry,
    and its loopback/private classification depends on the Python version.
    The blocked-network check looks at both the mapped form and the embedded
    IPv4 address; the loopback and private checks use the embedded IPv4
    address.

    Args:
        hostname (str): The hostname or IP address to validate.
        field_name (str): Name of field being validated (for error messages).

    Raises:
        ValueError: If the hostname/IP is blocked by SSRF protection rules, or
            if DNS resolution fails while ``ssrf_dns_fail_closed`` is set.

    Configuration (via settings):
        - ssrf_protection_enabled: Master switch (must be True for this to be called)
        - ssrf_blocked_networks: CIDR ranges always blocked (e.g., cloud metadata)
        - ssrf_blocked_hosts: Hostnames always blocked
        - ssrf_allow_localhost: If False, blocks loopback addresses and localhost
        - ssrf_allow_private_networks: If False, blocks private (non-loopback) addresses
        - ssrf_dns_fail_closed: If True, an unresolvable hostname is rejected

    Examples:
        Cloud metadata (always blocked):

        >>> from unittest.mock import patch, MagicMock
        >>> mock_settings = MagicMock()
        >>> mock_settings.ssrf_protection_enabled = True
        >>> mock_settings.ssrf_blocked_networks = ["169.254.169.254/32"]
        >>> mock_settings.ssrf_blocked_hosts = ["metadata.google.internal"]
        >>> mock_settings.ssrf_allow_localhost = True
        >>> mock_settings.ssrf_allow_private_networks = True
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('169.254.169.254', 'URL')
        ...     except ValueError as e:
        ...         'blocked by SSRF protection' in str(e)
        True

        The IPv4-mapped IPv6 form of a blocked address is blocked too:

        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('::ffff:169.254.169.254', 'URL')
        ...     except ValueError as e:
        ...         'blocked by SSRF protection' in str(e)
        True

        Localhost (configurable):

        >>> mock_settings.ssrf_allow_localhost = False
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('127.0.0.1', 'URL')
        ...     except ValueError as e:
        ...         'localhost' in str(e).lower()
        True

        Public IPs (always allowed):

        >>> mock_settings.ssrf_allow_localhost = True
        >>> mock_settings.ssrf_allow_private_networks = True
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     SecurityValidator._validate_ssrf('8.8.8.8', 'URL')  # Should not raise
    """
    # Normalize hostname: lowercase, strip trailing dots (DNS FQDN notation)
    hostname_normalized = hostname.lower().rstrip(".")

    # Check blocked hostnames (case-insensitive, normalized)
    for blocked_host in settings.ssrf_blocked_hosts:
        if hostname_normalized == blocked_host.lower().rstrip("."):
            raise ValueError(f"{field_name} contains blocked hostname '{hostname}' (SSRF protection)")

    # Turn the hostname into IP addresses for the network-based checks
    ip_addresses: list = []
    try:
        # IP literal: no DNS lookup needed
        ip_addresses = [ipaddress.ip_address(hostname)]
    except ValueError:
        # Hostname: resolve ALL addresses (A and AAAA records)
        try:
            addr_info = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
        except (socket.gaierror, socket.herror):
            # DNS resolution failed
            if settings.ssrf_dns_fail_closed:
                raise ValueError(f"{field_name} DNS resolution failed and SSRF_DNS_FAIL_CLOSED is enabled")
            # Fail open: allow through (hostname blocking above catches known dangerous hostnames)
            return
        for _, _, _, _, sockaddr in addr_info:
            try:
                ip_addresses.append(ipaddress.ip_address(sockaddr[0]))
            except ValueError:
                continue

    if not ip_addresses:
        if settings.ssrf_dns_fail_closed:
            raise ValueError(f"{field_name} DNS resolution returned no addresses and SSRF_DNS_FAIL_CLOSED is enabled")
        return

    # Parse the configured CIDR ranges once rather than once per resolved address
    blocked_networks: list = []
    for network_str in settings.ssrf_blocked_networks:
        try:
            blocked_networks.append((network_str, ipaddress.ip_network(network_str, strict=False)))
        except ValueError:
            # Invalid network in config - log and skip
            logger.warning(f"Invalid CIDR in ssrf_blocked_networks: {network_str}")

    # Check ALL resolved addresses - if ANY is blocked, reject the request
    for ip_addr in ip_addresses:
        # ipv4_mapped is the embedded IPv4 address of ::ffff:a.b.c.d, else None.
        # IPv4Address objects do not have the attribute, hence getattr.
        mapped = getattr(ip_addr, "ipv4_mapped", None)
        candidates = (ip_addr,) if mapped is None else (ip_addr, mapped)
        effective = ip_addr if mapped is None else mapped

        # Blocked networks apply regardless of the other settings. Both forms
        # are tested so IPv4 and IPv6 CIDR entries can each match.
        for network_str, network in blocked_networks:
            if any(candidate in network for candidate in candidates):
                raise ValueError(f"{field_name} contains IP address blocked by SSRF protection (network: {network_str})")

        # Check localhost/loopback (if not allowed)
        if not settings.ssrf_allow_localhost:
            if effective.is_loopback or hostname_normalized in ("localhost", "localhost.localdomain"):
                raise ValueError(f"{field_name} contains localhost address which is blocked by SSRF protection")

        # Check private networks (if not allowed); loopback is governed by the setting above
        if not settings.ssrf_allow_private_networks:
            if effective.is_private and not effective.is_loopback:
                raise ValueError(f"{field_name} contains private network address which is blocked by SSRF protection")

1190 

@classmethod
def validate_no_xss(cls, value: str, field_name: str) -> None:
    """
    Reject strings that match the dangerous-HTML pattern.

    The value is searched case-insensitively with
    ``cls.DANGEROUS_HTML_PATTERN``. Falsy values (empty string, ``None``)
    are accepted without being scanned.

    Args:
        value (str): Value to validate.
        field_name (str): Name of the field being validated.

    Raises:
        ValueError: If the value contains XSS patterns.

    Examples:
        Safe strings pass validation:

        >>> SecurityValidator.validate_no_xss('Hello World', 'test_field')
        >>> SecurityValidator.validate_no_xss('User: admin@example.com', 'email')
        >>> SecurityValidator.validate_no_xss('Price: $10.99', 'price')

        Empty/None strings are considered safe:

        >>> SecurityValidator.validate_no_xss('', 'empty_field')
        >>> SecurityValidator.validate_no_xss(None, 'none_field') #doctest: +SKIP

        Dangerous HTML tags trigger validation errors:

        >>> SecurityValidator.validate_no_xss('<script>alert(1)</script>', 'test_field')
        Traceback (most recent call last):
            ...
        ValueError: test_field contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<iframe src="evil.com"></iframe>', 'content')
        Traceback (most recent call last):
            ...
        ValueError: content contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<object data="malware.swf"></object>', 'data')
        Traceback (most recent call last):
            ...
        ValueError: data contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<embed src="evil.swf">', 'embed')
        Traceback (most recent call last):
            ...
        ValueError: embed contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<link rel="stylesheet" href="evil.css">', 'style')
        Traceback (most recent call last):
            ...
        ValueError: style contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<meta http-equiv="refresh" content="0;url=evil.com">', 'meta')
        Traceback (most recent call last):
            ...
        ValueError: meta contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<base href="http://evil.com">', 'base')
        Traceback (most recent call last):
            ...
        ValueError: base contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<form action="evil.php">', 'form')
        Traceback (most recent call last):
            ...
        ValueError: form contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<img src="x" onerror="alert(1)">', 'image')
        Traceback (most recent call last):
            ...
        ValueError: image contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<svg onload="alert(1)"></svg>', 'svg')
        Traceback (most recent call last):
            ...
        ValueError: svg contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<video src="x" onerror="alert(1)"></video>', 'video')
        Traceback (most recent call last):
            ...
        ValueError: video contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<audio src="x" onerror="alert(1)"></audio>', 'audio')
        Traceback (most recent call last):
            ...
        ValueError: audio contains HTML tags that may cause security issues
    """
    # The short-circuit keeps falsy input away from the regex entirely
    if value and re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE):
        raise ValueError(f"{field_name} contains HTML tags that may cause security issues")

1271 

1272 @classmethod 

1273 def validate_json_depth( 

1274 cls, 

1275 obj: object, 

1276 max_depth: int | None = None, 

1277 current_depth: int = 0, 

1278 ) -> None: 

1279 """Validate that a JSON‑like structure does not exceed a depth limit. 

1280 

1281 A *depth* is counted **only** when we enter a container (`dict` or 

1282 `list`). Primitive values (`str`, `int`, `bool`, `None`, etc.) do not 

1283 increase the depth, but an *empty* container still counts as one level. 

1284 

1285 Args: 

1286 obj: Any Python object to inspect recursively. 

1287 max_depth: Maximum allowed depth (defaults to 

1288 :pyattr:`SecurityValidator.MAX_JSON_DEPTH`). 

1289 current_depth: Internal recursion counter. **Do not** set this 

1290 from user code. 

1291 

1292 Raises: 

1293 ValueError: If the nesting level exceeds *max_depth*. 

1294 

1295 Examples: 

1296 Simple flat dictionary – depth 1: :: 

1297 

1298 >>> SecurityValidator.validate_json_depth({'name': 'Alice'}) 

1299 

1300 Nested dict – depth 2: :: 

1301 

1302 >>> SecurityValidator.validate_json_depth( 

1303 ... {'user': {'name': 'Alice'}} 

1304 ... ) 

1305 

1306 Mixed dict/list – depth 3: :: 

1307 

1308 >>> SecurityValidator.validate_json_depth( 

1309 ... {'users': [{'name': 'Alice', 'meta': {'age': 30}}]} 

1310 ... ) 

1311 

1312 At 10 levels of nesting – allowed: :: 

1313 

1314 >>> deep_10 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': 

1315 ... {'9': {'10': 'end'}}}}}}}}}} 

1316 >>> SecurityValidator.validate_json_depth(deep_10) 

1317 

1318 At new default limit (30) – allowed: :: 

1319 

1320 >>> deep_30 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': 

1321 ... {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16': 

1322 ... {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24': 

1323 ... {'25': {'26': {'27': {'28': {'29': {'30': 'end'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} 

1324 >>> SecurityValidator.validate_json_depth(deep_30) 

1325 

1326 One level deeper – rejected: :: 

1327 

1328 >>> deep_31 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': 

1329 ... {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16': 

1330 ... {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24': 

1331 ... {'25': {'26': {'27': {'28': {'29': {'30': {'31': 'end'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} 

1332 >>> SecurityValidator.validate_json_depth(deep_31) 

1333 Traceback (most recent call last): 

1334 ... 

1335 ValueError: JSON structure exceeds maximum depth of 30 

1336 """ 

1337 if max_depth is None: 

1338 max_depth = cls.MAX_JSON_DEPTH 

1339 

1340 # Only containers count toward depth; primitives are ignored 

1341 if not isinstance(obj, (dict, list)): 

1342 return 

1343 

1344 next_depth = current_depth + 1 

1345 if next_depth > max_depth: 

1346 raise ValueError(f"JSON structure exceeds maximum depth of {max_depth}") 

1347 

1348 if isinstance(obj, dict): 

1349 for value in obj.values(): 

1350 cls.validate_json_depth(value, max_depth, next_depth) 

1351 else: # obj is a list 

1352 for item in obj: 

1353 cls.validate_json_depth(item, max_depth, next_depth) 

1354 

@classmethod
def validate_mime_type(cls, value: str) -> str:
    """Check a MIME type against the format pattern and the allow-list.

    A falsy value is returned untouched. Otherwise the value must match the
    precompiled MIME pattern and then either appear in
    ``settings.validation_allowed_mime_types``, start with ``application/x-``
    or ``text/x-``, or contain a ``+`` suffix.

    Args:
        value (str): Value to validate

    Returns:
        str: Value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Empty/None handling:

        >>> SecurityValidator.validate_mime_type('')
        ''
        >>> SecurityValidator.validate_mime_type(None) #doctest: +SKIP

        Valid standard MIME types:

        >>> SecurityValidator.validate_mime_type('text/plain')
        'text/plain'
        >>> SecurityValidator.validate_mime_type('application/json')
        'application/json'
        >>> SecurityValidator.validate_mime_type('image/jpeg')
        'image/jpeg'
        >>> SecurityValidator.validate_mime_type('text/html')
        'text/html'
        >>> SecurityValidator.validate_mime_type('application/pdf')
        'application/pdf'

        Valid vendor-specific MIME types:

        >>> SecurityValidator.validate_mime_type('application/x-custom')
        'application/x-custom'
        >>> SecurityValidator.validate_mime_type('text/x-log')
        'text/x-log'

        Valid MIME types with suffixes:

        >>> SecurityValidator.validate_mime_type('application/vnd.api+json')
        'application/vnd.api+json'
        >>> SecurityValidator.validate_mime_type('image/svg+xml')
        'image/svg+xml'

        Invalid MIME type formats:

        >>> SecurityValidator.validate_mime_type('invalid')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text/')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('/plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text//plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text/plain/extra')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('<text/plain>')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format

        Disallowed MIME types (not in the allow-list):

        >>> try:
        ...     SecurityValidator.validate_mime_type('application/evil')
        ... except ValueError as e:
        ...     'not in the allowed list' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_mime_type('text/evil')
        ... except ValueError as e:
        ...     'not in the allowed list' in str(e)
        True

        MIME type with parameters is rejected by the format check:

        >>> try:
        ...     SecurityValidator.validate_mime_type('application/evil; charset=utf-8')
        ... except ValueError as e:
        ...     'Invalid MIME type format' in str(e)
        True
    """
    if not value:
        return value

    # Shape check first (precompiled regex)
    if _MIME_TYPE_RE.match(value) is None:
        raise ValueError("Invalid MIME type format")

    # Explicitly configured types are accepted as-is
    if value in settings.validation_allowed_mime_types:
        return value

    # Otherwise only x- vendor types and "+suffix" types get through
    base_type = value.split(";")[0].strip()
    is_vendor_type = base_type.startswith(("application/x-", "text/x-"))
    if is_vendor_type or "+" in base_type:
        return value

    raise ValueError(f"MIME type '{value}' is not in the allowed list")

1470 

@classmethod
def validate_shell_parameter(cls, value: str) -> str:
    """Guard a shell parameter against command injection.

    A value without shell metacharacters is returned as-is. When
    metacharacters are found, strict mode (``settings.validation_strict``,
    treated as True when the setting is absent) rejects the value; otherwise
    the value is returned quoted with ``shlex.quote``.

    Args:
        value (str): Shell parameter to validate

    Returns:
        str: Validated/escaped parameter

    Raises:
        ValueError: If the value is not a string, or contains dangerous
            characters in strict mode

    Examples:
        >>> SecurityValidator.validate_shell_parameter('safe_param')
        'safe_param'
        >>> SecurityValidator.validate_shell_parameter('param with spaces')
        'param with spaces'
    """
    if not isinstance(value, str):
        raise ValueError("Parameter must be string")

    # Nothing dangerous found (precompiled regex): hand the value back untouched
    if _SHELL_DANGEROUS_CHARS_RE.search(value) is None:
        return value

    # Metacharacters present: reject in strict mode, quote otherwise
    if getattr(settings, "validation_strict", True):
        raise ValueError("Parameter contains shell metacharacters")
    return shlex.quote(value)

1503 

@classmethod
def validate_path(cls, path: str, allowed_roots: Optional[List[str]] = None) -> str:
    """Validate and normalize file paths to prevent directory traversal.

    Values that match the URI-scheme pattern (``http://``, ``plugin://``,
    etc.) are returned unchanged. Any other value is rejected if it has a
    ``..`` component, then resolved to an absolute path. When
    *allowed_roots* is given, the resolved path must be one of the resolved
    roots or lie inside one of them.

    Containment is decided on whole path components, not on string
    prefixes, so ``/safe/path_evil`` is not treated as being inside
    ``/safe/path``.

    Args:
        path (str): File path to validate
        allowed_roots (Optional[List[str]]): List of allowed root directories

    Returns:
        str: Validated and normalized path

    Raises:
        ValueError: If path is not a string, contains traversal attempts,
            cannot be resolved, or is outside allowed roots

    Examples:
        >>> SecurityValidator.validate_path('/safe/path')
        '/safe/path'
        >>> SecurityValidator.validate_path('http://example.com/file')
        'http://example.com/file'
    """
    if not isinstance(path, str):
        raise ValueError("Path must be string")

    # Skip validation for URI schemes (http://, plugin://, etc.) (uses precompiled regex)
    if _URI_SCHEME_RE.match(path):
        return path

    try:
        p = Path(path)
        # Check for path traversal
        if ".." in p.parts:
            raise ValueError("Path traversal detected")

        resolved_path = p.resolve()

        # Check against allowed roots
        if allowed_roots:
            # All ancestors of the resolved path, plus the path itself
            containing_dirs = {resolved_path, *resolved_path.parents}
            if not any(Path(root).resolve() in containing_dirs for root in allowed_roots):
                raise ValueError("Path outside allowed roots")

        return str(resolved_path)
    except (OSError, ValueError) as e:
        # Every failure, including the ones raised above, surfaces as "Invalid path: ..."
        raise ValueError(f"Invalid path: {e}") from e

1548 

@classmethod
def validate_sql_parameter(cls, value: str) -> str:
    """Validate SQL parameters to prevent SQL injection attacks.

    Non-string values are returned unchanged. A string that matches any of
    the precompiled SQL-injection patterns is rejected in strict mode
    (``settings.validation_strict``, treated as True when the setting is
    absent). In non-strict mode its single and double quotes are doubled
    exactly once, no matter how many patterns matched.

    Args:
        value (str): SQL parameter to validate

    Returns:
        str: Validated/escaped parameter

    Raises:
        ValueError: If parameter contains SQL injection patterns in strict mode

    Examples:
        >>> SecurityValidator.validate_sql_parameter('safe_value')
        'safe_value'
        >>> SecurityValidator.validate_sql_parameter('123')
        '123'
    """
    if not isinstance(value, str):
        return value

    # Check for SQL injection patterns (uses precompiled regex list)
    if not any(pattern.search(value) for pattern in _SQL_PATTERNS):
        return value

    if getattr(settings, "validation_strict", True):
        raise ValueError("Parameter contains SQL injection patterns")

    # Basic escaping, applied once: doing it per matching pattern would
    # double the quotes again for every additional match.
    return value.replace("'", "''").replace('"', '""')

1580 

@classmethod
def validate_parameter_length(cls, value: str, max_length: Optional[int] = None) -> str:
    """Reject a parameter that is longer than the allowed maximum.

    When *max_length* is falsy (``None`` or ``0``) the limit is taken from
    ``settings.max_param_length``, or 10000 if that setting is absent.

    Args:
        value (str): Parameter to validate
        max_length (Optional[int]): Maximum allowed length

    Returns:
        str: Parameter if within length limits

    Raises:
        ValueError: If parameter exceeds maximum length

    Examples:
        >>> SecurityValidator.validate_parameter_length('short', 10)
        'short'
    """
    # An explicit limit wins; the settings lookup is only made as a fallback
    limit = max_length if max_length else getattr(settings, "max_param_length", 10000)
    if len(value) <= limit:
        return value
    raise ValueError(f"Parameter exceeds maximum length of {limit}")

1603 

@classmethod
def sanitize_text(cls, text: str) -> str:
    """Strip ANSI escape sequences and control characters from text.

    Non-string input is returned unchanged.

    Args:
        text (str): Text to sanitize

    Returns:
        str: Sanitized text with control characters removed

    Examples:
        >>> SecurityValidator.sanitize_text('Hello World')
        'Hello World'
        >>> SecurityValidator.sanitize_text('Text\x1b[31mwith\x1b[0mcolors')
        'Textwithcolors'
    """
    if not isinstance(text, str):
        return text

    # ANSI sequences go first, as whole units (precompiled regex)
    without_ansi = _ANSI_ESCAPE_RE.sub("", text)
    # Then whatever control characters the second precompiled regex matches
    return _CONTROL_CHARS_RE.sub("", without_ansi)

1628 

@classmethod
def sanitize_json_response(cls, data: Any) -> Any:
    """Sanitize every string found in a JSON-like structure.

    Strings are passed through ``sanitize_text``. Lists and dict values are
    processed recursively into new containers; dict keys are kept as they
    are. Any other value is returned unchanged.

    Args:
        data (Any): JSON data structure to sanitize

    Returns:
        Any: Sanitized data structure with same type as input

    Examples:
        >>> SecurityValidator.sanitize_json_response('clean text')
        'clean text'
        >>> SecurityValidator.sanitize_json_response({'key': 'value'})
        {'key': 'value'}
        >>> SecurityValidator.sanitize_json_response(['item1', 'item2'])
        ['item1', 'item2']
    """
    if isinstance(data, list):
        return [cls.sanitize_json_response(entry) for entry in data]

    if isinstance(data, dict):
        cleaned = {}
        for key, entry in data.items():
            cleaned[key] = cls.sanitize_json_response(entry)
        return cleaned

    # Leaf value: only strings need cleaning
    return cls.sanitize_text(data) if isinstance(data, str) else data