Coverage for mcpgateway / utils / trace_redaction.py: 92%
153 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Trace payload redaction and bounded serialization helpers."""
4# Standard
5import json
6import re
7from typing import Any
9# First-Party
10from mcpgateway.config import get_settings
11from mcpgateway.utils.url_auth import sanitize_exception_message, sanitize_url_for_logging
# Comma-separated field names redacted when ``otel_redact_fields`` is unset or empty.
_DEFAULT_REDACT_FIELDS = ",".join(
    (
        "password",
        "secret",
        "token",
        "api_key",
        "authorization",
        "credential",
        "auth_value",
        "access_token",
        "refresh_token",
        "auth_token",
        "client_secret",
        "cookie",
        "set-cookie",
        "private_key",
        "session_id",
        "sessionid",
    )
)
# Fallback cap on a serialized trace payload (32768).
_DEFAULT_MAX_PAYLOAD_SIZE = 32 * 1024

# Module-level state; the values below are placeholders until ``_load_config`` runs.
_CONFIG_LOADED = False
_REDACT_FIELDS: set[str] = set()  # normalized sensitive field names
_MAX_PAYLOAD_SIZE = _DEFAULT_MAX_PAYLOAD_SIZE
_INPUT_CAPTURE_SPANS: set[str] = set()  # span names allowed to capture input
_OUTPUT_CAPTURE_SPANS: set[str] = set()  # span names allowed to capture output
_TEXT_REDACT_PATTERNS: list[tuple[re.Pattern[str], re.Pattern[str]]] = []  # (quoted, bare) regex pairs
def _normalize_field_name(value: str) -> str:
    """Reduce a field name to a canonical form for loose comparisons.

    The name is lowercased and every character outside ``a-z`` / ``0-9`` is
    dropped, so ``"API_Key"``, ``"api-key"`` and ``"apikey"`` all compare equal.

    Args:
        value: Raw field name.

    Returns:
        The lowercase, alphanumeric-only form of ``value``.
    """
    lowered = value.lower()
    return re.sub(r"[^a-z0-9]", "", lowered)
def _coerce_int(value: str, default: int) -> int:
    """Parse an integer setting, enforcing a floor of 256.

    Args:
        value: Raw setting value to parse.
        default: Value returned unchanged when ``value`` cannot be parsed.

    Returns:
        The parsed integer raised to at least 256, or ``default`` when parsing fails.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        return default
    # The floor applies only to successfully parsed values, never to ``default``.
    return parsed if parsed > 256 else 256
def _load_config() -> None:
    """Populate the module-level redaction and capture state from ``get_settings()``.

    Reads the redact-field list (falling back to ``_DEFAULT_REDACT_FIELDS`` when the
    setting is empty), the maximum payload size, and the input/output capture span
    allowlists, then marks the configuration as loaded.
    """
    global _CONFIG_LOADED, _INPUT_CAPTURE_SPANS, _MAX_PAYLOAD_SIZE, _OUTPUT_CAPTURE_SPANS, _REDACT_FIELDS, _TEXT_REDACT_PATTERNS  # pylint: disable=global-statement

    def _csv_items(raw: str) -> list[str]:
        """Split a comma-separated string into stripped, non-empty items."""
        return [piece.strip() for piece in raw.split(",") if piece.strip()]

    settings = get_settings()
    configured_names = _csv_items(settings.otel_redact_fields or _DEFAULT_REDACT_FIELDS)

    # Normalized names drive key matching; the raw names drive free-text regexes.
    _REDACT_FIELDS = {_normalize_field_name(name) for name in configured_names}
    _MAX_PAYLOAD_SIZE = _coerce_int(str(settings.otel_max_trace_payload_size), _DEFAULT_MAX_PAYLOAD_SIZE)
    _INPUT_CAPTURE_SPANS = set(_csv_items(settings.otel_capture_input_spans))
    _OUTPUT_CAPTURE_SPANS = set(_csv_items(settings.otel_capture_output_spans))
    _TEXT_REDACT_PATTERNS = [_build_text_redaction_patterns(name) for name in configured_names]
    _CONFIG_LOADED = True
def reload_trace_redaction_config() -> None:
    """Discard cached settings and rebuild the trace redaction configuration."""
    global _CONFIG_LOADED  # pylint: disable=global-statement

    # Clear the settings cache first so the reload does not reuse a cached result.
    get_settings.cache_clear()

    # Mark the state as stale before rebuilding it.
    _CONFIG_LOADED = False
    _load_config()
def _ensure_loaded() -> None:
    """Load the configuration if it has not been loaded yet; otherwise do nothing."""
    if _CONFIG_LOADED:
        return
    _load_config()
def redact_sensitive_fields(data: Any) -> Any:
    """Return a copy of ``data`` with sensitive values masked.

    Objects exposing ``model_dump`` are converted first. Dicts have the values of
    configured sensitive keys replaced with ``"***"`` and every other value
    sanitized with its key as context; lists and tuples are sanitized item by
    item; strings are scrubbed as free text. Anything else is returned unchanged.

    Args:
        data: Arbitrary payload to redact.

    Returns:
        The redacted payload, keeping dict/list/tuple structure.
    """
    _ensure_loaded()
    payload = _prepare_for_json(data)

    if isinstance(payload, str):
        return sanitize_trace_text(payload)

    if isinstance(payload, tuple):
        return tuple(_sanitize_trace_value("item", element) for element in payload)

    if isinstance(payload, list):
        return [_sanitize_trace_value("item", element) for element in payload]

    if not isinstance(payload, dict):
        return payload

    # Keys are matched in normalized form; original keys are kept in the output.
    return {
        key: "***" if _normalize_field_name(str(key)) in _REDACT_FIELDS else _sanitize_trace_value(str(key), value)
        for key, value in payload.items()
    }
def _field_looks_like_url(field_name: str) -> bool:
    """Tell whether a field name suggests its value is a URL, URI, or endpoint.

    Args:
        field_name: Candidate field name (normalized internally before the check).

    Returns:
        ``True`` when the normalized name ends with ``url``, ``uri`` or ``endpoint``.
    """
    return _normalize_field_name(field_name).endswith(("url", "uri", "endpoint"))
def _sanitize_trace_value(field_name: str, value: Any) -> Any:
    """Sanitize one value, using the name of the field it came from as context.

    Args:
        field_name: Name of the field holding ``value``; decides URL handling for strings.
        value: Value to sanitize.

    Returns:
        The sanitized value. Dicts are redacted recursively, lists and tuples are
        sanitized element-wise under the same field name, strings are scrubbed as a
        URL or as free text, and any other value is returned as prepared.
    """
    candidate = _prepare_for_json(value)

    if isinstance(candidate, str):
        # URL-like fields get URL sanitization; everything else is free text.
        scrub = sanitize_url_for_logging if _field_looks_like_url(field_name) else sanitize_trace_text
        return scrub(candidate)

    if isinstance(candidate, dict):
        return redact_sensitive_fields(candidate)

    if isinstance(candidate, (list, tuple)):
        cleaned = (_sanitize_trace_value(field_name, element) for element in candidate)
        return list(cleaned) if isinstance(candidate, list) else tuple(cleaned)

    return candidate
def _field_name_text_pattern(field_name: str) -> str:
    """Turn a configured field name into a separator-tolerant regex snippet.

    The name is split on runs of non-alphanumeric characters and the escaped
    pieces are re-joined with ``[\\W_]*``, so ``api_key`` also matches
    ``api-key``, ``api key`` and ``apikey``.

    Args:
        field_name: Configured field name, possibly containing separators.

    Returns:
        Regex snippet for the name, or the escaped name itself when it contains
        no alphanumeric characters.
    """
    tokens = re.split(r"[^A-Za-z0-9]+", field_name)
    escaped = [re.escape(token) for token in tokens if token]
    if escaped:
        return r"[\W_]*".join(escaped)
    return re.escape(field_name)
def _build_text_redaction_patterns(field_name: str) -> tuple[re.Pattern[str], re.Pattern[str]]:
    """Build regexes that redact free-text ``key=value`` and ``key:"value"`` secrets.

    Both patterns are case-insensitive. The quoted pattern exposes three groups
    (prefix including the opening quote, the secret value, the closing quote) and
    the bare pattern exposes two (prefix, the secret value), so callers can
    substitute ``\\1***\\3`` and ``\\1***`` respectively.

    Args:
        field_name: Configured sensitive field name.

    Returns:
        Tuple of quoted-value and bare-value regex patterns.
    """
    key_pattern = _field_name_text_pattern(field_name)
    prefix = rf"\b{key_pattern}\b\s*(?:=|:)\s*"

    # Value alternatives, tried in order:
    #   1. opened with " -> everything up to the next "
    #   2. opened with ' -> everything up to the next '
    #   3. fallback      -> everything up to the next quote of either kind
    # Preferring the matching closing quote keeps a value such as "it's secret"
    # from being cut at the apostrophe, which would leave the tail unredacted.
    # The fallback keeps mismatched or unterminated quoting redacted as before.
    quoted_value = r"(?:(?<=\")[^\"]*(?=\")|(?<=')[^']*(?=')|[^'\"]*)"
    quoted_pattern = re.compile(rf"({prefix}['\"])({quoted_value})(['\"])", re.IGNORECASE)

    # Bare values stop at whitespace and at , ; & (the latter for URL query strings).
    # The lookaheads skip quoted values and values that are already masked.
    bare_pattern = re.compile(rf"({prefix})(?!['\"])(?!REDACTED\b)(?!\*\*\*)([^\s,;&]+)", re.IGNORECASE)
    return quoted_pattern, bare_pattern
def sanitize_trace_text(text: str) -> str:
    """Scrub secrets from free-form text such as exception messages.

    The text is first passed through ``sanitize_exception_message``. Then
    standalone ``Bearer``/``Basic`` credentials are masked, followed by the
    ``key=value`` / ``key: value`` patterns built from the configured redact
    fields. Finally, runs of adjacent masks are collapsed into one.

    Args:
        text: Raw free-text value.

    Returns:
        The sanitized text.
    """
    _ensure_loaded()

    cleaned = re.sub(
        r"(?i)\b(Bearer|Basic)\s+[A-Za-z0-9._~+/=-]+(?=$|[\s,;\x27\x22])",
        r"\1 ***",
        sanitize_exception_message(text),
    )

    # Quoted values are handled before bare ones for every configured field.
    for quoted_re, bare_re in _TEXT_REDACT_PATTERNS:
        cleaned = bare_re.sub(r"\1***", quoted_re.sub(r"\1***\3", cleaned))

    # Collapse whitespace-separated runs such as "*** ***" into a single mask.
    return re.sub(r"\*\*\*(?:\s+\*\*\*)+", "***", cleaned)
def sanitize_trace_attribute_value(attribute_name: str, value: Any) -> Any:
    """Sanitize a single span attribute value.

    Args:
        attribute_name: Span attribute key.
        value: Attribute value, scalar or structured.

    Returns:
        ``"***"`` when the normalized attribute name is a configured sensitive
        field; otherwise the value sanitized with the attribute name as context.
    """
    _ensure_loaded()

    if _normalize_field_name(attribute_name) in _REDACT_FIELDS:
        return "***"
    return _sanitize_trace_value(attribute_name, value)
def is_input_capture_enabled(span_name: str) -> bool:
    """Check the input-capture allowlist for a span name.

    Args:
        span_name: Span name to look up (exact match).

    Returns:
        ``True`` when ``span_name`` is in the configured input-capture set.
    """
    _ensure_loaded()
    enabled = span_name in _INPUT_CAPTURE_SPANS
    return enabled
def is_output_capture_enabled(span_name: str) -> bool:
    """Check the output-capture allowlist for a span name.

    Args:
        span_name: Span name to look up (exact match).

    Returns:
        ``True`` when ``span_name`` is in the configured output-capture set.
    """
    _ensure_loaded()
    enabled = span_name in _OUTPUT_CAPTURE_SPANS
    return enabled
def _prepare_for_json(value: Any) -> Any:
    """Dump objects that expose a callable ``model_dump`` into JSON-ready data.

    Args:
        value: Arbitrary object.

    Returns:
        ``value.model_dump(mode="json", by_alias=True)`` when ``value`` has a
        callable ``model_dump`` attribute; otherwise ``value`` itself.
    """
    dumper = getattr(value, "model_dump", None)
    if not callable(dumper):
        return value
    return dumper(mode="json", by_alias=True)
def _iterencode_preview(value: Any, max_size: int) -> tuple[str, bool, int]:
    """Encode ``value`` as compact JSON chunk by chunk, keeping a bounded prefix.

    The whole value is still encoded so the full length can be reported, but
    only the first ``max_size`` characters are retained. Objects the encoder
    cannot handle are stringified with ``str``.

    Args:
        value: Value to encode.
        max_size: Maximum number of characters kept in the preview.

    Returns:
        Tuple of preview text, truncation flag, and full serialized length.
    """
    encoder = json.JSONEncoder(ensure_ascii=False, default=str, separators=(",", ":"))
    kept: list[str] = []
    room = max_size  # characters the preview can still take
    full_length = 0
    overflowed = False

    for piece in encoder.iterencode(value):
        piece_length = len(piece)
        full_length += piece_length
        if room > 0:
            kept.append(piece[:room])
            room -= min(piece_length, room)
        if full_length > max_size:
            overflowed = True

    return "".join(kept), overflowed, full_length
def _bounded_truncation_wrapper(preview: str, total_size: int, max_size: int) -> str:
    """Wrap a truncated preview in a JSON envelope that fits within the size budget.

    The envelope is ``{"_truncated":true,"_original_size":N,"_preview":"..."}``.
    The preview is shortened until the envelope fits. If it cannot fit even
    with an empty preview, the minimal ``{"_truncated":true}`` marker is
    returned. If that is still too long, the marker is cut to the budget; in
    this degenerate case the result is not valid JSON.

    Args:
        preview: Truncated serialized preview content.
        total_size: Size of the original full serialized payload.
        max_size: Maximum number of characters allowed for the wrapped payload.
            Negative values are treated as zero.

    Returns:
        String of at most ``max(0, max_size)`` characters describing the truncation.
    """
    # Clamp the budget: a negative slice bound would otherwise count from the
    # end of the marker and return text longer than any budget.
    budget = max(0, max_size)

    payload: dict[str, Any] = {"_truncated": True, "_original_size": total_size, "_preview": preview}
    wrapped = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))

    # JSON escaping can lengthen the preview, so one trim may not be enough.
    # Each pass removes the current overflow plus one character, so the loop
    # always makes progress.
    while len(wrapped) > budget and payload["_preview"]:
        overflow = len(wrapped) - budget
        payload["_preview"] = payload["_preview"][: max(0, len(payload["_preview"]) - overflow - 1)]
        wrapped = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))

    if len(wrapped) <= budget:
        return wrapped

    minimal = json.dumps({"_truncated": True}, ensure_ascii=False, separators=(",", ":"))
    if len(minimal) <= budget:
        return minimal

    return minimal[:budget]
def safe_serialize(obj: Any, max_size: int = 0) -> str:
    """Serialize a trace payload to bounded JSON.

    Args:
        obj: Arbitrary payload to serialize.
        max_size: Optional maximum serialized size. When zero (or any other
            non-positive value), the configured default is used.

    Returns:
        JSON string representation of the payload, wrapped in a truncation
        envelope when it exceeds the limit, or ``{"_error":"serialization_failed"}``
        when serialization raises.
    """
    _ensure_loaded()

    # Only a positive limit overrides the configured one. A negative value
    # would otherwise be used as the budget and produce a malformed fragment.
    effective_max_size = max_size if max_size and max_size > 0 else _MAX_PAYLOAD_SIZE

    try:
        prepared = _prepare_for_json(obj)
        # Containers and scalars take the same path through the encoder.
        preview, truncated, total_size = _iterencode_preview(prepared, effective_max_size)
        if not truncated:
            return preview
        return _bounded_truncation_wrapper(preview, total_size, effective_max_size)
    except Exception:
        # Deliberate best-effort boundary: a serialization failure is reported
        # in the returned JSON rather than raised to the caller.
        return json.dumps({"_error": "serialization_failed"}, ensure_ascii=False, separators=(",", ":"))
def serialize_trace_payload(obj: Any, max_size: int = 0) -> str:
    """Redact a trace payload, then serialize it to bounded JSON.

    Args:
        obj: Arbitrary payload to sanitize and serialize.
        max_size: Optional maximum serialized size, forwarded to ``safe_serialize``.

    Returns:
        JSON string representation of the sanitized payload.
    """
    redacted = redact_sensitive_fields(obj)
    return safe_serialize(redacted, max_size=max_size)