Coverage for mcpgateway / services / support_bundle_service.py: 100%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/support_bundle_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Support Bundle Service - Generate diagnostic bundles for troubleshooting. 

8 

9This module provides functionality to create comprehensive support bundles containing 

10system diagnostics, logs, configuration, and other debugging information with automatic 

11sanitization of sensitive data (passwords, tokens, API keys). 

12 

13Features: 

14- Version and system information collection 

15- Log file collection with size limits and sanitization 

16- Environment configuration with secret redaction 

17- Database connection info (sanitized) 

18- Platform and dependency information 

19- ZIP archive generation with timestamped filenames 

20 

21Examples: 

22 >>> from mcpgateway.services.support_bundle_service import SupportBundleService 

23 >>> service = SupportBundleService() 

24 >>> bundle_path = service.generate_bundle() 

25 >>> bundle_path.exists() 

26 True 

27 >>> bundle_path.name.startswith('mcpgateway-support-') 

28 True 

29 >>> bundle_path.suffix 

30 '.zip' 

31""" 

32 

33# Future 

34from __future__ import annotations 

35 

36# Standard 

37from datetime import datetime, timezone 

38import os 

39from pathlib import Path 

40import platform 

41import re 

42import socket 

43import tempfile 

44from typing import Any, Dict, Optional 

45import zipfile 

46 

47# Third-Party 

48import orjson 

49from pydantic import BaseModel, Field 

50 

51# First-Party 

52from mcpgateway import __version__ 

53from mcpgateway.config import settings 

54from mcpgateway.db import engine 

55 

56 

class SupportBundleConfig(BaseModel):
    """Options that control what goes into a support bundle and where it is written.

    Attributes:
        include_logs: Include log files in bundle
        include_env: Include environment configuration
        include_system_info: Include system diagnostics
        max_log_size_mb: Maximum log file size to include (MB)
        log_tail_lines: Number of log lines to include (0 = all)
        output_dir: Directory for bundle output
    """

    # Content toggles: each one switches a section of the bundle on or off.
    include_logs: bool = Field(
        default=True,
        description="Include log files in bundle",
    )
    include_env: bool = Field(
        default=True,
        description="Include environment configuration",
    )
    include_system_info: bool = Field(
        default=True,
        description="Include system diagnostics",
    )

    # Log collection limits.
    max_log_size_mb: float = Field(
        default=10.0,
        description="Maximum log file size in MB",
    )
    log_tail_lines: int = Field(
        default=1000,
        description="Number of log lines to include (0 = all)",
    )

    # Destination directory; None leaves the choice to the bundle generator.
    output_dir: Optional[Path] = Field(
        default=None,
        description="Output directory for bundle",
    )

75 

76 

class SupportBundleService:
    """Service for generating support bundles with sanitized diagnostic information.

    This service collects system information, logs, and configuration data while
    automatically sanitizing sensitive information like passwords, tokens, and API keys.

    The hostname and a UTC timestamp are captured once at construction time and
    reused for the manifest, version info, README and bundle filename.

    Examples:
        >>> from mcpgateway.services.support_bundle_service import SupportBundleService, SupportBundleConfig
        >>> service = SupportBundleService()
        >>> config = SupportBundleConfig(log_tail_lines=500)
        >>> bundle_path = service.generate_bundle(config)
        >>> bundle_path.exists()
        True
        >>> bundle_path.suffix
        '.zip'
    """

    # (compiled pattern, replacement) pairs applied in order to every log line
    # and URL. Order matters: the bearer rule runs before the authorization
    # rule, so the latter sees an already-masked bearer value.
    SENSITIVE_PATTERNS = [
        (re.compile(r'password["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"password: *****"),
        (re.compile(r'token["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"token: *****"),
        (re.compile(r'api[_-]?key["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"api_key: *****"),
        (re.compile(r'secret["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), r"secret: *****"),
        (re.compile(r"bearer\s+[A-Za-z0-9\-._~+/]+=*", re.IGNORECASE), r"bearer *****"),
        # Authorization headers: swallow an optional auth scheme *and* the
        # credential that follows it. Matching only the first token would mask
        # the scheme ("Basic") and leave the actual credential in the output.
        (
            re.compile(
                r'authorization["\']?\s*[:=]\s*["\']?(?:(?:basic|bearer|digest|negotiate|ntlm)\s+)?[^"\'\s,}]+',
                re.IGNORECASE,
            ),
            r"authorization: *****",
        ),
        # Database / broker URLs. Accepts an optional "+driver" suffix
        # (e.g. postgresql+psycopg://) and an empty user name
        # (e.g. redis://:password@host) so those passwords are masked too.
        (
            re.compile(
                r"((?:postgres(?:ql)?|mysql|mariadb|rediss?|mongodb|amqps?)(?:\+[A-Za-z0-9_.\-]+)?)://([^:@/\s]*):([^@\s]+)@",
                re.IGNORECASE,
            ),
            r"\1://\2:*****@",
        ),
        # JWT tokens (eyJ pattern)
        (re.compile(r"eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+"), r"eyJ*****"),
    ]

    def __init__(self):
        """Initialize the support bundle service."""
        self.hostname = socket.gethostname()
        self.timestamp = datetime.now(timezone.utc)

    def _is_secret(self, key: str) -> bool:
        """Check if an environment variable key represents a secret.

        Args:
            key: Environment variable name

        Returns:
            bool: True if the key likely contains sensitive data

        Examples:
            >>> service = SupportBundleService()
            >>> service._is_secret("DATABASE_PASSWORD")
            True
            >>> service._is_secret("API_KEY")
            True
            >>> service._is_secret("DEBUG")
            False
        """
        key_upper = key.upper()
        # Substring match on common secret keywords (case-insensitive).
        if any(tok in key_upper for tok in ("SECRET", "TOKEN", "PASS", "KEY")):
            return True
        # Exact names that are sensitive even without a keyword in them
        # (user names and connection URLs that may embed credentials).
        secret_vars = {
            "BASIC_AUTH_USER",
            "BASIC_AUTH_PASSWORD",
            "DATABASE_URL",
            "REDIS_URL",
            "JWT_SECRET_KEY",
            "AUTH_ENCRYPTION_SECRET",
        }
        return key_upper in secret_vars

    def _sanitize_url(self, url: Optional[str]) -> Optional[str]:
        """Redact credentials from URLs.

        Args:
            url: URL to sanitize

        Returns:
            Optional[str]: Sanitized URL, or None when ``url`` is None or empty

        Examples:
            >>> service = SupportBundleService()
            >>> service._sanitize_url("postgresql://user:password@localhost/db")
            'postgresql://user:*****@localhost/db'
            >>> service._sanitize_url("postgresql+psycopg://user:password@localhost/db")
            'postgresql+psycopg://user:*****@localhost/db'
            >>> service._sanitize_url("http://example.com")
            'http://example.com'
        """
        if not url:
            return None
        # URLs go through exactly the same rule set as log lines.
        return self._sanitize_line(url)

    def _sanitize_line(self, line: str) -> str:
        """Sanitize a single line of text by removing sensitive data.

        Every pattern in ``SENSITIVE_PATTERNS`` is applied in order.

        Args:
            line: Line to sanitize

        Returns:
            str: Sanitized line

        Examples:
            >>> service = SupportBundleService()
            >>> service._sanitize_line('password: secret123')
            'password: *****'
            >>> service._sanitize_line('Authorization: Basic dXNlcjpwYXNz')
            'authorization: *****'
            >>> service._sanitize_line('debug: true')
            'debug: true'
        """
        for pattern, replacement in self.SENSITIVE_PATTERNS:
            line = pattern.sub(replacement, line)
        return line

    def _collect_version_info(self) -> Dict[str, Any]:
        """Collect version and application information.

        Returns:
            Dict containing version information

        Examples:
            >>> service = SupportBundleService()
            >>> info = service._collect_version_info()
            >>> 'app_version' in info
            True
            >>> 'python_version' in info
            True
        """
        return {
            "app_name": settings.app_name,
            "app_version": __version__,
            "mcp_protocol_version": settings.protocol_version,
            "python_version": platform.python_version(),
            "platform": f"{platform.system()} {platform.release()} ({platform.machine()})",
            "hostname": self.hostname,
            "timestamp": self.timestamp.isoformat(),
        }

    def _collect_system_info(self) -> Dict[str, Any]:
        """Collect system diagnostics and metrics.

        Platform, Python and database details are always included. CPU, memory
        and disk metrics are added only when ``psutil`` can be imported.

        Returns:
            Dict containing system information

        Examples:
            >>> service = SupportBundleService()
            >>> info = service._collect_system_info()
            >>> 'platform' in info
            True
        """
        info: Dict[str, Any] = {
            "platform": {
                "system": platform.system(),
                "release": platform.release(),
                "version": platform.version(),
                "machine": platform.machine(),
                "processor": platform.processor(),
            },
            "python": {
                "version": platform.python_version(),
                "implementation": platform.python_implementation(),
                "compiler": platform.python_compiler(),
            },
            "database": {
                "dialect": engine.dialect.name,
                "url": self._sanitize_url(settings.database_url),
            },
        }

        # Try to collect psutil metrics if available
        try:
            # Third-Party
            import psutil  # pylint: disable=import-outside-toplevel

            cpu_count = psutil.cpu_count(logical=True)
            cpu_percent = psutil.cpu_percent(interval=0.1)
            # Take one snapshot each so total/used figures are consistent
            # with one another instead of coming from two separate samples.
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage("/")

            info["system"] = {
                "cpu_count": cpu_count,
                "cpu_percent": cpu_percent,
                "memory_total_mb": round(memory.total / 1_048_576),
                "memory_used_mb": round(memory.used / 1_048_576),
                "disk_total_gb": round(disk.total / 1_073_741_824, 2),
                "disk_used_gb": round(disk.used / 1_073_741_824, 2),
            }
        except ImportError:
            info["system"] = {"note": "psutil not installed, skipping system metrics"}

        return info

    def _collect_env_config(self) -> Dict[str, str]:
        """Collect environment configuration with secrets redacted.

        Values of variables whose name is flagged by ``_is_secret`` are
        replaced with ``*****``; all other values are copied as-is.

        Returns:
            Dict of environment variables (secrets redacted)

        Examples:
            >>> service = SupportBundleService()
            >>> env = service._collect_env_config()
            >>> 'PATH' in env or len(env) >= 0  # May vary by environment
            True
        """
        return {k: "*****" if self._is_secret(k) else v for k, v in os.environ.items()}

    def _collect_settings(self) -> Dict[str, Any]:
        """Collect application settings with secrets redacted.

        Known secret fields are dropped from the dump entirely, and the
        database/Redis URLs have their embedded credentials masked.

        Returns:
            Dict of application settings

        Examples:
            >>> service = SupportBundleService()
            >>> config = service._collect_settings()
            >>> 'host' in config
            True
        """
        # Export settings as dict but exclude sensitive fields
        exclude_fields = {
            "basic_auth_password",
            "jwt_secret_key",
            "auth_encryption_secret",
            "platform_admin_password",
            "sso_github_client_secret",
            "sso_google_client_secret",
            "sso_ibm_verify_client_secret",
            "sso_okta_client_secret",
            "sso_keycloak_client_secret",
            "sso_entra_client_secret",
            "sso_generic_client_secret",
        }
        config = settings.model_dump(exclude=exclude_fields)

        # Sanitize URLs that may carry user:password credentials
        for url_field in ("database_url", "redis_url"):
            if url_field in config:
                config[url_field] = self._sanitize_url(config[url_field])

        return config

    def _collect_logs(self, config: SupportBundleConfig) -> Dict[str, str]:
        """Collect log files with sanitization and size limits.

        Only the main log file (``settings.log_folder / settings.log_file``,
        defaulting to ``logs/mcpgateway.log``) is collected. When the file is
        missing, too large, or unreadable, a bracketed placeholder message is
        stored instead of its content.

        Args:
            config: Bundle configuration

        Returns:
            Dict mapping log file names to sanitized content

        Examples:
            >>> service = SupportBundleService()
            >>> config = SupportBundleConfig(log_tail_lines=100)
            >>> logs = service._collect_logs(config)
            >>> isinstance(logs, dict)
            True
        """
        logs: Dict[str, str] = {}

        # Collect main log file
        log_file = settings.log_file or "mcpgateway.log"
        log_folder = settings.log_folder or "logs"
        log_path = Path(log_folder) / log_file

        if log_path.exists():
            # Deliberately best-effort: any failure while reading is recorded
            # in the bundle rather than aborting bundle generation.
            try:
                file_size_mb = log_path.stat().st_size / 1_048_576
                if file_size_mb > config.max_log_size_mb:
                    logs[log_file] = f"[Log file too large: {file_size_mb:.2f} MB > {config.max_log_size_mb} MB limit]\n"
                else:
                    # Undecodable bytes are dropped rather than raising.
                    with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
                        lines = f.readlines()

                    # Tail lines if configured
                    if config.log_tail_lines > 0 and len(lines) > config.log_tail_lines:
                        lines = lines[-config.log_tail_lines :]
                        lines.insert(0, f"[Showing last {config.log_tail_lines} lines]\n")

                    # Sanitize each line
                    sanitized_lines = [self._sanitize_line(line) for line in lines]
                    logs[log_file] = "".join(sanitized_lines)

            except Exception as e:
                logs[log_file] = f"[Error reading log file: {e}]\n"
        else:
            logs[log_file] = "[Log file not found]\n"

        return logs

    def _create_manifest(self, config: SupportBundleConfig) -> Dict[str, Any]:
        """Create bundle manifest with metadata.

        Args:
            config: Bundle configuration

        Returns:
            Dict containing bundle manifest

        Examples:
            >>> service = SupportBundleService()
            >>> config = SupportBundleConfig()
            >>> manifest = service._create_manifest(config)
            >>> 'bundle_version' in manifest
            True
        """
        return {
            "bundle_version": "1.0",
            "generated_at": self.timestamp.isoformat(),
            "hostname": self.hostname,
            "app_version": __version__,
            "configuration": {
                "include_logs": config.include_logs,
                "include_env": config.include_env,
                "include_system_info": config.include_system_info,
                "log_tail_lines": config.log_tail_lines,
            },
            "warning": "This bundle may contain sensitive information. Review before sharing.",
        }

    def generate_bundle(self, config: Optional[SupportBundleConfig] = None) -> Path:
        """Generate a complete support bundle as a ZIP file.

        The archive is named ``mcpgateway-support-<timestamp>.zip`` using the
        timestamp captured when the service was constructed, and is written to
        ``config.output_dir`` or, if unset, the system temporary directory.
        The output directory is created when it does not exist.

        Args:
            config: Optional bundle configuration

        Returns:
            Path: Path to the generated ZIP file

        Examples:
            >>> from mcpgateway.services.support_bundle_service import SupportBundleService, SupportBundleConfig
            >>> service = SupportBundleService()
            >>> config = SupportBundleConfig(log_tail_lines=100, output_dir=Path("/tmp"))
            >>> bundle_path = service.generate_bundle(config)
            >>> bundle_path.exists()
            True
            >>> bundle_path.name.startswith('mcpgateway-support-')
            True
            >>> bundle_path.suffix
            '.zip'
        """
        if config is None:
            config = SupportBundleConfig()

        # Determine output directory
        output_dir = config.output_dir or Path(tempfile.gettempdir())
        output_dir.mkdir(parents=True, exist_ok=True)

        # Create timestamped filename
        timestamp_str = self.timestamp.strftime("%Y-%m-%d-%H%M%S")
        bundle_filename = f"mcpgateway-support-{timestamp_str}.zip"
        bundle_path = output_dir / bundle_filename

        # Create ZIP file
        with zipfile.ZipFile(bundle_path, "w", zipfile.ZIP_DEFLATED) as zf:
            # Add manifest
            manifest = self._create_manifest(config)
            zf.writestr("MANIFEST.json", orjson.dumps(manifest, option=orjson.OPT_INDENT_2))

            # Add version info
            version_info = self._collect_version_info()
            zf.writestr("version.json", orjson.dumps(version_info, option=orjson.OPT_INDENT_2))

            # Add system info
            if config.include_system_info:
                system_info = self._collect_system_info()
                zf.writestr("system_info.json", orjson.dumps(system_info, option=orjson.OPT_INDENT_2))

            # Add settings
            if config.include_env:
                app_settings = self._collect_settings()
                # default=str: stringify any setting value orjson cannot serialize natively
                zf.writestr("settings.json", orjson.dumps(app_settings, default=str, option=orjson.OPT_INDENT_2))

                # Add environment variables
                env_config = self._collect_env_config()
                zf.writestr("environment.json", orjson.dumps(env_config, option=orjson.OPT_INDENT_2))

            # Add logs
            if config.include_logs:
                logs = self._collect_logs(config)
                for log_name, log_content in logs.items():
                    zf.writestr(f"logs/{log_name}", log_content)

            # Add README
            readme = f"""# MCP Gateway Support Bundle

This bundle contains diagnostic information for troubleshooting MCP Gateway issues.

## Contents

- MANIFEST.json: Bundle metadata and generation info
- version.json: Application and dependency versions
- system_info.json: Platform and system metrics
- settings.json: Application configuration (secrets redacted)
- environment.json: Environment variables (secrets redacted)
- logs/: Application logs (sanitized)

## Security Notice

This bundle has been automatically sanitized to remove:
- Passwords and authentication credentials
- API keys and tokens
- JWT secrets
- Database connection passwords
- Other sensitive configuration values

However, please review the contents before sharing with support or external parties.

## Usage

Extract the ZIP file and review the JSON files for diagnostic information.
Pay special attention to logs/ for error messages and stack traces.

---
Generated: {self.timestamp.isoformat()}
Hostname: {self.hostname}
Version: {__version__}
"""

            zf.writestr("README.md", readme)

        return bundle_path

492 

493 

def create_support_bundle(config: Optional[SupportBundleConfig] = None) -> Path:
    """Build a support bundle in one call, without managing a service instance.

    A fresh ``SupportBundleService`` is created for each call and its
    ``generate_bundle`` result is returned unchanged.

    Args:
        config: Optional bundle configuration

    Returns:
        Path to the generated bundle ZIP file

    Examples:
        >>> from mcpgateway.services.support_bundle_service import create_support_bundle, SupportBundleConfig
        >>> from pathlib import Path
        >>> import tempfile
        >>> with tempfile.TemporaryDirectory() as tmpdir:
        ...     config = SupportBundleConfig(log_tail_lines=500, output_dir=Path(tmpdir))
        ...     bundle_path = create_support_bundle(config)
        ...     bundle_path.suffix
        '.zip'
    """
    return SupportBundleService().generate_bundle(config)