Coverage for mcpgateway / services / support_bundle_service.py: 100%
117 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/support_bundle_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Support Bundle Service - Generate diagnostic bundles for troubleshooting.
9This module provides functionality to create comprehensive support bundles containing
10system diagnostics, logs, configuration, and other debugging information with automatic
11sanitization of sensitive data (passwords, tokens, API keys).
13Features:
14- Version and system information collection
15- Log file collection with size limits and sanitization
16- Environment configuration with secret redaction
17- Database connection info (sanitized)
18- Platform and dependency information
19- ZIP archive generation with timestamped filenames
21Examples:
22 >>> from mcpgateway.services.support_bundle_service import SupportBundleService
23 >>> service = SupportBundleService()
24 >>> bundle_path = service.generate_bundle()
25 >>> bundle_path.exists()
26 True
27 >>> bundle_path.name.startswith('mcpgateway-support-')
28 True
29 >>> bundle_path.suffix
30 '.zip'
31"""
33# Future
34from __future__ import annotations
36# Standard
37from datetime import datetime, timezone
38import os
39from pathlib import Path
40import platform
41import re
42import socket
43import tempfile
44from typing import Any, Dict, Optional
45import zipfile
47# Third-Party
48import orjson
49from pydantic import BaseModel, Field
51# First-Party
52from mcpgateway import __version__
53from mcpgateway.config import settings
54from mcpgateway.db import engine
57class SupportBundleConfig(BaseModel):
58 """Configuration for support bundle generation.
60 Attributes:
61 include_logs: Include log files in bundle
62 include_env: Include environment configuration
63 include_system_info: Include system diagnostics
64 max_log_size_mb: Maximum log file size to include (MB)
65 log_tail_lines: Number of log lines to include (0 = all)
66 output_dir: Directory for bundle output
67 """
69 include_logs: bool = Field(default=True, description="Include log files in bundle")
70 include_env: bool = Field(default=True, description="Include environment configuration")
71 include_system_info: bool = Field(default=True, description="Include system diagnostics")
72 max_log_size_mb: float = Field(default=10.0, description="Maximum log file size in MB")
73 log_tail_lines: int = Field(default=1000, description="Number of log lines to include (0 = all)")
74 output_dir: Optional[Path] = Field(default=None, description="Output directory for bundle")
77class SupportBundleService:
78 """Service for generating support bundles with sanitized diagnostic information.
80 This service collects system information, logs, and configuration data while
81 automatically sanitizing sensitive information like passwords, tokens, and API keys.
83 Examples:
84 >>> from mcpgateway.services.support_bundle_service import SupportBundleService, SupportBundleConfig
85 >>> service = SupportBundleService()
86 >>> config = SupportBundleConfig(log_tail_lines=500)
87 >>> bundle_path = service.generate_bundle(config)
88 >>> bundle_path.exists()
89 True
90 >>> bundle_path.suffix
91 '.zip'
92 """
94 # Patterns for sanitizing sensitive data in logs
95 SENSITIVE_PATTERNS = [
96 (re.compile(r'password["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"password: *****"),
97 (re.compile(r'token["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"token: *****"),
98 (re.compile(r'api[_-]?key["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"api_key: *****"),
99 (re.compile(r'secret["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"secret: *****"),
100 (re.compile(r"bearer\s+[A-Za-z0-9\-._~+/]+=*", re.IGNORECASE), r"bearer *****"),
101 (re.compile(r'authorization:\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"authorization: *****"),
102 # Database URLs
103 (re.compile(r"(postgresql|mysql|redis)://([^:]+):([^@]+)@"), r"\1://\2:*****@"),
104 # JWT tokens (eyJ pattern)
105 (re.compile(r"eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+"), r"eyJ*****"),
106 ]
108 def __init__(self):
109 """Initialize the support bundle service."""
110 self.hostname = socket.gethostname()
111 self.timestamp = datetime.now(timezone.utc)
113 def _is_secret(self, key: str) -> bool:
114 """Check if an environment variable key represents a secret.
116 Args:
117 key: Environment variable name
119 Returns:
120 bool: True if the key likely contains sensitive data
122 Examples:
123 >>> service = SupportBundleService()
124 >>> service._is_secret("DATABASE_PASSWORD")
125 True
126 >>> service._is_secret("API_KEY")
127 True
128 >>> service._is_secret("DEBUG")
129 False
130 """
131 key_upper = key.upper()
132 # Check for common secret keywords
133 if any(tok in key_upper for tok in ("SECRET", "TOKEN", "PASS", "KEY")):
134 return True
135 # Check for specific secret environment variables
136 secret_vars = {
137 "BASIC_AUTH_USER",
138 "BASIC_AUTH_PASSWORD",
139 "DATABASE_URL",
140 "REDIS_URL",
141 "JWT_SECRET_KEY",
142 "AUTH_ENCRYPTION_SECRET",
143 }
144 return key_upper in secret_vars
146 def _sanitize_url(self, url: Optional[str]) -> Optional[str]:
147 """Redact credentials from URLs.
149 Args:
150 url: URL to sanitize
152 Returns:
153 Optional[str]: Sanitized URL or None
155 Examples:
156 >>> service = SupportBundleService()
157 >>> service._sanitize_url("postgresql://user:password@localhost/db")
158 'postgresql://user:*****@localhost/db'
159 >>> service._sanitize_url("http://example.com")
160 'http://example.com'
161 """
162 if not url:
163 return None
164 # Remove password from URLs
165 for pattern, replacement in self.SENSITIVE_PATTERNS:
166 url = pattern.sub(replacement, url)
167 return url
169 def _sanitize_line(self, line: str) -> str:
170 """Sanitize a single line of text by removing sensitive data.
172 Args:
173 line: Line to sanitize
175 Returns:
176 str: Sanitized line
178 Examples:
179 >>> service = SupportBundleService()
180 >>> service._sanitize_line('password: secret123')
181 'password: *****'
182 >>> service._sanitize_line('debug: true')
183 'debug: true'
184 """
185 for pattern, replacement in self.SENSITIVE_PATTERNS:
186 line = pattern.sub(replacement, line)
187 return line
189 def _collect_version_info(self) -> Dict[str, Any]:
190 """Collect version and application information.
192 Returns:
193 Dict containing version information
195 Examples:
196 >>> service = SupportBundleService()
197 >>> info = service._collect_version_info()
198 >>> 'app_version' in info
199 True
200 >>> 'python_version' in info
201 True
202 """
203 return {
204 "app_name": settings.app_name,
205 "app_version": __version__,
206 "mcp_protocol_version": settings.protocol_version,
207 "python_version": platform.python_version(),
208 "platform": f"{platform.system()} {platform.release()} ({platform.machine()})",
209 "hostname": self.hostname,
210 "timestamp": self.timestamp.isoformat(),
211 }
213 def _collect_system_info(self) -> Dict[str, Any]:
214 """Collect system diagnostics and metrics.
216 Returns:
217 Dict containing system information
219 Examples:
220 >>> service = SupportBundleService()
221 >>> info = service._collect_system_info()
222 >>> 'platform' in info
223 True
224 """
225 info = {
226 "platform": {
227 "system": platform.system(),
228 "release": platform.release(),
229 "version": platform.version(),
230 "machine": platform.machine(),
231 "processor": platform.processor(),
232 },
233 "python": {
234 "version": platform.python_version(),
235 "implementation": platform.python_implementation(),
236 "compiler": platform.python_compiler(),
237 },
238 "database": {
239 "dialect": engine.dialect.name,
240 "url": self._sanitize_url(settings.database_url),
241 },
242 }
244 # Try to collect psutil metrics if available
245 try:
246 # Third-Party
247 import psutil # pylint: disable=import-outside-toplevel
249 info["system"] = {
250 "cpu_count": psutil.cpu_count(logical=True),
251 "cpu_percent": psutil.cpu_percent(interval=0.1),
252 "memory_total_mb": round(psutil.virtual_memory().total / 1_048_576),
253 "memory_used_mb": round(psutil.virtual_memory().used / 1_048_576),
254 "disk_total_gb": round(psutil.disk_usage("/").total / 1_073_741_824, 2),
255 "disk_used_gb": round(psutil.disk_usage("/").used / 1_073_741_824, 2),
256 }
257 except ImportError:
258 info["system"] = {"note": "psutil not installed, skipping system metrics"}
260 return info
262 def _collect_env_config(self) -> Dict[str, str]:
263 """Collect environment configuration with secrets redacted.
265 Returns:
266 Dict of environment variables (secrets redacted)
268 Examples:
269 >>> service = SupportBundleService()
270 >>> env = service._collect_env_config()
271 >>> 'PATH' in env or len(env) >= 0 # May vary by environment
272 True
273 """
274 return {k: "*****" if self._is_secret(k) else v for k, v in os.environ.items()}
276 def _collect_settings(self) -> Dict[str, Any]:
277 """Collect application settings with secrets redacted.
279 Returns:
280 Dict of application settings
282 Examples:
283 >>> service = SupportBundleService()
284 >>> config = service._collect_settings()
285 >>> 'host' in config
286 True
287 """
288 # Export settings as dict but exclude sensitive fields
289 exclude_fields = {
290 "basic_auth_password",
291 "jwt_secret_key",
292 "auth_encryption_secret",
293 "platform_admin_password",
294 "sso_github_client_secret",
295 "sso_google_client_secret",
296 "sso_ibm_verify_client_secret",
297 "sso_okta_client_secret",
298 "sso_keycloak_client_secret",
299 "sso_entra_client_secret",
300 "sso_generic_client_secret",
301 }
302 config = settings.model_dump(exclude=exclude_fields)
304 # Sanitize URLs
305 if "database_url" in config:
306 config["database_url"] = self._sanitize_url(config["database_url"])
307 if "redis_url" in config:
308 config["redis_url"] = self._sanitize_url(config["redis_url"])
310 return config
312 def _collect_logs(self, config: SupportBundleConfig) -> Dict[str, str]:
313 """Collect log files with sanitization and size limits.
315 Args:
316 config: Bundle configuration
318 Returns:
319 Dict mapping log file names to sanitized content
321 Examples:
322 >>> service = SupportBundleService()
323 >>> config = SupportBundleConfig(log_tail_lines=100)
324 >>> logs = service._collect_logs(config)
325 >>> isinstance(logs, dict)
326 True
327 """
328 logs = {}
330 # Collect main log file
331 log_file = settings.log_file or "mcpgateway.log"
332 log_folder = settings.log_folder or "logs"
333 log_path = Path(log_folder) / log_file
335 if log_path.exists():
336 try:
337 file_size_mb = log_path.stat().st_size / 1_048_576
338 if file_size_mb > config.max_log_size_mb:
339 logs[log_file] = f"[Log file too large: {file_size_mb:.2f} MB > {config.max_log_size_mb} MB limit]\n"
340 else:
341 with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
342 lines = f.readlines()
344 # Tail lines if configured
345 if config.log_tail_lines > 0 and len(lines) > config.log_tail_lines:
346 lines = lines[-config.log_tail_lines :]
347 lines.insert(0, f"[Showing last {config.log_tail_lines} lines]\n")
349 # Sanitize each line
350 sanitized_lines = [self._sanitize_line(line) for line in lines]
351 logs[log_file] = "".join(sanitized_lines)
353 except Exception as e:
354 logs[log_file] = f"[Error reading log file: {e}]\n"
355 else:
356 logs[log_file] = "[Log file not found]\n"
358 return logs
360 def _create_manifest(self, config: SupportBundleConfig) -> Dict[str, Any]:
361 """Create bundle manifest with metadata.
363 Args:
364 config: Bundle configuration
366 Returns:
367 Dict containing bundle manifest
369 Examples:
370 >>> service = SupportBundleService()
371 >>> config = SupportBundleConfig()
372 >>> manifest = service._create_manifest(config)
373 >>> 'bundle_version' in manifest
374 True
375 """
376 return {
377 "bundle_version": "1.0",
378 "generated_at": self.timestamp.isoformat(),
379 "hostname": self.hostname,
380 "app_version": __version__,
381 "configuration": {
382 "include_logs": config.include_logs,
383 "include_env": config.include_env,
384 "include_system_info": config.include_system_info,
385 "log_tail_lines": config.log_tail_lines,
386 },
387 "warning": "This bundle may contain sensitive information. Review before sharing.",
388 }
390 def generate_bundle(self, config: Optional[SupportBundleConfig] = None) -> Path:
391 """Generate a complete support bundle as a ZIP file.
393 Args:
394 config: Optional bundle configuration
396 Returns:
397 Path: Path to the generated ZIP file
399 Examples:
400 >>> from mcpgateway.services.support_bundle_service import SupportBundleService, SupportBundleConfig
401 >>> service = SupportBundleService()
402 >>> config = SupportBundleConfig(log_tail_lines=100, output_dir=Path("/tmp"))
403 >>> bundle_path = service.generate_bundle(config)
404 >>> bundle_path.exists()
405 True
406 >>> bundle_path.name.startswith('mcpgateway-support-')
407 True
408 >>> bundle_path.suffix
409 '.zip'
410 """
411 if config is None:
412 config = SupportBundleConfig()
414 # Determine output directory
415 output_dir = config.output_dir or Path(tempfile.gettempdir())
416 output_dir.mkdir(parents=True, exist_ok=True)
418 # Create timestamped filename
419 timestamp_str = self.timestamp.strftime("%Y-%m-%d-%H%M%S")
420 bundle_filename = f"mcpgateway-support-{timestamp_str}.zip"
421 bundle_path = output_dir / bundle_filename
423 # Create ZIP file
424 with zipfile.ZipFile(bundle_path, "w", zipfile.ZIP_DEFLATED) as zf:
425 # Add manifest
426 manifest = self._create_manifest(config)
427 zf.writestr("MANIFEST.json", orjson.dumps(manifest, option=orjson.OPT_INDENT_2))
429 # Add version info
430 version_info = self._collect_version_info()
431 zf.writestr("version.json", orjson.dumps(version_info, option=orjson.OPT_INDENT_2))
433 # Add system info
434 if config.include_system_info:
435 system_info = self._collect_system_info()
436 zf.writestr("system_info.json", orjson.dumps(system_info, option=orjson.OPT_INDENT_2))
438 # Add settings
439 if config.include_env:
440 app_settings = self._collect_settings()
441 zf.writestr("settings.json", orjson.dumps(app_settings, default=str, option=orjson.OPT_INDENT_2))
443 # Add environment variables
444 env_config = self._collect_env_config()
445 zf.writestr("environment.json", orjson.dumps(env_config, option=orjson.OPT_INDENT_2))
447 # Add logs
448 if config.include_logs:
449 logs = self._collect_logs(config)
450 for log_name, log_content in logs.items():
451 zf.writestr(f"logs/{log_name}", log_content)
453 # Add README
454 readme = f"""# MCP Gateway Support Bundle
456This bundle contains diagnostic information for troubleshooting MCP Gateway issues.
458## Contents
460- MANIFEST.json: Bundle metadata and generation info
461- version.json: Application and dependency versions
462- system_info.json: Platform and system metrics
463- settings.json: Application configuration (secrets redacted)
464- environment.json: Environment variables (secrets redacted)
465- logs/: Application logs (sanitized)
467## Security Notice
469This bundle has been automatically sanitized to remove:
470- Passwords and authentication credentials
471- API keys and tokens
472- JWT secrets
473- Database connection passwords
474- Other sensitive configuration values
476However, please review the contents before sharing with support or external parties.
478## Usage
480Extract the ZIP file and review the JSON files for diagnostic information.
481Pay special attention to logs/ for error messages and stack traces.
483---
484Generated: {self.timestamp.isoformat()}
485Hostname: {self.hostname}
486Version: {__version__}
487"""
489 zf.writestr("README.md", readme)
491 return bundle_path
494def create_support_bundle(config: Optional[SupportBundleConfig] = None) -> Path:
495 """Convenience function to create a support bundle.
497 Args:
498 config: Optional bundle configuration
500 Returns:
501 Path to the generated bundle ZIP file
503 Examples:
504 >>> from mcpgateway.services.support_bundle_service import create_support_bundle, SupportBundleConfig
505 >>> from pathlib import Path
506 >>> import tempfile
507 >>> with tempfile.TemporaryDirectory() as tmpdir:
508 ... config = SupportBundleConfig(log_tail_lines=500, output_dir=Path(tmpdir))
509 ... bundle_path = create_support_bundle(config)
510 ... bundle_path.suffix
511 '.zip'
512 """
513 service = SupportBundleService()
514 return service.generate_bundle(config)