Coverage for mcpgateway / translate_header_utils.py: 100%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Header processing utilities for dynamic environment injection in translate module. 

3 

4Location: ./mcpgateway/translate_header_utils.py 

5Copyright 2025 

6SPDX-License-Identifier: Apache-2.0 

7Authors: Manav Gupta 

8 

9Header processing utilities for dynamic environment variable injection in mcpgateway.translate. 

10""" 

11 

12# Standard 

13import logging 

14import re 

15from typing import Dict, List 

16 

logger = logging.getLogger(__name__)

# Security constants
# Header names: one ASCII letter followed by ASCII letters, digits or hyphens.
# The pattern is anchored with \Z instead of $ because $ also matches just
# before a trailing newline, which would let "X-Header\n" pass a .match() check.
ALLOWED_HEADERS_REGEX = re.compile(r"^[A-Za-z][A-Za-z0-9\-]*\Z")
# Default maximum length of a header value kept by sanitize_header_value.
MAX_HEADER_VALUE_LENGTH = 4096
# Longest environment variable name accepted by validate_header_mapping.
MAX_ENV_VAR_NAME_LENGTH = 64

23 

24 

class HeaderMappingError(Exception):
    """Signal that a header-to-environment-variable mapping is not valid."""

27 

28 

def validate_header_mapping(header_name: str, env_var_name: str) -> None:
    """Validate header name and environment variable name.

    Both names must match their pattern in full. ``fullmatch`` is used because
    ``match`` combined with a ``$`` anchor also accepts a single trailing
    newline, which must not slip through into a header or variable name.

    Args:
        header_name: HTTP header name
        env_var_name: Environment variable name

    Raises:
        HeaderMappingError: If the header name is not a letter followed by
            letters, digits or hyphens; if the environment variable name is
            not a letter/underscore followed by letters, digits or
            underscores; or if the environment variable name is longer than
            MAX_ENV_VAR_NAME_LENGTH.

    Examples:
        >>> # Valid mappings
        >>> validate_header_mapping("Authorization", "AUTH_TOKEN")
        >>> validate_header_mapping("X-Custom-Header", "CUSTOM_VAR")
        >>>
        >>> # Invalid header name
        >>> try:
        ...     validate_header_mapping("Invalid Header!", "VAR")
        ... except HeaderMappingError as e:
        ...     "Invalid header name" in str(e)
        True
        >>>
        >>> # Invalid env var name
        >>> try:
        ...     validate_header_mapping("Header", "123_VAR")
        ... except HeaderMappingError as e:
        ...     "Invalid environment variable name" in str(e)
        True
        >>>
        >>> # A trailing newline is rejected
        >>> try:
        ...     validate_header_mapping("Header", "VAR\\n")
        ... except HeaderMappingError as e:
        ...     "Invalid environment variable name" in str(e)
        True
    """
    if not ALLOWED_HEADERS_REGEX.fullmatch(header_name):
        raise HeaderMappingError(f"Invalid header name '{header_name}' - must contain only alphanumeric characters and hyphens")

    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", env_var_name):
        raise HeaderMappingError(f"Invalid environment variable name '{env_var_name}' - must start with letter/underscore and contain only alphanumeric characters and underscores")

    # Checked after the character test, so an over-long name that is also
    # malformed reports the character problem first.
    if len(env_var_name) > MAX_ENV_VAR_NAME_LENGTH:
        raise HeaderMappingError(f"Environment variable name too long: {env_var_name}")

66 

67 

def sanitize_header_value(value: str, max_length: int = MAX_HEADER_VALUE_LENGTH) -> str:
    """Sanitize header value for environment variable injection.

    The value is first truncated to ``max_length`` characters (a warning is
    logged when that happens) and then stripped of every character outside
    printable ASCII. Because stripping happens after truncation, the result
    may be shorter than ``max_length``.

    Args:
        value: Raw header value
        max_length: Maximum allowed length for the value. Negative values are
            treated as 0.

    Returns:
        Sanitized value safe for environment variable

    Examples:
        >>> # Normal value passes through
        >>> sanitize_header_value("Bearer token123")
        'Bearer token123'
        >>>
        >>> # Long value gets truncated
        >>> sanitize_header_value("a" * 100, max_length=10)
        'aaaaaaaaaa'
        >>>
        >>> # Non-printable characters removed
        >>> sanitize_header_value("hello\\x00world")
        'helloworld'
        >>>
        >>> # Only printable ASCII kept
        >>> sanitize_header_value("test\\x01value")
        'testvalue'
        >>>
        >>> # A negative limit keeps nothing instead of slicing from the end
        >>> sanitize_header_value("abc", max_length=-1)
        ''
    """
    # A negative limit would turn the slice below into "drop the last N
    # characters" rather than a truncation, so clamp it to zero.
    limit = max(max_length, 0)
    if len(value) > limit:
        logger.warning(f"Header value truncated from {len(value)} to {limit} characters")
        value = value[:limit]

    # Keep only printable ASCII (space through tilde). This already removes
    # NUL bytes, CR/LF and all other control characters, so no separate
    # null-byte pass is needed.
    return re.sub(r"[^\x20-\x7E]", "", value)

104 

105 

def parse_header_mappings(header_mappings: List[str]) -> Dict[str, str]:
    """Turn "HEADER=ENV_VAR" command-line entries into a header -> env var dict.

    Each entry is split on its first "=", both halves are stripped of
    surrounding whitespace and then checked with validate_header_mapping.
    Header names are kept with the spelling they were given.

    Args:
        header_mappings: List of "HEADER=ENV_VAR" strings

    Returns:
        Dictionary mapping header names to environment variable names

    Raises:
        HeaderMappingError: If an entry has no "=", has an empty side, fails
            validation, or repeats a header name (exactly or ignoring case)

    Examples:
        >>> # Parse valid mappings
        >>> parse_header_mappings(["Authorization=AUTH_TOKEN"])
        {'Authorization': 'AUTH_TOKEN'}
        >>>
        >>> # Multiple mappings
        >>> result = parse_header_mappings(["X-Api-Key=API_KEY", "X-User-Id=USER_ID"])
        >>> result == {'X-Api-Key': 'API_KEY', 'X-User-Id': 'USER_ID'}
        True
        >>>
        >>> # Invalid format (no equals)
        >>> try:
        ...     parse_header_mappings(["InvalidMapping"])
        ... except HeaderMappingError as e:
        ...     "Invalid mapping format" in str(e)
        True
        >>>
        >>> # Empty list returns empty dict
        >>> parse_header_mappings([])
        {}
        >>>
        >>> # Case-insensitive duplicates are rejected
        >>> try:
        ...     parse_header_mappings(["Authorization=AUTH1", "authorization=AUTH2"])
        ... except HeaderMappingError as e:
        ...     "Case-insensitive duplicate" in str(e)
        True
    """
    parsed = {}
    # Lowercased header name -> spelling that was accepted first; used to
    # name the conflicting entry in the error message.
    first_spelling: Dict[str, str] = {}

    for mapping in header_mappings:
        if "=" not in mapping:
            raise HeaderMappingError(f"Invalid mapping format '{mapping}' - expected HEADER=ENV_VAR")

        # Only the first "=" separates the two sides.
        header_name, env_var_name = (side.strip() for side in mapping.split("=", 1))

        if not (header_name and env_var_name):
            raise HeaderMappingError(f"Empty header name or environment variable name in '{mapping}'")

        validate_header_mapping(header_name, env_var_name)

        # Same spelling seen before.
        if header_name in parsed:
            raise HeaderMappingError(f"Duplicate header mapping for '{header_name}'")

        # Same header under a different capitalisation
        # (e.g. "Authorization" and "authorization").
        header_lower = header_name.lower()
        original = first_spelling.get(header_lower)
        if original is not None:
            raise HeaderMappingError(f"Case-insensitive duplicate header mapping: '{header_name}' conflicts with '{original}'")

        first_spelling[header_lower] = header_name
        parsed[header_name] = env_var_name

    return parsed

177 

178 

def normalize_headers(headers: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of the headers keyed by lowercase header name.

    Values are carried over unchanged. When two keys differ only by case,
    the one that comes later in iteration order wins.

    Args:
        headers: HTTP request headers with original case

    Returns:
        Dictionary with lowercase keys mapping to original values

    Examples:
        >>> normalize_headers({"Authorization": "Bearer token", "X-Api-Key": "key123"})
        {'authorization': 'Bearer token', 'x-api-key': 'key123'}
        >>> normalize_headers({})
        {}
        >>> normalize_headers({"CONTENT-TYPE": "application/json"})
        {'content-type': 'application/json'}
    """
    lowered: Dict[str, str] = {}
    for name, value in headers.items():
        lowered[name.lower()] = value
    return lowered

197 

198 

199class NormalizedMappings: 

200 """Pre-normalized header mappings for efficient lookups. 

201 

202 Stores mappings with lowercase header keys for O(1) case-insensitive lookups. 

203 Intended to be created once at config load time for repeated use. 

204 

205 Examples: 

206 >>> mappings = NormalizedMappings({"Authorization": "AUTH_TOKEN", "X-Api-Key": "API_KEY"}) 

207 >>> mappings.get_env_var("authorization") 

208 'AUTH_TOKEN' 

209 >>> mappings.get_env_var("AUTHORIZATION") 

210 'AUTH_TOKEN' 

211 >>> mappings.get_env_var("x-api-key") 

212 'API_KEY' 

213 >>> mappings.get_env_var("unknown") is None 

214 True 

215 >>> list(mappings) 

216 [('authorization', 'AUTH_TOKEN'), ('x-api-key', 'API_KEY')] 

217 """ 

218 

219 def __init__(self, header_mappings: Dict[str, str]): 

220 """Initialize with header-to-env-var mappings. 

221 

222 Args: 

223 header_mappings: Mapping of header names to environment variable names 

224 """ 

225 # Store with lowercase keys for O(1) case-insensitive lookups 

226 self._mappings: Dict[str, str] = {k.lower(): v for k, v in header_mappings.items()} 

227 

228 def get_env_var(self, header_name: str) -> str | None: 

229 """Get environment variable name for a header (case-insensitive). 

230 

231 Args: 

232 header_name: HTTP header name (any case) 

233 

234 Returns: 

235 Environment variable name or None if not mapped 

236 """ 

237 return self._mappings.get(header_name.lower()) 

238 

239 def __iter__(self): 

240 """Iterate over (lowercase_header, env_var) pairs. 

241 

242 Returns: 

243 Iterator of (header_name, env_var_name) tuples 

244 """ 

245 return iter(self._mappings.items()) 

246 

247 def __len__(self) -> int: 

248 """Return number of mappings. 

249 

250 Returns: 

251 Number of header-to-env-var mappings 

252 """ 

253 return len(self._mappings) 

254 

255 def values(self): 

256 """Return environment variable names (values of the mappings). 

257 

258 Returns: 

259 View of environment variable names 

260 

261 Examples: 

262 >>> mappings = NormalizedMappings({"Authorization": "AUTH", "X-Api-Key": "KEY"}) 

263 >>> sorted(mappings.values()) 

264 ['AUTH', 'KEY'] 

265 """ 

266 return self._mappings.values() 

267 

268 def __bool__(self) -> bool: 

269 """Return True if there are any mappings. 

270 

271 Returns: 

272 True if mappings exist, False if empty 

273 """ 

274 return bool(self._mappings) 

275 

276 

def extract_env_vars_from_headers(request_headers: Dict[str, str], header_mappings: Dict[str, str] | NormalizedMappings) -> Dict[str, str]:
    """Build the environment variables that the request headers map to.

    Request header names are lowercased once, then each configured mapping is
    resolved with a single dictionary lookup, giving O(mappings + headers)
    work instead of a nested O(mappings × headers) scan. Header values are
    passed through sanitize_header_value; values that end up empty, or whose
    processing raises, are skipped with a warning rather than failing the call.

    Args:
        request_headers: HTTP request headers
        header_mappings: Mapping of header names to environment variable names,
            or a pre-normalized NormalizedMappings instance

    Returns:
        Dictionary of environment variable name -> sanitized value

    Examples:
        >>> # Extract matching headers
        >>> headers = {"Authorization": "Bearer token123", "Content-Type": "application/json"}
        >>> mappings = {"Authorization": "AUTH_TOKEN"}
        >>> extract_env_vars_from_headers(headers, mappings)
        {'AUTH_TOKEN': 'Bearer token123'}
        >>>
        >>> # Case-insensitive matching
        >>> headers = {"authorization": "Bearer token"}
        >>> mappings = {"Authorization": "AUTH"}
        >>> extract_env_vars_from_headers(headers, mappings)
        {'AUTH': 'Bearer token'}
        >>>
        >>> # No matching headers
        >>> headers = {"X-Other": "value"}
        >>> mappings = {"Authorization": "AUTH"}
        >>> extract_env_vars_from_headers(headers, mappings)
        {}
        >>>
        >>> # Empty mappings
        >>> extract_env_vars_from_headers({"Header": "value"}, {})
        {}
        >>>
        >>> # Using NormalizedMappings for repeated lookups
        >>> nm = NormalizedMappings({"Authorization": "AUTH"})
        >>> extract_env_vars_from_headers({"authorization": "token"}, nm)
        {'AUTH': 'token'}
    """
    # Lowercase the incoming header names once - O(headers).
    normalized_headers = normalize_headers(request_headers)

    # A plain dict is wrapped here; anything else is used as given and is
    # expected to yield (lowercase_header, env_var_name) pairs when iterated.
    normalized_mappings = NormalizedMappings(header_mappings) if isinstance(header_mappings, dict) else header_mappings

    env_vars = {}

    # One dictionary lookup per mapping - O(mappings) total.
    for header_lower, env_var_name in normalized_mappings:
        header_value = normalized_headers.get(header_lower)
        if header_value is None:
            # The request does not carry this header.
            continue

        try:
            sanitized_value = sanitize_header_value(header_value)
            if not sanitized_value:
                # Nothing usable is left, so the variable is not set at all.
                logger.warning(f"Header {header_lower} value became empty after sanitization")
                continue
            env_vars[env_var_name] = sanitized_value
            logger.debug(f"Mapped header {header_lower} to {env_var_name}")
        except Exception as e:
            # Best effort: one bad header must not prevent the others from mapping.
            logger.warning(f"Failed to process header {header_lower}: {e}")

    return env_vars