Coverage for mcpgateway / common / validators.py: 100%

381 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/common/validators.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti, Madhav Kandukuri 

6 

7SecurityValidator for ContextForge 

8This module defines the `SecurityValidator` class, which provides centralized, configurable 

9validation logic for user-generated content in MCP-based applications. 

10 

11The validator enforces strict security and structural rules across common input types such as: 

12- Display text (e.g., names, descriptions) 

13- Identifiers and tool names 

14- URIs and URLs 

15- JSON object depth 

16- Templates (including limited HTML/Jinja2) 

17- MIME types 

18 

19Key Features: 

20- Pattern-based validation using settings-defined regex for HTML/script safety 

21- Configurable max lengths and depth limits 

22- Whitelist-based URL scheme and MIME type validation 

23- Safe escaping of user-visible text fields 

24- Reusable static/class methods for field-level and form-level validation 

25 

26Intended to be used with Pydantic or similar schema-driven systems to validate and sanitize 

27user input in a consistent, centralized way. 

28 

29Dependencies: 

30- Standard Library: re, html, logging, urllib.parse 

31- First-party: `settings` from `mcpgateway.config` 

32 

33Example usage: 

34 SecurityValidator.validate_name("my_tool", field_name="Tool Name") 

35 SecurityValidator.validate_url("https://example.com") 

36 SecurityValidator.validate_json_depth({...}) 

37 

38Examples: 

39 >>> from mcpgateway.common.validators import SecurityValidator 

40 >>> SecurityValidator.sanitize_display_text('<b>Test</b>', 'test') 

41 'Test' 

42 >>> SecurityValidator.validate_name('valid_name-123', 'test') 

43 'valid_name-123' 

44 >>> SecurityValidator.validate_identifier('my.test.id_123', 'test') 

45 'my.test.id_123' 

46 >>> SecurityValidator.validate_json_depth({'a': {'b': 1}}) 

47 >>> SecurityValidator.validate_json_depth({'a': 1}) 

48""" 

49 

50# Standard 

51from html.parser import HTMLParser 

52import ipaddress 

53import logging 

54from pathlib import Path 

55import re 

56import shlex 

57import socket 

58from typing import Any, Iterable, List, Optional, Pattern 

59from urllib.parse import urlparse 

60import uuid 

61 

62# First-Party 

63from mcpgateway.config import settings 

64 

65logger = logging.getLogger(__name__) 

66 

67# ============================================================================ 

68# Precompiled regex patterns (compiled once at module load for performance) 

69# ============================================================================ 

70# Note: Settings-based patterns (DANGEROUS_HTML_PATTERN, DANGEROUS_JS_PATTERN, 

71# NAME_PATTERN, IDENTIFIER_PATTERN, etc.) are NOT precompiled here because tests 

72# override the class attributes at runtime. Only truly static patterns are 

73# precompiled at module level. 

74 

# Static inline patterns used multiple times

# Characters that must never appear in names/identifiers.
_HTML_SPECIAL_CHARS_RE: Pattern[str] = re.compile(r'[<>"\']')  # / removed per SEP-986

# Opening tags that are refused inside templates.
_DANGEROUS_TEMPLATE_TAGS_RE: Pattern[str] = re.compile(r"<(script|iframe|object|embed|link|meta|base|form)\b", re.IGNORECASE)

# Inline event-handler attributes such as ``onclick=``.
_EVENT_HANDLER_RE: Pattern[str] = re.compile(r"on\w+\s*=", re.IGNORECASE)

# ``type/subtype`` MIME syntax. The pattern is closed with ``\Z`` rather than
# ``$`` because ``$`` also matches just before a trailing newline, which would
# let "text/plain\n" through as a valid MIME type.
_MIME_TYPE_RE: Pattern[str] = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9!#$&\-\^_+\.]*\/[a-zA-Z0-9][a-zA-Z0-9!#$&\-\^_+\.]*\Z")

# Leading ``scheme://`` of a URI.
_URI_SCHEME_RE: Pattern[str] = re.compile(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://")

# Shell metacharacters.
_SHELL_DANGEROUS_CHARS_RE: Pattern[str] = re.compile(r"[;&|`$(){}\[\]<>]")

# ANSI CSI escape sequences (ESC [ ... letter).
_ANSI_ESCAPE_RE: Pattern[str] = re.compile(r"\x1B\[[0-9;]*[A-Za-z]")

# C0/C1 control characters, excluding tab (\x09), LF (\x0a) and CR (\x0d).
_CONTROL_CHARS_RE: Pattern[str] = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]")

84 

# Polyglot attack patterns (precompiled with IGNORECASE)
# The raw sources are listed once and compiled in a single pass so that every
# entry is guaranteed to carry the same flag.
_POLYGLOT_PATTERNS: List[Pattern[str]] = [
    re.compile(source, re.IGNORECASE)
    for source in (
        r"['\"];.*alert\s*\(",  # quote, semicolon, then an alert( call
        r"-->\s*<[^>]+>",  # comment terminator directly followed by a tag
        r"['\"].*//['\"]",  # quoted text ending in // before another quote
        r"<<[A-Z]+>",  # doubled angle bracket tag opener
        r"String\.fromCharCode",  # character-code string construction
        r"javascript:.*\(",  # javascript: URL that calls something
    )
]

94 

# SSTI prevention - safe scanning without regex backtracking.
# Every entry is lower-case: validate_template lower-cases each expression and
# collapses whitespace around "|" and "=" before looking for these tokens.
_SSTI_DANGEROUS_SUBSTRINGS: tuple[str, ...] = (
    # Dunder names and attribute access
    "__",
    ".",
    # Names refused inside template expressions
    "config",
    "self",
    "request",
    "application",
    "globals",
    "builtins",
    "import",
    "getattr",  # Python getattr function
    # Jinja2 filters that take or resolve attribute names
    "|attr",  # attr filter (checked after whitespace normalization)
    "|selectattr",  # selectattr filter (takes attribute name as arg)
    "|sort",  # sort filter with attribute parameter
    "|map",  # map filter with attribute parameter
    "attribute=",  # map(attribute=...), selectattr, sort(attribute=...)
    # Escape sequences that can spell an underscore indirectly
    "\\x",  # hex escapes (e.g., \x5f)
    "\\u",  # unicode escapes (e.g., \u005f)
    "\\n{",  # named unicode escapes (e.g., \N{LOW LINE}), lower-cased
    # Octal escapes
    "\\0",
    "\\1",
    "\\2",
    "\\3",
    "\\4",
    "\\5",
    "\\6",
    "\\7",
)

# Operators that enable code execution or dynamic construction
_SSTI_DANGEROUS_OPERATORS: tuple[str, ...] = (
    # Arithmetic
    "*",
    "/",
    "+",
    "-",
    # String building / dynamic lookup
    "~",  # Jinja2 string concatenation (can build dunder names dynamically)
    "[",  # bracket notation for dynamic attribute access
    "%",  # Python string formatting (e.g., '%c' % 95 produces '_')
)

# Openers of non-Jinja "simple" template expressions: ${...}, #{...}, %{...}
_SSTI_SIMPLE_TEMPLATE_PREFIXES: tuple[str, ...] = ("${", "#{", "%{")

135 

136 

def _iter_template_expressions(value: str, start: str, end: str) -> Iterable[str]:
    """Yield the text found between each ``start``/``end`` delimiter pair.

    A closing delimiter is ignored while the scanner is inside a single- or
    double-quoted string, and a backslash causes the character after it to be
    skipped. Scanning resumes right after each closing delimiter.

    Args:
        value (str): Template text to scan.
        start (str): Opening delimiter.
        end (str): Closing delimiter.

    Yields:
        str: The template expression contents between delimiters.

    Raises:
        ValueError: If an opening delimiter has no matching closing delimiter
            (fail-closed behavior).
    """
    total = len(value)
    # Highest index at which a closing delimiter can still begin.
    last_end_pos = total - len(end)
    cursor = 0
    while True:
        opening = value.find(start, cursor)
        if opening == -1:
            return
        body_start = opening + len(start)
        quote_char: Optional[str] = None
        skip_next = False
        for pos in range(body_start, last_end_pos + 1):
            char = value[pos]
            if skip_next:
                # Character follows a backslash: consume it blindly.
                skip_next = False
                continue
            if char == "\\":
                skip_next = True
                continue
            if quote_char:
                # Inside a quoted string only the matching quote matters.
                if char == quote_char:
                    quote_char = None
                continue
            if char in ("'", '"'):
                quote_char = char
            elif value.startswith(end, pos):
                yield value[body_start:pos]
                cursor = pos + len(end)
                break
        else:
            # Ran out of text without seeing the closing delimiter.
            raise ValueError("Template contains potentially dangerous expressions")

181 

182 

def _has_simple_template_expression(value: str, start: str) -> bool:
    """Tell whether ``start`` occurs somewhere before a closing brace.

    Only two single-pass searches are needed: the position of the last ``}``
    and the position of the first ``start``. If the first opener sits before
    the last closer, some opener is followed by a closer.

    Args:
        value (str): Template text to scan.
        start (str): Opening delimiter.

    Returns:
        bool: True if a closing brace exists after the delimiter.
    """
    closing = value.rfind("}")
    if closing < 0:
        # No closing brace at all, so nothing can be a complete expression.
        return False
    opening = value.find(start)
    return 0 <= opening < closing

202 

203 

# Dangerous URL protocol patterns (precompiled with IGNORECASE)
# Sources are kept in one tuple and compiled together so the flag is applied
# uniformly to every scheme.
_DANGEROUS_URL_PATTERNS: List[Pattern[str]] = [
    re.compile(scheme_source, re.IGNORECASE)
    for scheme_source in (
        r"javascript:",
        r"data:",
        r"vbscript:",
        r"about:",
        r"chrome:",
        r"file:",
        r"ftp:",
        r"mailto:",
    )
]

215 

# SQL injection patterns (precompiled with IGNORECASE)
_SQL_PATTERNS: List[Pattern[str]] = [
    re.compile(sql_source, re.IGNORECASE)
    for sql_source in (
        r"[';\"\\]",  # quotes, semicolon, backslash
        r"--",  # line comment
        r"/\*.*?\*/",  # block comment on a single line
        r"\b(union|select|insert|update|delete|drop|exec|execute)\b",  # keywords
    )
]

223 

224 

225# ============================================================================ 

226# HTML Tag Stripper with Character Preservation 

227# ============================================================================ 

class _TagStripper(HTMLParser):
    """Strip HTML tags while preserving all text content and special characters.

    This parser removes HTML tags but keeps the text content exactly as-is,
    including special characters like &, ", and '. HTML entities are decoded
    to their literal characters (e.g., &amp; becomes &).
    """

    def __init__(self) -> None:
        # HTMLParser.__init__ already calls reset(), and the legacy ``strict``
        # flag was removed from HTMLParser in Python 3.5, so neither needs to
        # be touched here.
        super().__init__(convert_charrefs=True)
        # Text chunks collected by handle_data, in document order.
        self.fed: List[str] = []

    def handle_data(self, data: str) -> None:
        """Handle text data between tags.

        With convert_charrefs=True, HTML entities are automatically decoded
        (e.g., &amp; → &) and plain text with & passes through unchanged.

        Args:
            data: Text content between HTML tags
        """
        self.fed.append(data)

    def get_data(self) -> str:
        """Return the accumulated text content.

        Returns:
            str: Concatenated text content from all handled data
        """
        return "".join(self.fed)

260 

261 

def _strip_html_tags(value: str) -> str:
    """Return ``value`` with HTML tags dropped and text content kept.

    The text is run through a fresh ``_TagStripper``; closing the parser
    flushes any text it was still buffering.

    Args:
        value: String that may contain HTML tags

    Returns:
        String with HTML tags removed but text content preserved

    Examples:
        >>> _strip_html_tags('<b>Hello</b> World')
        'Hello World'
        >>> _strip_html_tags('Test & Check')
        'Test & Check'
        >>> _strip_html_tags('Quote: "Hello"')
        'Quote: "Hello"'
        >>> _strip_html_tags('&&&')
        '&&&'
    """
    stripper = _TagStripper()
    stripper.feed(value)
    stripper.close()
    text_only = stripper.get_data()
    return text_only

285 

286 

287class SecurityValidator: 

288 """Configurable validation with MCP-compliant limits""" 

289 

290 # Configurable patterns (from settings) 

291 DANGEROUS_HTML_PATTERN = ( 

292 settings.validation_dangerous_html_pattern 

293 ) # Default: '<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>' 

294 DANGEROUS_JS_PATTERN = settings.validation_dangerous_js_pattern # Default: javascript:|vbscript:|on\w+\s*=|data:.*script 

295 ALLOWED_URL_SCHEMES = settings.validation_allowed_url_schemes # Default: ["http://", "https://", "ws://", "wss://"] 

296 

297 # Character type patterns 

298 NAME_PATTERN = settings.validation_name_pattern # Default: ^[a-zA-Z0-9_.\- ]+$ (literal space, not \s) 

299 IDENTIFIER_PATTERN = settings.validation_identifier_pattern # Default: ^[a-zA-Z0-9_\-\.]+$ 

300 VALIDATION_SAFE_URI_PATTERN = settings.validation_safe_uri_pattern # Default: ^[a-zA-Z0-9_\-.:/?=&%]+$ 

301 VALIDATION_UNSAFE_URI_PATTERN = settings.validation_unsafe_uri_pattern # Default: [<>"\'\\] 

302 TOOL_NAME_PATTERN = settings.validation_tool_name_pattern # Default: ^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$ (SEP-986) 

303 

304 # MCP-compliant limits (configurable) 

305 MAX_NAME_LENGTH = settings.validation_max_name_length # Default: 255 

306 MAX_DESCRIPTION_LENGTH = settings.validation_max_description_length # Default: 8192 (8KB) 

307 MAX_TEMPLATE_LENGTH = settings.validation_max_template_length # Default: 65536 

308 MAX_CONTENT_LENGTH = settings.validation_max_content_length # Default: 1048576 (1MB) 

309 MAX_JSON_DEPTH = settings.validation_max_json_depth # Default: 30 

310 MAX_URL_LENGTH = settings.validation_max_url_length # Default: 2048 

311 

@classmethod
def sanitize_display_text(cls, value: str, field_name: str) -> str:
    """Reject dangerous markup in display text and strip remaining HTML tags.

    The raw value is first checked against the configured dangerous-HTML and
    dangerous-JS patterns and the polyglot pattern list; any hit raises.
    Otherwise HTML tags are removed and the remaining text is returned with
    its special characters untouched.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: The value with HTML tags removed; falsy input is returned as-is

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Basic HTML tag stripping:

        >>> SecurityValidator.sanitize_display_text('Hello World', 'test')
        'Hello World'
        >>> SecurityValidator.sanitize_display_text('Hello <b>World</b>', 'test')
        'Hello World'

        Empty/None handling:

        >>> SecurityValidator.sanitize_display_text('', 'test')
        ''
        >>> SecurityValidator.sanitize_display_text(None, 'test') #doctest: +SKIP

        Dangerous script patterns:

        >>> SecurityValidator.sanitize_display_text('alert();', 'test')
        'alert();'
        >>> SecurityValidator.sanitize_display_text('javascript:alert(1)', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains script patterns that may cause display issues

        Polyglot attack patterns:

        >>> SecurityValidator.sanitize_display_text('"; alert()', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains potentially dangerous character sequences
        >>> SecurityValidator.sanitize_display_text('-->test', 'test')
        '-->test'
        >>> SecurityValidator.sanitize_display_text('--><script>', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains HTML tags that may cause display issues
        >>> SecurityValidator.sanitize_display_text('String.fromCharCode(65)', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains potentially dangerous character sequences

        Special characters (preserved as-is, no HTML entity conversion):

        >>> SecurityValidator.sanitize_display_text('User & Admin', 'test')
        'User & Admin'
        >>> SecurityValidator.sanitize_display_text('Quote: "Hello"', 'test')
        'Quote: "Hello"'
        >>> SecurityValidator.sanitize_display_text("Quote: 'Hello'", 'test')
        "Quote: 'Hello'"
    """
    if not value:
        return value

    # Configurable patterns are read from the class on every call so that
    # runtime overrides of the class attributes take effect.
    if re.search(cls.DANGEROUS_HTML_PATTERN, value, flags=re.IGNORECASE) is not None:
        raise ValueError(f"{field_name} contains HTML tags that may cause display issues")

    if re.search(cls.DANGEROUS_JS_PATTERN, value, flags=re.IGNORECASE) is not None:
        raise ValueError(f"{field_name} contains script patterns that may cause display issues")

    # Polyglot payloads: one precompiled hit is enough to refuse the value.
    if any(polyglot.search(value) for polyglot in _POLYGLOT_PATTERNS):
        raise ValueError(f"{field_name} contains potentially dangerous character sequences")

    return _strip_html_tags(value)

392 

@classmethod
def validate_name(cls, value: str, field_name: str = "Name") -> str:
    """Validate names with strict character requirements.

    The value must be non-empty, match ``NAME_PATTERN`` in full, contain no
    HTML special characters and be at most ``MAX_NAME_LENGTH`` characters.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: The unchanged value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_name('valid_name')
        'valid_name'
        >>> SecurityValidator.validate_name('valid_name-123')
        'valid_name-123'
        >>> SecurityValidator.validate_name('valid_name_test')
        'valid_name_test'
        >>> SecurityValidator.validate_name('Test Name')
        'Test Name'
        >>> try:
        ...     SecurityValidator.validate_name('Invalid Name!')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('')
        ... except ValueError as e:
        ...     'cannot be empty' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('name<script>')
        ... except ValueError as e:
        ...     'HTML special characters' in str(e) or 'can only contain' in str(e)
        True

        Test length limit:

        >>> long_name = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_name(long_name)
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True

        Test HTML special characters:

        >>> try:
        ...     SecurityValidator.validate_name('name"test')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name("name'test")
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('name/test')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        A trailing newline is rejected:

        >>> try:
        ...     SecurityValidator.validate_name('valid_name' + chr(10))
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Check against allowed pattern. A "$" anchor also matches just before a
    # trailing newline, so a match that leaves exactly "\n" unconsumed is
    # treated as a failure instead of letting "name\n" slip through.
    match = re.match(cls.NAME_PATTERN, value)
    if match is None or value[match.end() :] == "\n":
        raise ValueError(f"{field_name} can only contain letters, numbers, underscore, and hyphen. Special characters like <, >, quotes are not allowed.")

    # Additional check for HTML-like patterns (uses precompiled regex)
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value

474 

@classmethod
def validate_identifier(cls, value: str, field_name: str) -> str:
    """Validate identifiers (IDs) - MCP compliant.

    The value must be non-empty, match ``IDENTIFIER_PATTERN`` in full, contain
    no HTML special characters and be at most ``MAX_NAME_LENGTH`` characters.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: The unchanged value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_identifier('valid_id', 'ID')
        'valid_id'
        >>> SecurityValidator.validate_identifier('valid.id.123', 'ID')
        'valid.id.123'
        >>> SecurityValidator.validate_identifier('valid-id_test', 'ID')
        'valid-id_test'
        >>> SecurityValidator.validate_identifier('test123', 'ID')
        'test123'
        >>> try:
        ...     SecurityValidator.validate_identifier('Invalid/ID', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('', 'ID')
        ... except ValueError as e:
        ...     'cannot be empty' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('id<script>', 'ID')
        ... except ValueError as e:
        ...     'HTML special characters' in str(e) or 'can only contain' in str(e)
        True

        Test HTML special characters:

        >>> try:
        ...     SecurityValidator.validate_identifier('id"test', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier("id'test", 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('id/test', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        Test length limit:

        >>> long_id = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_identifier(long_id, 'ID')
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True

        A trailing newline is rejected:

        >>> try:
        ...     SecurityValidator.validate_identifier('valid_id' + chr(10), 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # MCP spec: identifiers should be alphanumeric + limited special chars.
    # A "$" anchor also matches just before a trailing newline, so a match
    # that leaves exactly "\n" unconsumed is rejected as well.
    match = re.match(cls.IDENTIFIER_PATTERN, value)
    if match is None or value[match.end() :] == "\n":
        raise ValueError(f"{field_name} can only contain letters, numbers, underscore, hyphen, and dots")

    # Block HTML-like patterns (uses precompiled regex)
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value

556 

@classmethod
def validate_uri(cls, value: str, field_name: str = "URI") -> str:
    """Validate URIs - MCP compliant.

    Checks run in this order: non-empty, no match for
    ``VALIDATION_UNSAFE_URI_PATTERN``, no ``..`` sequence, a match for
    ``VALIDATION_SAFE_URI_PATTERN``, and length at most ``MAX_NAME_LENGTH``.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: The unchanged value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_uri('/valid/uri', 'URI')
        '/valid/uri'
        >>> SecurityValidator.validate_uri('..', 'URI')
        Traceback (most recent call last):
            ...
        ValueError: URI cannot contain directory traversal sequences ('..')

        A trailing newline is rejected:

        >>> try:
        ...     SecurityValidator.validate_uri('/valid/uri' + chr(10), 'URI')
        ... except ValueError as e:
        ...     'contains invalid characters' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Block HTML-like patterns
    if re.search(cls.VALIDATION_UNSAFE_URI_PATTERN, value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if ".." in value:
        raise ValueError(f"{field_name} cannot contain directory traversal sequences ('..')")

    # A "$" anchor also matches just before a trailing newline, so a match
    # that leaves exactly "\n" unconsumed is rejected instead of accepting
    # "uri\n" as safe.
    match = re.search(cls.VALIDATION_SAFE_URI_PATTERN, value)
    if match is None or value[match.end() :] == "\n":
        raise ValueError(f"{field_name} contains invalid characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value

596 

@classmethod
def validate_tool_name(cls, value: str) -> str:
    """Special validation for MCP tool names.

    The value must be non-empty, match ``TOOL_NAME_PATTERN`` in full, contain
    no HTML special characters and be at most ``MAX_NAME_LENGTH`` characters.

    Args:
        value (str): Value to validate

    Returns:
        str: The unchanged value if acceptable

    Raises:
        ValueError: When input is not acceptable

    Examples:
        >>> SecurityValidator.validate_tool_name('tool_1')
        'tool_1'
        >>> SecurityValidator.validate_tool_name('_5gpt_query')
        '_5gpt_query'
        >>> SecurityValidator.validate_tool_name('1tool')
        '1tool'

        Test invalid characters (rejected by pattern):

        >>> try:
        ...     SecurityValidator.validate_tool_name('tool<script>')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_tool_name('tool"test')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_tool_name("tool'test")
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> # Slashes are allowed per SEP-986
        >>> SecurityValidator.validate_tool_name('tool/test')
        'tool/test'
        >>> SecurityValidator.validate_tool_name('namespace/subtool')
        'namespace/subtool'

        Test length limit:

        >>> long_tool_name = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_tool_name(long_tool_name)
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True

        A trailing newline is rejected:

        >>> try:
        ...     SecurityValidator.validate_tool_name('tool_1' + chr(10))
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
    """
    if not value:
        raise ValueError("Tool name cannot be empty")

    # MCP tools have specific naming requirements. A "$" anchor also matches
    # just before a trailing newline, so a match that leaves exactly "\n"
    # unconsumed is rejected as well.
    match = re.match(cls.TOOL_NAME_PATTERN, value)
    if match is None or value[match.end() :] == "\n":
        raise ValueError("Tool name must start with a letter, number, or underscore and contain only letters, numbers, periods, underscores, hyphens, and slashes")

    # Ensure no HTML-like content (uses precompiled regex)
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError("Tool name cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"Tool name exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value

665 

@classmethod
def validate_uuid(cls, value: str, field_name: str = "UUID") -> str:
    """Validate UUID format and return it in normalized form.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: The UUID as 32 lowercase hexadecimal digits without dashes;
            a falsy value is returned unchanged

    Raises:
        ValueError: When value is not a valid UUID

    Examples:
        >>> SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716-446655440000')
        '550e8400e29b41d4a716446655440000'
        >>> SecurityValidator.validate_uuid('invalid-uuid')
        Traceback (most recent call last):
            ...
        ValueError: UUID must be a valid UUID format

        Test empty UUID:

        >>> SecurityValidator.validate_uuid('')
        ''

        Test normalized UUID format:

        >>> SecurityValidator.validate_uuid('550E8400-E29B-41D4-A716-446655440000')
        '550e8400e29b41d4a716446655440000'
        >>> SecurityValidator.validate_uuid('550e8400e29b41d4a716446655440000')
        '550e8400e29b41d4a716446655440000'

        Test various invalid UUID formats:

        >>> try:
        ...     SecurityValidator.validate_uuid('not-a-uuid')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716-446655440000-extra')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('gggggggg-gggg-gggg-gggg-gggggggggggg')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
    """
    if not value:
        return value

    try:
        # Validate UUID format by attempting to parse it
        uuid_obj = uuid.UUID(value)
    except ValueError as exc:
        # Lazy %-style arguments: the message is only built if the record is emitted.
        logger.error("Invalid UUID format for %s: %s", field_name, value)
        # Chain explicitly so the parser's own error stays attached as the cause.
        raise ValueError(f"{field_name} must be a valid UUID format") from exc

    # UUID.hex is the 32-digit lowercase form, i.e. str(uuid_obj) without dashes.
    return uuid_obj.hex

734 

@classmethod
def validate_template(cls, value: str) -> str:
    """Validate a template: plain Jinja2 placeholders pass, SSTI-style content is refused.

    After the length check the template is screened for dangerous HTML tags
    and inline event handlers. Every ``{{ ... }}`` and ``{% ... %}``
    expression is then checked for blocked tokens and operators, and finally
    the ``${``, ``#{`` and ``%{`` expression forms are refused.

    Args:
        value (str): Value to validate

    Returns:
        str: The unchanged value if acceptable; falsy input is returned as-is

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Empty template handling:

        >>> SecurityValidator.validate_template('')
        ''
        >>> SecurityValidator.validate_template(None) #doctest: +SKIP

        Safe Jinja2 templates:

        >>> SecurityValidator.validate_template('Hello {{ name }}')
        'Hello {{ name }}'
        >>> SecurityValidator.validate_template('{% if condition %}text{% endif %}')
        '{% if condition %}text{% endif %}'
        >>> SecurityValidator.validate_template('{{ username }}')
        '{{ username }}'

        Dangerous HTML tags blocked:

        >>> SecurityValidator.validate_template('Hello <script>alert(1)</script>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display
        >>> SecurityValidator.validate_template('Test <iframe src="evil.com"></iframe>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display
        >>> SecurityValidator.validate_template('<form action="/evil"></form>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display

        Event handlers blocked:

        >>> SecurityValidator.validate_template('<div onclick="evil()">Test</div>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains event handlers that may cause display issues
        >>> SecurityValidator.validate_template('onload = "alert(1)"')
        Traceback (most recent call last):
            ...
        ValueError: Template contains event handlers that may cause display issues

        SSTI prevention patterns:

        >>> SecurityValidator.validate_template('{{ __import__ }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ config }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{% import os %}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 7*7 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 10/2 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 5+5 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 10-5 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions

        Other template injection patterns:

        >>> SecurityValidator.validate_template('${evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('#{evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('%{evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions

        Length limit testing:

        >>> long_template = 'a' * 65537
        >>> SecurityValidator.validate_template(long_template)
        Traceback (most recent call last):
            ...
        ValueError: Template exceeds maximum length of 65536
    """
    if not value:
        return value

    if len(value) > cls.MAX_TEMPLATE_LENGTH:
        raise ValueError(f"Template exceeds maximum length of {cls.MAX_TEMPLATE_LENGTH}")

    # Dangerous tags are refused; Jinja2 syntax {{ }} and {% %} is not a tag.
    if _DANGEROUS_TEMPLATE_TAGS_RE.search(value) is not None:
        raise ValueError("Template contains HTML tags that may interfere with proper display")

    # Inline event handlers such as onclick=...
    if _EVENT_HANDLER_RE.search(value) is not None:
        raise ValueError("Template contains event handlers that may cause display issues")

    # SSTI prevention - expressions are scanned without regex backtracking.
    # All "{{ }}" expressions are examined before any "{% %}" statement.
    for opener, closer in (("{{", "}}"), ("{%", "%}")):
        for body in _iter_template_expressions(value, opener, closer):
            # Collapse whitespace around | and = so spaced-out variants of
            # the blocked tokens are still recognised.
            squeezed = re.sub(r"\s*\|\s*", "|", body.lower())
            squeezed = re.sub(r"\s*=\s*", "=", squeezed)
            has_blocked_token = any(token in squeezed for token in _SSTI_DANGEROUS_SUBSTRINGS)
            # Operators are looked up in the raw expression text.
            if has_blocked_token or any(op in body for op in _SSTI_DANGEROUS_OPERATORS):
                raise ValueError("Template contains potentially dangerous expressions")

    for prefix in _SSTI_SIMPLE_TEMPLATE_PREFIXES:
        if _has_simple_template_expression(value, prefix):
            raise ValueError("Template contains potentially dangerous expressions")

    return value

883 

@classmethod
def validate_url(cls, value: str, field_name: str = "URL") -> str:
    """Check that a URL uses a permitted scheme and is safe to store and display.

    Checks run in a fixed order and the first failing one raises: empty
    value, length, scheme prefix, dangerous protocol patterns, square
    brackets (IPv6 literals), protocol-relative form, line breaks, spaces
    ahead of the query string, and then the parsed structure (scheme and
    host present, 0.0.0.0, SSRF rules when enabled in settings, port range,
    embedded credentials, HTML and script patterns).

    Args:
        value (str): URL to check.
        field_name (str): Label placed at the start of every error message.

    Returns:
        str: ``value`` unchanged when every check passes.

    Raises:
        ValueError: When any check rejects the URL.

    Examples:
        Valid URLs:

        >>> SecurityValidator.validate_url('https://example.com')
        'https://example.com'
        >>> SecurityValidator.validate_url('http://example.com')
        'http://example.com'
        >>> SecurityValidator.validate_url('ws://example.com')
        'ws://example.com'
        >>> SecurityValidator.validate_url('wss://example.com')
        'wss://example.com'
        >>> SecurityValidator.validate_url('https://example.com:8080/path')
        'https://example.com:8080/path'
        >>> SecurityValidator.validate_url('https://example.com/path?query=value')
        'https://example.com/path?query=value'

        Empty URL handling:

        >>> SecurityValidator.validate_url('')
        Traceback (most recent call last):
            ...
        ValueError: URL cannot be empty

        Length validation:

        >>> long_url = 'https://example.com/' + 'a' * 2100
        >>> SecurityValidator.validate_url(long_url)
        Traceback (most recent call last):
            ...
        ValueError: URL exceeds maximum length of 2048

        Scheme validation:

        >>> SecurityValidator.validate_url('ftp://example.com')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('file:///etc/passwd')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('javascript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('data:text/plain,hello')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('vbscript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('about:blank')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('chrome://settings')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('mailto:test@example.com')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        IPv6 URL blocking:

        >>> SecurityValidator.validate_url('https://[::1]:8080/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains IPv6 address which is not supported
        >>> SecurityValidator.validate_url('https://[2001:db8::1]/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains IPv6 address which is not supported

        Protocol-relative URL blocking:

        >>> SecurityValidator.validate_url('//example.com/path')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        Line break injection:

        >>> SecurityValidator.validate_url('https://example.com\\rHost: evil.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains line breaks which are not allowed
        >>> SecurityValidator.validate_url('https://example.com\\nHost: evil.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains line breaks which are not allowed

        Space validation:

        >>> SecurityValidator.validate_url('https://exam ple.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains spaces which are not allowed in URLs
        >>> SecurityValidator.validate_url('https://example.com/path?query=hello world')
        'https://example.com/path?query=hello world'

        Malformed URLs:

        >>> SecurityValidator.validate_url('https://')
        Traceback (most recent call last):
            ...
        ValueError: URL is not a valid URL
        >>> SecurityValidator.validate_url('not-a-url')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        Restricted IP addresses:

        >>> SecurityValidator.validate_url('https://0.0.0.0/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains invalid IP address (0.0.0.0)
        >>> SecurityValidator.validate_url('https://169.254.169.254/')  # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        ValueError: URL contains IP address blocked by SSRF protection ...

        Invalid port numbers:

        >>> SecurityValidator.validate_url('https://example.com:0/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains invalid port number
        >>> try:
        ...     SecurityValidator.validate_url('https://example.com:65536/')
        ... except ValueError as e:
        ...     'Port out of range' in str(e) or 'invalid port' in str(e)
        True

        Credentials in URL:

        >>> SecurityValidator.validate_url('https://user:pass@example.com/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains credentials which are not allowed
        >>> SecurityValidator.validate_url('https://user@example.com/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains credentials which are not allowed

        XSS patterns in URLs:

        >>> SecurityValidator.validate_url('https://example.com/<script>')
        Traceback (most recent call last):
            ...
        ValueError: URL contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_url('https://example.com?param=javascript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL contains unsupported or potentially dangerous protocol
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    if len(value) > cls.MAX_URL_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_URL_LENGTH}")

    # Scheme allowlist, matched case-insensitively against the start of the URL.
    schemes = cls.ALLOWED_URL_SCHEMES
    if not value.lower().startswith(tuple(scheme.lower() for scheme in schemes)):
        raise ValueError(f"{field_name} must start with one of: {', '.join(schemes)}")

    # Precompiled deny patterns for protocols hidden anywhere in the URL.
    if any(pattern.search(value) for pattern in _DANGEROUS_URL_PATTERNS):
        raise ValueError(f"{field_name} contains unsupported or potentially dangerous protocol")

    # Any square bracket is treated as an IPv6 literal and rejected.
    if any(bracket in value for bracket in "[]"):
        raise ValueError(f"{field_name} contains IPv6 address which is not supported")

    if value.startswith("//"):
        raise ValueError(f"{field_name} contains protocol-relative URL which is not supported")

    # CR or LF anywhere in the URL is rejected (header/CRLF injection).
    if any(line_break in value for line_break in "\r\n"):
        raise ValueError(f"{field_name} contains line breaks which are not allowed")

    # Spaces are only tolerated after the first "?", i.e. inside the query string.
    before_query = value.partition("?")[0]
    if " " in before_query:
        raise ValueError(f"{field_name} contains spaces which are not allowed in URLs")

    try:
        parsed = urlparse(value)
        if not (parsed.scheme and parsed.netloc):
            raise ValueError(f"{field_name} is not a valid URL")

        # Second bracket check, this time on the parsed network location.
        if any(bracket in parsed.netloc for bracket in "[]"):
            raise ValueError(f"{field_name} contains IPv6 address which is not supported")

        host = parsed.hostname
        if host:
            # 0.0.0.0 is rejected even when SSRF protection is switched off.
            if host == "0.0.0.0":  # nosec B104 - we're blocking this for security
                raise ValueError(f"{field_name} contains invalid IP address (0.0.0.0)")

            if settings.ssrf_protection_enabled:
                cls._validate_ssrf(host, field_name)

        # Reading .port may itself raise ValueError for out-of-range values;
        # that error is passed through unchanged by the handler below.
        port = parsed.port
        if port is not None and not 1 <= port <= 65535:
            raise ValueError(f"{field_name} contains invalid port number")

        if parsed.username or parsed.password:
            raise ValueError(f"{field_name} contains credentials which are not allowed")

        # HTML / script patterns are searched across the whole URL string.
        if re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE):
            raise ValueError(f"{field_name} contains HTML tags that may cause security issues")

        if re.search(cls.DANGEROUS_JS_PATTERN, value, re.IGNORECASE):
            raise ValueError(f"{field_name} contains script patterns that may cause security issues")

    except ValueError:
        # Validation failures keep their specific message.
        raise
    except Exception:
        # Anything else is reported as a generic invalid URL.
        raise ValueError(f"{field_name} is not a valid URL")

    return value

1135 

@classmethod
def _validate_ssrf(cls, hostname: str, field_name: str) -> None:
    """Reject hostnames and IP addresses that the SSRF settings disallow.

    The hostname is first compared against the blocked-host list. It is then
    turned into one or more IP addresses (parsed directly when it already is
    an IP literal, otherwise resolved through ``socket.getaddrinfo``) and
    every address is checked; a single disallowed address rejects the input.

    Args:
        hostname (str): Hostname or IP address to check.
        field_name (str): Label placed at the start of every error message.

    Raises:
        ValueError: If the hostname or any address it resolves to is blocked,
            or if resolution fails while ``ssrf_dns_fail_closed`` is set.

    Configuration (read from settings):
        - ssrf_blocked_hosts: Hostnames rejected by exact match
          (case-insensitive, trailing dots ignored)
        - ssrf_blocked_networks: CIDR ranges that are always rejected
        - ssrf_dns_fail_closed: If truthy, a failed or empty DNS lookup is
          an error; otherwise the hostname is let through
        - ssrf_allow_localhost: If falsy, loopback addresses and the names
          ``localhost`` / ``localhost.localdomain`` are rejected
        - ssrf_allow_private_networks: If falsy, non-loopback addresses that
          ``ipaddress`` reports as private are rejected
        - ssrf_allowed_networks: Optional CIDR allowlist that exempts
          private addresses from the rule above

    Examples:
        Cloud metadata (always blocked):

        >>> from unittest.mock import patch, MagicMock
        >>> mock_settings = MagicMock()
        >>> mock_settings.ssrf_protection_enabled = True
        >>> mock_settings.ssrf_blocked_networks = ["169.254.169.254/32"]
        >>> mock_settings.ssrf_blocked_hosts = ["metadata.google.internal"]
        >>> mock_settings.ssrf_allow_localhost = True
        >>> mock_settings.ssrf_allow_private_networks = True
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('169.254.169.254', 'URL')
        ...     except ValueError as e:
        ...         'blocked by SSRF protection' in str(e)
        True

        Localhost (configurable):

        >>> mock_settings.ssrf_allow_localhost = False
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('127.0.0.1', 'URL')
        ...     except ValueError as e:
        ...         'localhost' in str(e).lower()
        True

        Public IPs (always allowed):

        >>> mock_settings.ssrf_allow_localhost = True
        >>> mock_settings.ssrf_allow_private_networks = True
        >>> mock_settings.ssrf_allowed_networks = []
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     SecurityValidator._validate_ssrf('8.8.8.8', 'URL')  # Should not raise
    """
    # Lowercase and drop trailing dots so "Host." and "host" compare equal.
    host_key = hostname.lower().rstrip(".")

    # Exact-match hostname denylist.
    for denied in settings.ssrf_blocked_hosts:
        if host_key == denied.lower().rstrip("."):
            raise ValueError(f"{field_name} contains blocked hostname '{hostname}' (SSRF protection)")

    # Collect every address the hostname stands for.
    resolved: list = []
    try:
        # An IP literal needs no DNS lookup.
        resolved = [ipaddress.ip_address(hostname)]
    except ValueError:
        # Not an IP literal: ask the resolver for all IPv4 and IPv6 results.
        try:
            lookup = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
            for _family, _socktype, _proto, _canonname, sockaddr in lookup:
                try:
                    resolved.append(ipaddress.ip_address(sockaddr[0]))
                except ValueError:
                    # Skip entries whose address text cannot be parsed.
                    continue
        except (socket.gaierror, socket.herror):
            if settings.ssrf_dns_fail_closed:
                raise ValueError(f"{field_name} DNS resolution failed and SSRF_DNS_FAIL_CLOSED is enabled")
            # Fail open: only the hostname denylist above has been applied.
            return

    if not resolved:
        if settings.ssrf_dns_fail_closed:
            raise ValueError(f"{field_name} DNS resolution returned no addresses and SSRF_DNS_FAIL_CLOSED is enabled")
        return

    # One disallowed address is enough to reject the whole hostname.
    for address in resolved:
        # 1. Networks that are blocked no matter what else is configured.
        for cidr in settings.ssrf_blocked_networks:
            try:
                blocked_net = ipaddress.ip_network(cidr, strict=False)
            except ValueError:
                # A malformed config entry is logged and ignored.
                logger.warning(f"Invalid CIDR in ssrf_blocked_networks: {cidr}")
                continue

            if address in blocked_net:
                raise ValueError(f"{field_name} contains IP address blocked by SSRF protection (network: {cidr})")

        # 2. Loopback / localhost, unless explicitly allowed.
        if not settings.ssrf_allow_localhost and (address.is_loopback or host_key in ("localhost", "localhost.localdomain")):
            raise ValueError(f"{field_name} contains localhost address which is blocked by SSRF protection")

        # 3. Private (non-loopback) addresses, unless allowed or allow-listed.
        if settings.ssrf_allow_private_networks:
            continue
        if not address.is_private or address.is_loopback:
            continue

        exemptions = getattr(settings, "ssrf_allowed_networks", []) or []
        for cidr in exemptions:
            try:
                exempt_net = ipaddress.ip_network(cidr, strict=False)
            except ValueError:
                logger.warning(f"Invalid CIDR in ssrf_allowed_networks: {cidr}")
                continue
            if address in exempt_net:
                # Explicitly allow-listed private address.
                break
        else:
            raise ValueError(f"{field_name} contains private network address which is blocked by SSRF protection")

1267 

@classmethod
def validate_no_xss(cls, value: str, field_name: str) -> None:
    """Raise if a string matches the class's dangerous-HTML pattern.

    Falsy values (empty string, ``None``) are accepted without inspection.

    Args:
        value (str): Text to inspect.
        field_name (str): Label placed at the start of the error message.

    Raises:
        ValueError: If ``value`` matches ``DANGEROUS_HTML_PATTERN``
            (case-insensitive search).

    Examples:
        Safe strings pass validation:

        >>> SecurityValidator.validate_no_xss('Hello World', 'test_field')
        >>> SecurityValidator.validate_no_xss('User: admin@example.com', 'email')
        >>> SecurityValidator.validate_no_xss('Price: $10.99', 'price')

        Empty/None strings are considered safe:

        >>> SecurityValidator.validate_no_xss('', 'empty_field')
        >>> SecurityValidator.validate_no_xss(None, 'none_field') #doctest: +SKIP

        Dangerous HTML tags trigger validation errors:

        >>> SecurityValidator.validate_no_xss('<script>alert(1)</script>', 'test_field')
        Traceback (most recent call last):
            ...
        ValueError: test_field contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<iframe src="evil.com"></iframe>', 'content')
        Traceback (most recent call last):
            ...
        ValueError: content contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<object data="malware.swf"></object>', 'data')
        Traceback (most recent call last):
            ...
        ValueError: data contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<embed src="evil.swf">', 'embed')
        Traceback (most recent call last):
            ...
        ValueError: embed contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<link rel="stylesheet" href="evil.css">', 'style')
        Traceback (most recent call last):
            ...
        ValueError: style contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<meta http-equiv="refresh" content="0;url=evil.com">', 'meta')
        Traceback (most recent call last):
            ...
        ValueError: meta contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<base href="http://evil.com">', 'base')
        Traceback (most recent call last):
            ...
        ValueError: base contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<form action="evil.php">', 'form')
        Traceback (most recent call last):
            ...
        ValueError: form contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<img src="x" onerror="alert(1)">', 'image')
        Traceback (most recent call last):
            ...
        ValueError: image contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<svg onload="alert(1)"></svg>', 'svg')
        Traceback (most recent call last):
            ...
        ValueError: svg contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<video src="x" onerror="alert(1)"></video>', 'video')
        Traceback (most recent call last):
            ...
        ValueError: video contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_no_xss('<audio src="x" onerror="alert(1)"></audio>', 'audio')
        Traceback (most recent call last):
            ...
        ValueError: audio contains HTML tags that may cause security issues
    """
    # Falsy input short-circuits, so the pattern is never consulted for it.
    if value and re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE):
        raise ValueError(f"{field_name} contains HTML tags that may cause security issues")

1348 

1349 @classmethod 

1350 def validate_json_depth( 

1351 cls, 

1352 obj: object, 

1353 max_depth: int | None = None, 

1354 current_depth: int = 0, 

1355 ) -> None: 

1356 """Validate that a JSON‑like structure does not exceed a depth limit. 

1357 

1358 A *depth* is counted **only** when we enter a container (`dict` or 

1359 `list`). Primitive values (`str`, `int`, `bool`, `None`, etc.) do not 

1360 increase the depth, but an *empty* container still counts as one level. 

1361 

1362 Args: 

1363 obj: Any Python object to inspect recursively. 

1364 max_depth: Maximum allowed depth (defaults to 

1365 :pyattr:`SecurityValidator.MAX_JSON_DEPTH`). 

1366 current_depth: Internal recursion counter. **Do not** set this 

1367 from user code. 

1368 

1369 Raises: 

1370 ValueError: If the nesting level exceeds *max_depth*. 

1371 

1372 Examples: 

1373 Simple flat dictionary – depth 1: :: 

1374 

1375 >>> SecurityValidator.validate_json_depth({'name': 'Alice'}) 

1376 

1377 Nested dict – depth 2: :: 

1378 

1379 >>> SecurityValidator.validate_json_depth( 

1380 ... {'user': {'name': 'Alice'}} 

1381 ... ) 

1382 

1383 Mixed dict/list – depth 3: :: 

1384 

1385 >>> SecurityValidator.validate_json_depth( 

1386 ... {'users': [{'name': 'Alice', 'meta': {'age': 30}}]} 

1387 ... ) 

1388 

1389 At 10 levels of nesting – allowed: :: 

1390 

1391 >>> deep_10 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': 

1392 ... {'9': {'10': 'end'}}}}}}}}}} 

1393 >>> SecurityValidator.validate_json_depth(deep_10) 

1394 

1395 At new default limit (30) – allowed: :: 

1396 

1397 >>> deep_30 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': 

1398 ... {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16': 

1399 ... {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24': 

1400 ... {'25': {'26': {'27': {'28': {'29': {'30': 'end'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} 

1401 >>> SecurityValidator.validate_json_depth(deep_30) 

1402 

1403 One level deeper – rejected: :: 

1404 

1405 >>> deep_31 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8': 

1406 ... {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16': 

1407 ... {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24': 

1408 ... {'25': {'26': {'27': {'28': {'29': {'30': {'31': 'end'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} 

1409 >>> SecurityValidator.validate_json_depth(deep_31) 

1410 Traceback (most recent call last): 

1411 ... 

1412 ValueError: JSON structure exceeds maximum depth of 30 

1413 """ 

1414 if max_depth is None: 

1415 max_depth = cls.MAX_JSON_DEPTH 

1416 

1417 # Only containers count toward depth; primitives are ignored 

1418 if not isinstance(obj, (dict, list)): 

1419 return 

1420 

1421 next_depth = current_depth + 1 

1422 if next_depth > max_depth: 

1423 raise ValueError(f"JSON structure exceeds maximum depth of {max_depth}") 

1424 

1425 if isinstance(obj, dict): 

1426 for value in obj.values(): 

1427 cls.validate_json_depth(value, max_depth, next_depth) 

1428 else: # obj is a list 

1429 for item in obj: 

1430 cls.validate_json_depth(item, max_depth, next_depth) 

1431 

@classmethod
def validate_mime_type(cls, value: str) -> str:
    """Check a MIME type's format and whether it is permitted.

    A value is accepted when it matches the precompiled MIME pattern and is
    either listed in ``settings.validation_allowed_mime_types``, starts with
    ``application/x-`` or ``text/x-``, or contains a ``+`` suffix marker.
    Falsy values are returned untouched.

    Args:
        value (str): MIME type to check.

    Returns:
        str: ``value`` unchanged when it is acceptable.

    Raises:
        ValueError: If the format is invalid or the type is not permitted.

    Examples:
        Empty/None handling:

        >>> SecurityValidator.validate_mime_type('')
        ''
        >>> SecurityValidator.validate_mime_type(None) #doctest: +SKIP

        Valid standard MIME types:

        >>> SecurityValidator.validate_mime_type('text/plain')
        'text/plain'
        >>> SecurityValidator.validate_mime_type('application/json')
        'application/json'
        >>> SecurityValidator.validate_mime_type('image/jpeg')
        'image/jpeg'
        >>> SecurityValidator.validate_mime_type('text/html')
        'text/html'
        >>> SecurityValidator.validate_mime_type('application/pdf')
        'application/pdf'

        Valid vendor-specific MIME types:

        >>> SecurityValidator.validate_mime_type('application/x-custom')
        'application/x-custom'
        >>> SecurityValidator.validate_mime_type('text/x-log')
        'text/x-log'

        Valid MIME types with suffixes:

        >>> SecurityValidator.validate_mime_type('application/vnd.api+json')
        'application/vnd.api+json'
        >>> SecurityValidator.validate_mime_type('image/svg+xml')
        'image/svg+xml'

        Invalid MIME type formats:

        >>> SecurityValidator.validate_mime_type('invalid')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text/')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('/plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text//plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text/plain/extra')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('text plain')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format
        >>> SecurityValidator.validate_mime_type('<text/plain>')
        Traceback (most recent call last):
            ...
        ValueError: Invalid MIME type format

        Disallowed MIME types (not in the allowed list):

        >>> try:
        ...     SecurityValidator.validate_mime_type('application/evil')
        ... except ValueError as e:
        ...     'not in the allowed list' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_mime_type('text/evil')
        ... except ValueError as e:
        ...     'not in the allowed list' in str(e)
        True

        MIME type with parameters:

        >>> try:
        ...     SecurityValidator.validate_mime_type('application/evil; charset=utf-8')
        ... except ValueError as e:
        ...     'Invalid MIME type format' in str(e)
        True
    """
    if not value:
        return value

    # Structural check against the precompiled pattern.
    if _MIME_TYPE_RE.match(value) is None:
        raise ValueError("Invalid MIME type format")

    # Explicitly configured types need no further inspection.
    if value in settings.validation_allowed_mime_types:
        return value

    # Otherwise only x- vendor types and "+suffix" types are let through.
    base_type = value.split(";", maxsplit=1)[0].strip()
    is_vendor_type = base_type.startswith("application/x-") or base_type.startswith("text/x-")
    if not (is_vendor_type or "+" in base_type):
        raise ValueError(f"MIME type '{value}' is not in the allowed list")

    return value

1547 

@classmethod
def validate_shell_parameter(cls, value: str) -> str:
    """Reject or quote a shell parameter that contains metacharacters.

    Values with no match for the precompiled dangerous-character pattern are
    returned as-is. Otherwise the outcome depends on
    ``settings.validation_strict`` (treated as ``True`` when absent): strict
    mode raises, non-strict mode returns the value quoted by ``shlex.quote``.

    Args:
        value (str): Shell parameter to check.

    Returns:
        str: The original value, or its shell-quoted form in non-strict mode.

    Raises:
        ValueError: If ``value`` is not a string, or if it contains shell
            metacharacters while strict mode is active.

    Examples:
        >>> SecurityValidator.validate_shell_parameter('safe_param')
        'safe_param'
        >>> SecurityValidator.validate_shell_parameter('param with spaces')
        'param with spaces'
    """
    if not isinstance(value, str):
        raise ValueError("Parameter must be string")

    # Nothing suspicious: hand the value back untouched.
    if not _SHELL_DANGEROUS_CHARS_RE.search(value):
        return value

    # The strict flag is looked up only once a metacharacter has been found.
    if getattr(settings, "validation_strict", True):
        raise ValueError("Parameter contains shell metacharacters")

    # Non-strict mode: neutralise the value instead of rejecting it.
    return shlex.quote(value)

1580 

@classmethod
def validate_path(cls, path: str, allowed_roots: Optional[List[str]] = None) -> str:
    """Validate and normalize a file path to prevent directory traversal.

    Strings that match the precompiled URI-scheme pattern (``http://``,
    ``plugin://``, ...) are returned untouched. Any other string is rejected
    when one of its components is ``..``, then resolved to an absolute path
    and, if ``allowed_roots`` is given, required to be one of those roots or
    to live underneath one.

    Root containment is decided on whole path components, so a sibling
    directory that merely shares a textual prefix with a root (for example
    ``/safe/pathevil`` against the root ``/safe/path``) is rejected.

    Args:
        path (str): File path to validate.
        allowed_roots (Optional[List[str]]): Directories the resolved path
            must be equal to or contained in. ``None`` or an empty list
            disables the check.

    Returns:
        str: The URI unchanged, or the resolved absolute path as a string.

    Raises:
        ValueError: If ``path`` is not a string, contains a ``..`` component,
            resolves outside every allowed root, or cannot be resolved.

    Examples:
        >>> SecurityValidator.validate_path('/safe/path')
        '/safe/path'
        >>> SecurityValidator.validate_path('http://example.com/file')
        'http://example.com/file'
        >>> SecurityValidator.validate_path('/safe/pathevil', ['/safe/path'])
        Traceback (most recent call last):
            ...
        ValueError: Invalid path: Path outside allowed roots
    """
    if not isinstance(path, str):
        raise ValueError("Path must be string")

    # URIs are not filesystem paths; they are passed through unchanged.
    if _URI_SCHEME_RE.match(path):
        return path

    try:
        candidate = Path(path)
        # Refuse explicit parent-directory components before resolving.
        if ".." in candidate.parts:
            raise ValueError("Path traversal detected")

        resolved_path = candidate.resolve()

        if allowed_roots:
            # Compare path components, not raw string prefixes: a string
            # prefix test would accept "/safe/pathevil" for root "/safe/path".
            ancestors = resolved_path.parents
            resolved_roots = (Path(root).resolve() for root in allowed_roots)
            if not any(root == resolved_path or root in ancestors for root in resolved_roots):
                raise ValueError("Path outside allowed roots")

        return str(resolved_path)
    except (OSError, ValueError) as e:
        # Every failure, including the ones raised above, is reported with
        # the same "Invalid path: ..." prefix; the cause is kept for debugging.
        raise ValueError(f"Invalid path: {e}") from e

1625 

@classmethod
def validate_sql_parameter(cls, value: str) -> str:
    """Validate a SQL parameter against known injection patterns.

    Non-string values are returned unchanged. A string that matches none of
    the precompiled SQL patterns is also returned unchanged. When a pattern
    matches, the outcome depends on ``settings.validation_strict`` (treated
    as ``True`` when absent): strict mode raises, non-strict mode returns the
    value with single and double quotes doubled.

    The escaping is applied exactly once, however many patterns match.
    Escaping once per matching pattern would double the quotes again on
    every additional match and corrupt the value.

    Args:
        value (str): SQL parameter to validate.

    Returns:
        str: The original value, or its quote-escaped form in non-strict mode.

    Raises:
        ValueError: If the value matches an injection pattern in strict mode.

    Examples:
        >>> SecurityValidator.validate_sql_parameter('safe_value')
        'safe_value'
        >>> SecurityValidator.validate_sql_parameter('123')
        '123'
    """
    if not isinstance(value, str):
        return value

    # No pattern matches: nothing to reject or escape.
    if not any(pattern.search(value) for pattern in _SQL_PATTERNS):
        return value

    # The strict flag is looked up only once a pattern has matched.
    if getattr(settings, "validation_strict", True):
        raise ValueError("Parameter contains SQL injection patterns")

    # Basic escaping, done a single time. NOTE(review): quote doubling is a
    # best-effort fallback; bound query parameters remain the safer option.
    return value.replace("'", "''").replace('"', '""')

1657 

@classmethod
def validate_parameter_length(cls, value: str, max_length: Optional[int] = None) -> str:
    """Ensure a parameter is no longer than the applicable limit.

    The limit is ``max_length`` when it is truthy; otherwise it falls back
    to ``settings.max_param_length`` (10000 if that setting is absent).

    Args:
        value (str): Parameter to check.
        max_length (int): Maximum allowed length.

    Returns:
        str: ``value`` unchanged when it fits within the limit.

    Raises:
        ValueError: If ``value`` is longer than the limit.

    Examples:
        >>> SecurityValidator.validate_parameter_length('short', 10)
        'short'
    """
    # A falsy max_length (None or 0) selects the configured default.
    limit = max_length or getattr(settings, "max_param_length", 10000)
    if len(value) <= limit:
        return value
    raise ValueError(f"Parameter exceeds maximum length of {limit}")

1680 

@classmethod
def sanitize_text(cls, text: str) -> str:
    """Strip ANSI escape sequences and control characters from text.

    Args:
        text (str): Text to clean. Non-string values are returned unchanged.

    Returns:
        str: The text after both substitutions have been applied.

    Examples:
        >>> SecurityValidator.sanitize_text('Hello World')
        'Hello World'
        >>> SecurityValidator.sanitize_text('Text\x1b[31mwith\x1b[0mcolors')
        'Textwithcolors'
    """
    if isinstance(text, str):
        # First drop ANSI escape sequences, then the remaining control
        # characters (both via precompiled module-level regexes).
        without_ansi = _ANSI_ESCAPE_RE.sub("", text)
        return _CONTROL_CHARS_RE.sub("", without_ansi)

    # Anything that is not a string is handed back as-is.
    return text

1705 

@classmethod
def sanitize_json_response(cls, data: Any) -> Any:
    """Walk a JSON-like structure and sanitize every string value in it.

    Strings are passed through ``sanitize_text``; dicts and lists are
    rebuilt with their values/items sanitized recursively (dict keys are
    kept as they are). Every other type is returned unchanged.

    Args:
        data (Any): JSON data structure to sanitize

    Returns:
        Any: Sanitized structure mirroring the shape of the input

    Examples:
        >>> SecurityValidator.sanitize_json_response('clean text')
        'clean text'
        >>> SecurityValidator.sanitize_json_response({'key': 'value'})
        {'key': 'value'}
        >>> SecurityValidator.sanitize_json_response(['item1', 'item2'])
        ['item1', 'item2']
    """
    if isinstance(data, str):
        return cls.sanitize_text(data)

    if isinstance(data, dict):
        cleaned_map = {}
        for key, item in data.items():
            cleaned_map[key] = cls.sanitize_json_response(item)
        return cleaned_map

    if isinstance(data, list):
        cleaned_items = []
        for element in data:
            cleaned_items.append(cls.sanitize_json_response(element))
        return cleaned_items

    # Numbers, booleans, None and any other type need no sanitizing.
    return data

1731 

1732 

def validate_core_url(value: str, field_name: str = "URL") -> str:
    """Validate a URL through the core ContextForge validator.

    Exists as an explicit core-only entry point so that the core
    processing path does not depend on plugin-framework validators.
    It delegates directly to ``SecurityValidator.validate_url``.

    Args:
        value: The URL string to validate.
        field_name: Descriptive name for error messages.

    Returns:
        The validated URL string.
    """
    validated_url = SecurityValidator.validate_url(value, field_name)
    return validated_url