Coverage for mcpgateway / translate_header_utils.py: 100%
76 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Header processing utilities for dynamic environment injection in translate module.
4Location: ./mcpgateway/translate_header_utils.py
5Copyright 2025
6SPDX-License-Identifier: Apache-2.0
7Authors: Manav Gupta
9Header processing utilities for dynamic environment variable injection in mcpgateway.translate.
10"""
12# Standard
13import logging
14import re
15from typing import Dict, List
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Security constants
# Accepted HTTP header names: one ASCII letter followed by any number of ASCII
# letters, digits, or hyphens. The pattern ends with \Z rather than $ because
# $ also matches just before a trailing newline, which would let a name such
# as "Authorization\n" pass a plain .match() call.
ALLOWED_HEADERS_REGEX = re.compile(r"^[A-Za-z][A-Za-z0-9\-]*\Z")
# Default cap (in characters) applied to header values by sanitize_header_value.
MAX_HEADER_VALUE_LENGTH = 4096
# Longest environment variable name accepted by validate_header_mapping.
MAX_ENV_VAR_NAME_LENGTH = 64
class HeaderMappingError(Exception):
    """Signal that a header-to-environment-variable mapping is not valid."""
def validate_header_mapping(header_name: str, env_var_name: str) -> None:
    """Validate a header name and the environment variable name it maps to.

    The header name must match ``ALLOWED_HEADERS_REGEX`` and the environment
    variable name must start with a letter or underscore, contain only
    letters, digits, and underscores, and be at most
    ``MAX_ENV_VAR_NAME_LENGTH`` characters long. Both names are checked with
    ``fullmatch`` so that the entire string has to conform; a name ending in
    a newline is rejected.

    Args:
        header_name: HTTP header name
        env_var_name: Environment variable name

    Raises:
        HeaderMappingError: If validation fails

    Examples:
        >>> # Valid mappings
        >>> validate_header_mapping("Authorization", "AUTH_TOKEN")
        >>> validate_header_mapping("X-Custom-Header", "CUSTOM_VAR")
        >>>
        >>> # Invalid header name
        >>> try:
        ...     validate_header_mapping("Invalid Header!", "VAR")
        ... except HeaderMappingError as e:
        ...     "Invalid header name" in str(e)
        True
        >>>
        >>> # Invalid env var name
        >>> try:
        ...     validate_header_mapping("Header", "123_VAR")
        ... except HeaderMappingError as e:
        ...     "Invalid environment variable name" in str(e)
        True
        >>>
        >>> # A trailing newline is not silently accepted
        >>> try:
        ...     validate_header_mapping("Header", "VAR\\n")
        ... except HeaderMappingError as e:
        ...     "Invalid environment variable name" in str(e)
        True
    """
    # fullmatch instead of match: with match(), a "$" anchor also succeeds just
    # before a trailing "\n", which would let "Authorization\n" through.
    if not ALLOWED_HEADERS_REGEX.fullmatch(header_name):
        raise HeaderMappingError(f"Invalid header name '{header_name}' - must contain only alphanumeric characters and hyphens")

    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", env_var_name):
        raise HeaderMappingError(f"Invalid environment variable name '{env_var_name}' - must start with letter/underscore and contain only alphanumeric characters and underscores")

    if len(env_var_name) > MAX_ENV_VAR_NAME_LENGTH:
        raise HeaderMappingError(f"Environment variable name too long: {env_var_name}")
def sanitize_header_value(value: str, max_length: int = MAX_HEADER_VALUE_LENGTH) -> str:
    """Sanitize header value for environment variable injection.

    The value is first truncated to ``max_length`` characters (a warning is
    logged when that happens) and then every character outside printable
    ASCII (0x20-0x7E) is removed. Because truncation runs before filtering,
    the result can be shorter than ``max_length``.

    Args:
        value: Raw header value
        max_length: Maximum allowed length for the value

    Returns:
        Sanitized value safe for environment variable

    Examples:
        >>> # Normal value passes through
        >>> sanitize_header_value("Bearer token123")
        'Bearer token123'
        >>>
        >>> # Long value gets truncated
        >>> sanitize_header_value("a" * 100, max_length=10)
        'aaaaaaaaaa'
        >>>
        >>> # Non-printable characters removed
        >>> sanitize_header_value("hello\\x00world")
        'helloworld'
        >>>
        >>> # Only printable ASCII kept
        >>> sanitize_header_value("test\\x01value")
        'testvalue'
    """
    if len(value) > max_length:
        logger.warning(f"Header value truncated from {len(value)} to {max_length} characters")
        value = value[:max_length]

    # Keep only printable ASCII. This single pass already drops NUL bytes and
    # every other control character, so no separate null-byte removal is needed.
    return re.sub(r"[^\x20-\x7E]", "", value)
def parse_header_mappings(header_mappings: List[str]) -> Dict[str, str]:
    """Turn CLI ``HEADER=ENV_VAR`` strings into a header -> env var dictionary.

    Each entry is split on its first ``=``, both sides are stripped of
    surrounding whitespace, and the pair is checked with
    ``validate_header_mapping``. Header names that repeat, either exactly or
    when compared case-insensitively, are rejected.

    Args:
        header_mappings: List of "HEADER=ENV_VAR" strings

    Returns:
        Dictionary mapping header names to environment variable names

    Raises:
        HeaderMappingError: If any mapping is invalid, including case-insensitive duplicates

    Examples:
        >>> # Parse valid mappings
        >>> parse_header_mappings(["Authorization=AUTH_TOKEN"])
        {'Authorization': 'AUTH_TOKEN'}
        >>>
        >>> # Multiple mappings
        >>> result = parse_header_mappings(["X-Api-Key=API_KEY", "X-User-Id=USER_ID"])
        >>> result == {'X-Api-Key': 'API_KEY', 'X-User-Id': 'USER_ID'}
        True
        >>>
        >>> # Invalid format (no equals)
        >>> try:
        ...     parse_header_mappings(["InvalidMapping"])
        ... except HeaderMappingError as e:
        ...     "Invalid mapping format" in str(e)
        True
        >>>
        >>> # Empty list returns empty dict
        >>> parse_header_mappings([])
        {}
        >>>
        >>> # Case-insensitive duplicates are rejected
        >>> try:
        ...     parse_header_mappings(["Authorization=AUTH1", "authorization=AUTH2"])
        ... except HeaderMappingError as e:
        ...     "Case-insensitive duplicate" in str(e)
        True
    """
    parsed: Dict[str, str] = {}
    # lowercase header name -> spelling under which it was first registered
    first_spelling: Dict[str, str] = {}

    for entry in header_mappings:
        left, separator, right = entry.partition("=")
        if not separator:
            raise HeaderMappingError(f"Invalid mapping format '{entry}' - expected HEADER=ENV_VAR")

        header_name = left.strip()
        env_var_name = right.strip()
        if not (header_name and env_var_name):
            raise HeaderMappingError(f"Empty header name or environment variable name in '{entry}'")

        validate_header_mapping(header_name, env_var_name)

        # Same spelling seen before
        if header_name in parsed:
            raise HeaderMappingError(f"Duplicate header mapping for '{header_name}'")

        # Same header under a different capitalisation (e.g. "Authorization" vs "authorization")
        lowered = header_name.lower()
        earlier = first_spelling.get(lowered)
        if earlier is not None:
            raise HeaderMappingError(f"Case-insensitive duplicate header mapping: '{header_name}' conflicts with '{earlier}'")

        first_spelling[lowered] = header_name
        parsed[header_name] = env_var_name

    return parsed
def normalize_headers(headers: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of the request headers keyed by lowercase header name.

    When two input keys differ only by case, the value of the one iterated
    last is kept.

    Args:
        headers: HTTP request headers with original case

    Returns:
        Dictionary with lowercase keys mapping to original values

    Examples:
        >>> normalize_headers({"Authorization": "Bearer token", "X-Api-Key": "key123"})
        {'authorization': 'Bearer token', 'x-api-key': 'key123'}
        >>> normalize_headers({})
        {}
        >>> normalize_headers({"CONTENT-TYPE": "application/json"})
        {'content-type': 'application/json'}
    """
    lowered: Dict[str, str] = {}
    for name, value in headers.items():
        lowered[name.lower()] = value
    return lowered
199class NormalizedMappings:
200 """Pre-normalized header mappings for efficient lookups.
202 Stores mappings with lowercase header keys for O(1) case-insensitive lookups.
203 Intended to be created once at config load time for repeated use.
205 Examples:
206 >>> mappings = NormalizedMappings({"Authorization": "AUTH_TOKEN", "X-Api-Key": "API_KEY"})
207 >>> mappings.get_env_var("authorization")
208 'AUTH_TOKEN'
209 >>> mappings.get_env_var("AUTHORIZATION")
210 'AUTH_TOKEN'
211 >>> mappings.get_env_var("x-api-key")
212 'API_KEY'
213 >>> mappings.get_env_var("unknown") is None
214 True
215 >>> list(mappings)
216 [('authorization', 'AUTH_TOKEN'), ('x-api-key', 'API_KEY')]
217 """
219 def __init__(self, header_mappings: Dict[str, str]):
220 """Initialize with header-to-env-var mappings.
222 Args:
223 header_mappings: Mapping of header names to environment variable names
224 """
225 # Store with lowercase keys for O(1) case-insensitive lookups
226 self._mappings: Dict[str, str] = {k.lower(): v for k, v in header_mappings.items()}
228 def get_env_var(self, header_name: str) -> str | None:
229 """Get environment variable name for a header (case-insensitive).
231 Args:
232 header_name: HTTP header name (any case)
234 Returns:
235 Environment variable name or None if not mapped
236 """
237 return self._mappings.get(header_name.lower())
239 def __iter__(self):
240 """Iterate over (lowercase_header, env_var) pairs.
242 Returns:
243 Iterator of (header_name, env_var_name) tuples
244 """
245 return iter(self._mappings.items())
247 def __len__(self) -> int:
248 """Return number of mappings.
250 Returns:
251 Number of header-to-env-var mappings
252 """
253 return len(self._mappings)
255 def values(self):
256 """Return environment variable names (values of the mappings).
258 Returns:
259 View of environment variable names
261 Examples:
262 >>> mappings = NormalizedMappings({"Authorization": "AUTH", "X-Api-Key": "KEY"})
263 >>> sorted(mappings.values())
264 ['AUTH', 'KEY']
265 """
266 return self._mappings.values()
268 def __bool__(self) -> bool:
269 """Return True if there are any mappings.
271 Returns:
272 True if mappings exist, False if empty
273 """
274 return bool(self._mappings)
def extract_env_vars_from_headers(request_headers: Dict[str, str], header_mappings: Dict[str, str] | NormalizedMappings) -> Dict[str, str]:
    """Build environment variables from the request headers that have a mapping.

    Request headers are lowercased once and each mapping is then resolved
    with a single dictionary lookup, so the work is O(mappings + headers)
    rather than a nested O(mappings × headers) scan. Values are passed through
    ``sanitize_header_value``; a value that ends up empty is skipped with a
    warning, and a failure while processing one header is logged without
    stopping the remaining mappings.

    Args:
        request_headers: HTTP request headers
        header_mappings: Mapping of header names to environment variable names,
            or a pre-normalized NormalizedMappings instance

    Returns:
        Dictionary of environment variable name -> sanitized value

    Examples:
        >>> # Extract matching headers
        >>> headers = {"Authorization": "Bearer token123", "Content-Type": "application/json"}
        >>> mappings = {"Authorization": "AUTH_TOKEN"}
        >>> extract_env_vars_from_headers(headers, mappings)
        {'AUTH_TOKEN': 'Bearer token123'}
        >>>
        >>> # Case-insensitive matching
        >>> headers = {"authorization": "Bearer token"}
        >>> mappings = {"Authorization": "AUTH"}
        >>> extract_env_vars_from_headers(headers, mappings)
        {'AUTH': 'Bearer token'}
        >>>
        >>> # No matching headers
        >>> headers = {"X-Other": "value"}
        >>> mappings = {"Authorization": "AUTH"}
        >>> extract_env_vars_from_headers(headers, mappings)
        {}
        >>>
        >>> # Empty mappings
        >>> extract_env_vars_from_headers({"Header": "value"}, {})
        {}
        >>>
        >>> # Using NormalizedMappings for repeated lookups
        >>> nm = NormalizedMappings({"Authorization": "AUTH"})
        >>> extract_env_vars_from_headers({"authorization": "token"}, nm)
        {'AUTH': 'token'}
    """
    extracted = {}

    # Lowercase the incoming header names a single time - O(headers)
    headers_by_lower = normalize_headers(request_headers)

    # A plain dict is wrapped so both input forms iterate as (lowercase_header, env_var)
    lookup = NormalizedMappings(header_mappings) if isinstance(header_mappings, dict) else header_mappings

    # One dictionary lookup per mapping - O(mappings)
    for header_key, target_var in lookup:
        raw_value = headers_by_lower.get(header_key)
        if raw_value is None:
            continue  # the request did not carry this header

        try:
            cleaned = sanitize_header_value(raw_value)
            if not cleaned:
                logger.warning(f"Header {header_key} value became empty after sanitization")
                continue
            # Only non-empty values are exported
            extracted[target_var] = cleaned
            logger.debug(f"Mapped header {header_key} to {target_var}")
        except Exception as exc:
            logger.warning(f"Failed to process header {header_key}: {exc}")

    return extracted