Coverage for mcpgateway / utils / trace_redaction.py: 92%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Trace payload redaction and bounded serialization helpers.""" 

3 

4# Standard 

5import json 

6import re 

7from typing import Any 

8 

9# First-Party 

10from mcpgateway.config import get_settings 

11from mcpgateway.utils.url_auth import sanitize_exception_message, sanitize_url_for_logging 

12 

13_DEFAULT_REDACT_FIELDS = ",".join( 

14 [ 

15 "password", 

16 "secret", 

17 "token", 

18 "api_key", 

19 "authorization", 

20 "credential", 

21 "auth_value", 

22 "access_token", 

23 "refresh_token", 

24 "auth_token", 

25 "client_secret", 

26 "cookie", 

27 "set-cookie", 

28 "private_key", 

29 "session_id", 

30 "sessionid", 

31 ] 

32) 

33_DEFAULT_MAX_PAYLOAD_SIZE = 32768 

34 

35_CONFIG_LOADED = False 

36_REDACT_FIELDS: set[str] = set() 

37_MAX_PAYLOAD_SIZE = _DEFAULT_MAX_PAYLOAD_SIZE 

38_INPUT_CAPTURE_SPANS: set[str] = set() 

39_OUTPUT_CAPTURE_SPANS: set[str] = set() 

40_TEXT_REDACT_PATTERNS: list[tuple[re.Pattern[str], re.Pattern[str]]] = [] 

41 

42 

43def _normalize_field_name(value: str) -> str: 

44 """Normalize a field name for loose matching across key styles. 

45 

46 Args: 

47 value: Raw field name to normalize. 

48 

49 Returns: 

50 Lowercase alphanumeric field name used for loose redaction matching. 

51 """ 

52 return re.sub(r"[^a-z0-9]", "", value.lower()) 

53 

54 

55def _coerce_int(value: str, default: int) -> int: 

56 """Coerce an integer env var with a sane minimum. 

57 

58 Args: 

59 value: Raw environment variable value. 

60 default: Fallback value to use when parsing fails. 

61 

62 Returns: 

63 Parsed integer constrained to the configured minimum, or ``default`` on failure. 

64 """ 

65 try: 

66 return max(256, int(value)) 

67 except (TypeError, ValueError): 

68 return default 

69 

70 

def _load_config() -> None:
    """Populate the module-level redaction and capture state from ``get_settings()``.

    Reads ``otel_redact_fields``, ``otel_max_trace_payload_size``,
    ``otel_capture_input_spans`` and ``otel_capture_output_spans``. An empty
    ``otel_redact_fields`` falls back to ``_DEFAULT_REDACT_FIELDS``; an unset
    (``None``) or empty span list yields an empty allowlist instead of raising.
    """
    global _CONFIG_LOADED, _INPUT_CAPTURE_SPANS, _MAX_PAYLOAD_SIZE, _OUTPUT_CAPTURE_SPANS, _REDACT_FIELDS, _TEXT_REDACT_PATTERNS  # pylint: disable=global-statement

    def _split_csv(raw: Any) -> list[str]:
        # Shared CSV parsing: trim entries, drop blanks, tolerate None.
        return [item.strip() for item in (raw or "").split(",") if item.strip()]

    settings = get_settings()
    raw_fields = _split_csv(settings.otel_redact_fields or _DEFAULT_REDACT_FIELDS)

    # Structured keys are matched on their normalized form; free text is
    # matched with one regex pair built from each raw (un-normalized) name.
    _REDACT_FIELDS = {_normalize_field_name(field) for field in raw_fields}
    _MAX_PAYLOAD_SIZE = _coerce_int(str(settings.otel_max_trace_payload_size), _DEFAULT_MAX_PAYLOAD_SIZE)
    _INPUT_CAPTURE_SPANS = set(_split_csv(settings.otel_capture_input_spans))
    _OUTPUT_CAPTURE_SPANS = set(_split_csv(settings.otel_capture_output_spans))
    _TEXT_REDACT_PATTERNS = [_build_text_redaction_patterns(field) for field in raw_fields]

    # Set last: if anything above raises, the config stays flagged as unloaded.
    _CONFIG_LOADED = True

84 

85 

def reload_trace_redaction_config() -> None:
    """Drop the cached settings object and rebuild the redaction configuration."""
    global _CONFIG_LOADED  # pylint: disable=global-statement

    # Force ``get_settings()`` to construct a fresh settings object.
    get_settings.cache_clear()

    # Flag the current state as stale before rebuilding it.
    _CONFIG_LOADED = False
    _load_config()

92 

93 

def _ensure_loaded() -> None:
    """Run ``_load_config`` unless the configuration has already been loaded."""
    if _CONFIG_LOADED:
        return
    _load_config()

98 

99 

def redact_sensitive_fields(data: Any) -> Any:
    """Return a redacted copy of a structured or scalar trace payload.

    Objects exposing ``model_dump`` are converted first. Mapping values whose
    normalized key is a configured sensitive field become ``"***"``; all other
    values are sanitized recursively using their key as context. List and
    tuple items are sanitized under the generic field name ``"item"``, and
    plain strings are passed through ``sanitize_trace_text``.

    Args:
        data: Arbitrary payload to redact.

    Returns:
        A new dict, list or tuple mirroring the input, a sanitized string, or
        the (prepared) value itself for any other type.
    """
    _ensure_loaded()
    payload = _prepare_for_json(data)

    if isinstance(payload, dict):
        return {
            key: "***" if _normalize_field_name(str(key)) in _REDACT_FIELDS else _sanitize_trace_value(str(key), value)
            for key, value in payload.items()
        }

    if isinstance(payload, list):
        return [_sanitize_trace_value("item", element) for element in payload]

    if isinstance(payload, tuple):
        return tuple(_sanitize_trace_value("item", element) for element in payload)

    if isinstance(payload, str):
        return sanitize_trace_text(payload)

    return payload

132 

133 

def _field_looks_like_url(field_name: str) -> bool:
    """Tell whether a field name suggests its value is a URL, URI or endpoint.

    Args:
        field_name: Candidate field name; it is normalized before the check.

    Returns:
        ``True`` when the normalized name ends with ``url``, ``uri`` or ``endpoint``.
    """
    return _normalize_field_name(field_name).endswith(("url", "uri", "endpoint"))

145 

146 

def _sanitize_trace_value(field_name: str, value: Any) -> Any:
    """Sanitize one value, using the name of the field it came from as context.

    Args:
        field_name: Field name associated with the value; list and tuple
            items inherit the name of their container.
        value: Value to sanitize.

    Returns:
        Dicts are redacted via ``redact_sensitive_fields``; lists and tuples
        are rebuilt item by item; strings are URL-sanitized when the field
        name looks like a URL and text-sanitized otherwise; anything else is
        returned as prepared.
    """
    prepared = _prepare_for_json(value)

    if isinstance(prepared, dict):
        return redact_sensitive_fields(prepared)

    if isinstance(prepared, (list, tuple)):
        cleaned_items = (_sanitize_trace_value(field_name, element) for element in prepared)
        return list(cleaned_items) if isinstance(prepared, list) else tuple(cleaned_items)

    if isinstance(prepared, str):
        scrub = sanitize_url_for_logging if _field_looks_like_url(field_name) else sanitize_trace_text
        return scrub(prepared)

    return prepared

174 

175 

176def _field_name_text_pattern(field_name: str) -> str: 

177 """Build a permissive regex for matching a field name in free text. 

178 

179 Args: 

180 field_name: Configured field name, potentially containing separators. 

181 

182 Returns: 

183 Regex snippet that tolerates separator variations such as ``_`` or ``-``. 

184 """ 

185 parts = [re.escape(part) for part in re.split(r"[^A-Za-z0-9]+", field_name) if part] 

186 if not parts: 

187 return re.escape(field_name) 

188 return r"[\W_]*".join(parts) 

189 

190 

191def _build_text_redaction_patterns(field_name: str) -> tuple[re.Pattern[str], re.Pattern[str]]: 

192 """Build regexes that redact free-text ``key=value`` and ``key:"value"`` secrets. 

193 

194 Args: 

195 field_name: Configured sensitive field name. 

196 

197 Returns: 

198 Tuple of quoted-value and bare-value regex patterns. 

199 """ 

200 key_pattern = _field_name_text_pattern(field_name) 

201 quoted_pattern = re.compile(rf"(?i)(\b{key_pattern}\b\s*(?:=|:)\s*['\"])([^'\"]*)(['\"])", re.IGNORECASE) 

202 # Include & as a terminator for URL query parameters 

203 bare_pattern = re.compile(rf"(?i)(\b{key_pattern}\b\s*(?:=|:)\s*)(?!['\"])(?!REDACTED\b)(?!\*\*\*)([^\s,;&]+)", re.IGNORECASE) 

204 return quoted_pattern, bare_pattern 

205 

206 

def sanitize_trace_text(text: str) -> str:
    """Scrub secrets from free-text trace content such as exception messages.

    The text is first passed through ``sanitize_exception_message``, then
    ``Bearer``/``Basic`` credentials are masked, then every configured
    ``key=value`` / ``key: "value"`` pattern is applied.

    Args:
        text: Raw free-text value.

    Returns:
        The sanitized text.
    """
    _ensure_loaded()

    cleaned = re.sub(
        r"(?i)\b(Bearer|Basic)\s+[A-Za-z0-9._~+/=-]+(?=$|[\s,;\x27\x22])",
        r"\1 ***",
        sanitize_exception_message(text),
    )

    # Quoted values are handled before bare ones for each configured field.
    for quoted_pattern, bare_pattern in _TEXT_REDACT_PATTERNS:
        cleaned = bare_pattern.sub(r"\1***", quoted_pattern.sub(r"\1***\3", cleaned))

    # Merge whitespace-separated runs of markers left by successive passes.
    return re.sub(r"\*\*\*(?:\s+\*\*\*)+", "***", cleaned)

230 

231 

def sanitize_trace_attribute_value(attribute_name: str, value: Any) -> Any:
    """Sanitize a span attribute value according to its attribute name.

    Args:
        attribute_name: Span attribute key.
        value: Attribute value, scalar or structured.

    Returns:
        ``"***"`` when the normalized attribute name is a configured sensitive
        field; otherwise the value sanitized by ``_sanitize_trace_value``.
    """
    _ensure_loaded()

    if _normalize_field_name(attribute_name) in _REDACT_FIELDS:
        return "***"
    return _sanitize_trace_value(attribute_name, value)

249 

250 

def is_input_capture_enabled(span_name: str) -> bool:
    """Check the input-capture allowlist for a span name.

    Args:
        span_name: Span name to look up; the match is exact.

    Returns:
        ``True`` when ``span_name`` is in the configured input-capture set.
    """
    _ensure_loaded()
    allowed = _INPUT_CAPTURE_SPANS
    return span_name in allowed

262 

263 

def is_output_capture_enabled(span_name: str) -> bool:
    """Check the output-capture allowlist for a span name.

    Args:
        span_name: Span name to look up; the match is exact.

    Returns:
        ``True`` when ``span_name`` is in the configured output-capture set.
    """
    _ensure_loaded()
    allowed = _OUTPUT_CAPTURE_SPANS
    return span_name in allowed

275 

276 

277def _prepare_for_json(value: Any) -> Any: 

278 """Convert Pydantic-like objects to JSON-ready data when possible. 

279 

280 Args: 

281 value: Arbitrary object that may support ``model_dump``. 

282 

283 Returns: 

284 JSON-ready representation of ``value`` when conversion is available, otherwise the original object. 

285 """ 

286 if hasattr(value, "model_dump") and callable(value.model_dump): 

287 return value.model_dump(mode="json", by_alias=True) 

288 return value 

289 

290 

291def _iterencode_preview(value: Any, max_size: int) -> tuple[str, bool, int]: 

292 """Serialize JSON incrementally while keeping only a bounded preview. 

293 

294 Args: 

295 value: JSON-serializable value to encode. 

296 max_size: Maximum preview size to retain while encoding. 

297 

298 Returns: 

299 Tuple of preview text, truncation flag, and full serialized size. 

300 """ 

301 encoder = json.JSONEncoder(ensure_ascii=False, default=str, separators=(",", ":")) 

302 preview_chunks: list[str] = [] 

303 preview_size = 0 

304 total_size = 0 

305 truncated = False 

306 

307 for chunk in encoder.iterencode(value): 

308 chunk_length = len(chunk) 

309 remaining = max_size - preview_size 

310 total_size += chunk_length 

311 if preview_size < max_size: 

312 preview_chunks.append(chunk[:remaining]) 

313 preview_size += min(chunk_length, remaining) 

314 if total_size > max_size: 

315 truncated = True 

316 

317 return "".join(preview_chunks), truncated, total_size 

318 

319 

320def _bounded_truncation_wrapper(preview: str, total_size: int, max_size: int) -> str: 

321 """Wrap a truncated preview in valid JSON that fits within the size budget. 

322 

323 Args: 

324 preview: Truncated serialized preview content. 

325 total_size: Size of the original full serialized payload. 

326 max_size: Maximum number of characters allowed for the wrapped payload. 

327 

328 Returns: 

329 Valid JSON string describing the truncation while fitting within ``max_size``. 

330 """ 

331 payload = {"_truncated": True, "_original_size": total_size, "_preview": preview} 

332 wrapped = json.dumps(payload, ensure_ascii=False, separators=(",", ":")) 

333 

334 while len(wrapped) > max_size and payload["_preview"]: 

335 overflow = len(wrapped) - max_size 

336 payload["_preview"] = payload["_preview"][: max(0, len(payload["_preview"]) - overflow - 1)] 

337 wrapped = json.dumps(payload, ensure_ascii=False, separators=(",", ":")) 

338 

339 if len(wrapped) <= max_size: 

340 return wrapped 

341 

342 minimal = json.dumps({"_truncated": True}, ensure_ascii=False, separators=(",", ":")) 

343 if len(minimal) <= max_size: 

344 return minimal 

345 

346 return minimal[:max_size] 

347 

348 

def safe_serialize(obj: Any, max_size: int = 0) -> str:
    """Serialize a trace payload to bounded JSON.

    Args:
        obj: Arbitrary payload to serialize.
        max_size: Optional maximum serialized size. When zero, negative or
            ``None``, the configured default is used.

    Returns:
        Compact JSON for the payload when it fits the budget, a truncation
        envelope when it does not, or ``{"_error":"serialization_failed"}``
        when encoding raises.
    """
    _ensure_loaded()

    # A negative budget is never meaningful: it would be used as a slice bound
    # downstream and produce broken output, so treat it like "not specified".
    effective_max_size = max_size if max_size and max_size > 0 else _MAX_PAYLOAD_SIZE

    try:
        # Containers and scalars go through the same bounded encoder.
        preview, truncated, total_size = _iterencode_preview(_prepare_for_json(obj), effective_max_size)
        if not truncated:
            return preview
        return _bounded_truncation_wrapper(preview, total_size, effective_max_size)
    except Exception:
        # Best effort by design: a payload that cannot be encoded must not
        # propagate an error into the caller's tracing path.
        return json.dumps({"_error": "serialization_failed"}, ensure_ascii=False, separators=(",", ":"))

377 

378 

def serialize_trace_payload(obj: Any, max_size: int = 0) -> str:
    """Redact a trace payload, then serialize it to bounded JSON.

    Args:
        obj: Arbitrary payload to sanitize and serialize.
        max_size: Optional maximum serialized size, forwarded to ``safe_serialize``.

    Returns:
        JSON string produced by ``safe_serialize`` for the redacted payload.
    """
    redacted = redact_sensitive_fields(obj)
    return safe_serialize(redacted, max_size=max_size)