Coverage for mcpgateway / utils / url_auth.py: 100%
45 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""URL authentication helpers for query parameter auth.
4Provides utilities for appending decrypted auth query parameters to URLs
5and sanitizing URLs for safe logging (redacting sensitive query params).
7Security Note:
8 Query parameter authentication is inherently insecure (CWE-598: Use of GET
9 Request Method With Sensitive Query Strings). API keys in URLs may appear
10 in proxy logs, browser history, and server access logs. Use only when the
11 upstream server (e.g., Tavily MCP) requires this authentication method.
13Copyright 2025
14SPDX-License-Identifier: Apache-2.0
15"""
17# Standard
18import re
19from typing import Dict, FrozenSet, Optional
20from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
# Query parameter names whose values are treated as secrets by default.
# sanitize_url_for_logging() always redacts these, in addition to any
# gateway-specific parameter names passed to it. Matching is also attempted
# on the lower-cased parameter name, so the lower-case spellings below cover
# any casing that appears in a URL.
STATIC_SENSITIVE_PARAMS: FrozenSet[str] = frozenset(
    (
        # API key spellings
        "api_key",
        "apikey",
        "api-key",
        "key",
        # Tokens and generic auth
        "token",
        "access_token",
        "auth",
        "auth_token",
        # Passwords and other credentials
        "secret",
        "password",
        "pwd",
        "credential",
        "credentials",
        # Tavily-specific (lower-case and camelCase)
        "tavilyapikey",
        "tavilyApiKey",
    )
)
def apply_query_param_auth(
    url: str,
    auth_query_params: Optional[Dict[str, str]],
) -> str:
    """Append decrypted auth query parameters to a URL.

    Query parameters already present in the URL are kept, including keys that
    occur more than once (``?tag=a&tag=b``). If an auth parameter has the same
    name as an existing parameter, the auth value replaces every existing
    value for that name.

    Args:
        url: The base URL to append parameters to.
        auth_query_params: Dict of {param_name: decrypted_value}.
            If None or empty, returns the original URL unchanged.

    Returns:
        URL with auth query parameters appended.

    Example:
        >>> apply_query_param_auth(
        ...     "https://api.tavily.com/mcp",
        ...     {"tavilyApiKey": "secret123"}
        ... )
        'https://api.tavily.com/mcp?tavilyApiKey=secret123'

        >>> apply_query_param_auth(
        ...     "https://api.example.com/search?q=test",
        ...     {"api_key": "abc123"}
        ... )
        'https://api.example.com/search?q=test&api_key=abc123'

        >>> # Repeated keys in the original URL are preserved
        >>> apply_query_param_auth(
        ...     "https://api.example.com/search?tag=a&tag=b",
        ...     {"api_key": "abc123"}
        ... )
        'https://api.example.com/search?tag=a&tag=b&api_key=abc123'
    """
    if not auth_query_params:
        return url

    parsed = urlparse(url)

    # parse_qs maps every name to the list of ALL its values. Keeping the
    # lists (instead of taking only the first element) is what preserves
    # repeated keys from the original URL.
    merged = parse_qs(parsed.query, keep_blank_values=True)

    # Auth params win over an existing param of the same name. Assigning to
    # an existing key keeps that key's position in the query string.
    for name, value in auth_query_params.items():
        merged[name] = [value]

    # doseq=True emits one "name=value" pair per list element.
    new_query = urlencode(merged, doseq=True)

    return urlunparse(parsed._replace(query=new_query))
def sanitize_url_for_logging(
    url: str,
    auth_query_params: Optional[Dict[str, str]] = None,
) -> str:
    """Redact sensitive query parameters from a URL for safe logging.

    This function masks sensitive query parameters and any userinfo
    (``user:pass@``) to prevent API keys and other secrets from appearing in
    logs, error messages, and exception traces.

    A query parameter is redacted when its name, or its lower-cased name, is
    in STATIC_SENSITIVE_PARAMS or among the names in ``auth_query_params``.
    A redacted parameter appears once with the value "REDACTED"; all other
    parameters keep their values, including repeated keys.

    Args:
        url: The URL to sanitize.
        auth_query_params: Optional dict of {param_name: value} that are
            known to be sensitive (e.g., the gateway's configured auth params).
            These param names will always be redacted regardless of their value.

    Returns:
        URL with sensitive parameter values replaced with "REDACTED". A URL
        with neither userinfo nor a query string is returned unchanged.

    Example:
        >>> sanitize_url_for_logging(
        ...     "https://api.tavily.com/mcp?tavilyApiKey=secret123",
        ...     {"tavilyApiKey": "secret123"}
        ... )
        'https://api.tavily.com/mcp?tavilyApiKey=REDACTED'

        >>> # Also catches static sensitive params
        >>> sanitize_url_for_logging(
        ...     "https://api.example.com?api_key=secret&q=search"
        ... )
        'https://api.example.com?api_key=REDACTED&q=search'

        >>> # Also redacts userinfo (user:pass@host)
        >>> sanitize_url_for_logging(
        ...     "https://admin:secret123@api.example.com/endpoint"
        ... )
        'https://REDACTED:REDACTED@api.example.com/endpoint'

        >>> # Preserves IPv6 bracket formatting
        >>> sanitize_url_for_logging(
        ...     "https://user:pass@[::1]:8080/path"
        ... )
        'https://REDACTED:REDACTED@[::1]:8080/path'

        >>> # A password containing "@" is fully removed
        >>> sanitize_url_for_logging(
        ...     "https://user:p@ss@api.example.com/endpoint"
        ... )
        'https://REDACTED:REDACTED@api.example.com/endpoint'
    """
    parsed = urlparse(url)

    # Redact userinfo (user:pass@host) if present - defense in depth
    userinfo_redacted = False
    if parsed.username or parsed.password:
        # urlparse treats the LAST "@" in the netloc as the userinfo/host
        # separator, so split there too. Splitting on the first "@" would
        # leave the tail of a password that itself contains "@" in the output.
        # Taking the raw netloc tail also preserves IPv6 brackets and the port.
        host_part = parsed.netloc.rpartition("@")[2]
        parsed = parsed._replace(netloc=f"REDACTED:REDACTED@{host_part}")
        userinfo_redacted = True

    if not parsed.query:
        # Nothing else to redact; return the input untouched unless the
        # netloc was rewritten above.
        return urlunparse(parsed) if userinfo_redacted else url

    # Build set of param names to redact
    sensitive_names = set(STATIC_SENSITIVE_PARAMS)
    if auth_query_params:
        # Gateway-specific names: store both exact and lower-cased spellings
        for name in auth_query_params:
            sensitive_names.add(name)
            sensitive_names.add(name.lower())

    # Redact sensitive values; keep every value of non-sensitive params
    sanitized_params = {}
    for name, values in parse_qs(parsed.query, keep_blank_values=True).items():
        if name in sensitive_names or name.lower() in sensitive_names:
            sanitized_params[name] = ["REDACTED"]
        else:
            sanitized_params[name] = values

    # doseq=True emits one "name=value" pair per list element
    new_query = urlencode(sanitized_params, doseq=True)

    return urlunparse(parsed._replace(query=new_query))
# Matches http:// and https:// URLs embedded in free text. A match runs up to
# the first whitespace character, angle bracket, or quote character.
_URL_PATTERN = re.compile(r"https?://[^\s<>\"']+")


def sanitize_exception_message(
    message: str,
    auth_query_params: Optional[Dict[str, str]] = None,
) -> str:
    """Sanitize URLs embedded within exception messages.

    Exception messages from HTTP libraries (httpx, aiohttp, etc.) often include
    the full URL, which may contain sensitive query parameters. This function
    finds and sanitizes all URLs in the message.

    This function does not raise on malformed URLs: if a matched URL-like
    span cannot be parsed, it is replaced by ``<scheme>://REDACTED`` rather
    than being left in place or aborting the sanitization.

    Args:
        message: The exception message (str(e)) to sanitize.
        auth_query_params: Optional dict of known sensitive param names.

    Returns:
        Message with all embedded URLs sanitized. An empty message is
        returned unchanged.

    Example:
        >>> sanitize_exception_message(
        ...     "Connection failed: https://api.tavily.com/mcp?tavilyApiKey=secret123",
        ...     {"tavilyApiKey": "secret123"}
        ... )
        'Connection failed: https://api.tavily.com/mcp?tavilyApiKey=REDACTED'

        >>> sanitize_exception_message(
        ...     "Error connecting to https://api.example.com?api_key=abc&q=test"
        ... )
        'Error connecting to https://api.example.com?api_key=REDACTED&q=test'
    """
    if not message:
        return message

    def replace_url(match: re.Match) -> str:
        """Replace a matched URL with its sanitized version.

        Args:
            match: Regex match object containing the URL.

        Returns:
            Sanitized URL with sensitive params redacted, or
            ``<scheme>://REDACTED`` if the matched text cannot be parsed.
        """
        url = match.group(0)
        try:
            return sanitize_url_for_logging(url, auth_query_params)
        except ValueError:
            # The pattern also matches text that is not a valid URL (e.g. a
            # stray "]" picked up from the surrounding message), on which
            # urlparse raises ValueError. Fail closed: never return the raw
            # text, which may contain a secret, and never let the sanitizer
            # raise and mask the error being reported.
            scheme = url.partition("://")[0]
            return f"{scheme}://REDACTED"

    return _URL_PATTERN.sub(replace_url, message)