Coverage for mcpgateway / utils / url_auth.py: 100%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""URL authentication helpers for query parameter auth. 

3 

4Provides utilities for appending decrypted auth query parameters to URLs 

5and sanitizing URLs for safe logging (redacting sensitive query params). 

6 

7Security Note: 

8 Query parameter authentication is inherently insecure (CWE-598: Use of GET 

9 Request Method With Sensitive Query Strings). API keys in URLs may appear 

10 in proxy logs, browser history, and server access logs. Use only when the 

11 upstream server (e.g., Tavily MCP) requires this authentication method. 

12 

13Copyright 2025 

14SPDX-License-Identifier: Apache-2.0 

15""" 

16 

17# Standard 

import re
from typing import Dict, FrozenSet, Optional
from urllib.parse import parse_qs, parse_qsl, urlencode, urlparse, urlunparse

21 

# Baseline query parameter names that commonly carry secrets.
# These names are redacted even when no gateway-specific auth parameter
# names are supplied by the caller.
STATIC_SENSITIVE_PARAMS: FrozenSet[str] = frozenset(
    (
        # API key spellings
        "api_key",
        "apikey",
        "api-key",
        "key",
        # Tokens and generic auth
        "token",
        "access_token",
        "auth",
        "auth_token",
        # Secrets, passwords, credentials
        "secret",
        "password",
        "pwd",
        "credential",
        "credentials",
        # Tavily-specific (lowercase and camelCase spellings)
        "tavilyapikey",
        "tavilyApiKey",
    )
)

43 

44 

def apply_query_param_auth(
    url: str,
    auth_query_params: Optional[Dict[str, str]],
) -> str:
    """Append decrypted auth query parameters to a URL.

    Existing query parameters keep their order, and repeated parameters
    (e.g. ``?tag=a&tag=b``) are preserved. If an auth parameter name already
    exists in the URL, its first occurrence is overridden in place with the
    auth value and any further occurrences of that name are dropped, so the
    resulting URL carries exactly one value for each auth parameter.

    Args:
        url: The base URL to append parameters to.
        auth_query_params: Dict of {param_name: decrypted_value}.
            If None or empty, returns the original URL unchanged.

    Returns:
        URL with auth query parameters appended.

    Example:
        >>> apply_query_param_auth(
        ...     "https://api.tavily.com/mcp",
        ...     {"tavilyApiKey": "secret123"}
        ... )
        'https://api.tavily.com/mcp?tavilyApiKey=secret123'

        >>> apply_query_param_auth(
        ...     "https://api.example.com/search?q=test",
        ...     {"api_key": "abc123"}
        ... )
        'https://api.example.com/search?q=test&api_key=abc123'

        >>> # Repeated non-auth parameters are kept
        >>> apply_query_param_auth(
        ...     "https://api.example.com/search?tag=a&tag=b",
        ...     {"api_key": "abc123"}
        ... )
        'https://api.example.com/search?tag=a&tag=b&api_key=abc123'
    """
    if not auth_query_params:
        return url

    parsed = urlparse(url)

    # Work on (name, value) pairs rather than a dict so that ordering and
    # repeated non-auth parameters survive the round trip.
    merged = []
    applied = set()
    for name, value in parse_qsl(parsed.query, keep_blank_values=True):
        if name in auth_query_params:
            if name in applied:
                # A stale duplicate of an auth param must not ride along.
                continue
            applied.add(name)
            value = auth_query_params[name]
        merged.append((name, value))

    # Auth params that were not already present are appended at the end.
    merged.extend((name, value) for name, value in auth_query_params.items() if name not in applied)

    return urlunparse(parsed._replace(query=urlencode(merged)))

94 

95 

def sanitize_url_for_logging(
    url: str,
    auth_query_params: Optional[Dict[str, str]] = None,
) -> str:
    """Redact sensitive query parameters from a URL for safe logging.

    This function masks sensitive query parameter values and any userinfo
    (``user:pass@``) so that API keys and other secrets do not appear in
    logs, error messages, and exception traces. Parameter names are matched
    case-insensitively. Non-sensitive parameters, including repeated ones,
    are kept in their original order.

    Args:
        url: The URL to sanitize.
        auth_query_params: Optional dict of {param_name: value} that are
            known to be sensitive (e.g., the gateway's configured auth params).
            These param names will always be redacted regardless of their value.

    Returns:
        URL with sensitive parameter values replaced with "REDACTED".

    Example:
        >>> sanitize_url_for_logging(
        ...     "https://api.tavily.com/mcp?tavilyApiKey=secret123",
        ...     {"tavilyApiKey": "secret123"}
        ... )
        'https://api.tavily.com/mcp?tavilyApiKey=REDACTED'

        >>> # Also catches static sensitive params
        >>> sanitize_url_for_logging(
        ...     "https://api.example.com?api_key=secret&q=search"
        ... )
        'https://api.example.com?api_key=REDACTED&q=search'

        >>> # Also redacts userinfo (user:pass@host)
        >>> sanitize_url_for_logging(
        ...     "https://admin:secret123@api.example.com/endpoint"
        ... )
        'https://REDACTED:REDACTED@api.example.com/endpoint'

        >>> # Preserves IPv6 bracket formatting
        >>> sanitize_url_for_logging(
        ...     "https://user:pass@[::1]:8080/path"
        ... )
        'https://REDACTED:REDACTED@[::1]:8080/path'

        >>> # A password containing '@' is fully redacted
        >>> sanitize_url_for_logging(
        ...     "https://user:p@ss@api.example.com/endpoint"
        ... )
        'https://REDACTED:REDACTED@api.example.com/endpoint'
    """
    parsed = urlparse(url)

    # Redact userinfo (user:pass@host) if present - defense in depth
    has_userinfo = bool(parsed.username or parsed.password)
    if has_userinfo:
        # urllib separates userinfo from host at the LAST "@", so do the same
        # here; splitting at the first "@" would leave the tail of a password
        # that itself contains "@" in the output. Taking the raw netloc
        # remainder also preserves IPv6 brackets and the port.
        host_part = parsed.netloc.rpartition("@")[2]
        parsed = parsed._replace(netloc=f"REDACTED:REDACTED@{host_part}")

    if not parsed.query:
        # Nothing to redact in the query; return the input untouched unless
        # the userinfo was rewritten above.
        return urlunparse(parsed) if has_userinfo else url

    # Build the lowercase set of param names to redact
    sensitive_names = {name.lower() for name in STATIC_SENSITIVE_PARAMS}
    if auth_query_params:
        sensitive_names.update(name.lower() for name in auth_query_params)

    # Operate on (name, value) pairs so ordering and repeated parameters are
    # reported faithfully; only the values of sensitive names are masked.
    sanitized_pairs = [
        (name, "REDACTED" if name.lower() in sensitive_names else value)
        for name, value in parse_qsl(parsed.query, keep_blank_values=True)
    ]

    return urlunparse(parsed._replace(query=urlencode(sanitized_pairs)))

177 

178 

179# Regex to match URLs in text (http:// or https://) 

180_URL_PATTERN = re.compile(r"https?://[^\s<>\"']+") 

181 

182 

def sanitize_exception_message(
    message: str,
    auth_query_params: Optional[Dict[str, str]] = None,
) -> str:
    """Sanitize URLs embedded within exception messages.

    Exception messages from HTTP libraries (httpx, aiohttp, etc.) often include
    the full URL, which may contain sensitive query parameters. This function
    finds every URL matched by ``_URL_PATTERN`` in the message and replaces it
    with the result of ``sanitize_url_for_logging``.

    This function is meant to run inside error-handling paths, so it does not
    let a malformed URL fragment raise: if a matched URL cannot be parsed, the
    whole match is replaced with "REDACTED" rather than being echoed back.

    Args:
        message: The exception message (str(e)) to sanitize.
        auth_query_params: Optional dict of known sensitive param names.

    Returns:
        Message with all embedded URLs sanitized. A falsy message (empty
        string) is returned unchanged.

    Example:
        >>> sanitize_exception_message(
        ...     "Connection failed: https://api.tavily.com/mcp?tavilyApiKey=secret123",
        ...     {"tavilyApiKey": "secret123"}
        ... )
        'Connection failed: https://api.tavily.com/mcp?tavilyApiKey=REDACTED'

        >>> sanitize_exception_message(
        ...     "Error connecting to https://api.example.com?api_key=abc&q=test"
        ... )
        'Error connecting to https://api.example.com?api_key=REDACTED&q=test'
    """
    if not message:
        return message

    def replace_url(match: re.Match) -> str:
        """Replace a matched URL with its sanitized version.

        Args:
            match: Regex match object containing the URL.

        Returns:
            Sanitized URL with sensitive params redacted, or "REDACTED" if
            the matched text cannot be parsed as a URL.
        """
        try:
            return sanitize_url_for_logging(match.group(0), auth_query_params)
        except ValueError:
            # URL parsing rejects malformed authorities (e.g. an unbalanced
            # IPv6 bracket such as "https://[::1"). The raw text may still
            # contain credentials, so replace it wholesale instead of
            # returning it or letting the error mask the original exception.
            return "REDACTED"

    return _URL_PATTERN.sub(replace_url, message)