Coverage for mcpgateway / cli_export_import.py: 99%

178 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/cli_export_import.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Export/Import CLI Commands. 

8This module provides CLI commands for exporting and importing MCP Gateway configuration. 

9It implements the export/import CLI functionality according to the specification including: 

10- Complete configuration export with filtering options 

11- Configuration import with conflict resolution strategies 

12- Dry-run validation for imports 

13- Cross-environment key rotation support 

14- Progress reporting and status tracking 

15""" 

16 

17# Standard 

18import argparse 

19import asyncio 

20import base64 

21from datetime import datetime 

22import logging 

23import os 

24from pathlib import Path 

25import sys 

26from typing import Any, Dict, Optional 

27 

28# Third-Party 

29import httpx 

30import orjson 

31 

32# First-Party 

33from mcpgateway import __version__ 

34from mcpgateway.config import settings 

35 

36logger = logging.getLogger(__name__) 

37 

38 

39class CLIError(Exception): 

40 """Base class for CLI-related errors.""" 

41 

42 

43class AuthenticationError(CLIError): 

44 """Raised when authentication fails.""" 

45 

46 

47async def get_auth_token() -> Optional[str]: 

48 """Get authentication token from environment or config. 

49 

50 Preference order: 

51 1. MCPGATEWAY_BEARER_TOKEN environment variable (JWT) - preferred 

52 2. Basic auth fallback (only if API_ALLOW_BASIC_AUTH=true) 

53 

54 Returns: 

55 Authentication token string or None if not configured 

56 """ 

57 # Try environment variable first (preferred) 

58 token = os.getenv("MCPGATEWAY_BEARER_TOKEN") 

59 if token: 

60 return token 

61 

62 # Fallback to basic auth only if enabled and configured 

63 if settings.api_allow_basic_auth and settings.basic_auth_user and settings.basic_auth_password: 

64 creds = base64.b64encode(f"{settings.basic_auth_user}:{settings.basic_auth_password}".encode()).decode() 

65 return f"Basic {creds}" 

66 

67 return None 

68 

69 

70async def make_authenticated_request(method: str, url: str, json_data: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 

71 """Make an authenticated HTTP request to the gateway API. 

72 

73 Args: 

74 method: HTTP method (GET, POST, etc.) 

75 url: URL path for the request 

76 json_data: Optional JSON data for request body 

77 params: Optional query parameters 

78 

79 Returns: 

80 JSON response from the API 

81 

82 Raises: 

83 AuthenticationError: If no authentication is configured 

84 CLIError: If the API request fails 

85 """ 

86 token = await get_auth_token() 

87 if not token: 

88 raise AuthenticationError("No authentication configured. Set MCPGATEWAY_BEARER_TOKEN environment variable or configure BASIC_AUTH_USER/BASIC_AUTH_PASSWORD.") 

89 

90 headers = {"Content-Type": "application/json"} 

91 if token.startswith("Basic "): 

92 headers["Authorization"] = token 

93 else: 

94 headers["Authorization"] = f"Bearer {token}" 

95 

96 gateway_url = f"http://{settings.host}:{settings.port}" 

97 full_url = f"{gateway_url}{url}" 

98 

99 # First-Party 

100 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel 

101 

102 async with get_isolated_http_client(timeout=300.0, headers=headers, connect_timeout=300.0, write_timeout=300.0, pool_timeout=300.0) as client: 

103 try: 

104 response = await client.request(method=method, url=full_url, json=json_data, params=params) 

105 if response.status_code >= 400: 

106 error_text = response.text 

107 raise CLIError(f"API request failed ({response.status_code}): {error_text}") 

108 

109 return response.json() 

110 

111 except httpx.HTTPError as e: 

112 raise CLIError(f"Failed to connect to gateway at {gateway_url}: {str(e)}") 

113 

114 

115async def export_command(args: argparse.Namespace) -> None: 

116 """Execute the export command. 

117 

118 Args: 

119 args: Parsed command line arguments 

120 """ 

121 try: 

122 print(f"Exporting configuration from gateway at http://{settings.host}:{settings.port}") 

123 

124 # Build API parameters 

125 params = {} 

126 if args.types: 

127 params["types"] = args.types 

128 if args.exclude_types: 

129 params["exclude_types"] = args.exclude_types 

130 if args.tags: 

131 params["tags"] = args.tags 

132 if args.include_inactive: 

133 params["include_inactive"] = "true" 

134 if not args.include_dependencies: 

135 params["include_dependencies"] = "false" 

136 

137 # Make export request 

138 export_data = await make_authenticated_request("GET", "/export", params=params) 

139 

140 # Determine output file 

141 if args.output: 

142 output_file = Path(args.output) 

143 else: 

144 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") 

145 output_file = Path(f"mcpgateway-export-{timestamp}.json") 

146 

147 # Write export data 

148 output_file.parent.mkdir(parents=True, exist_ok=True) 

149 await asyncio.to_thread(output_file.write_bytes, orjson.dumps(export_data, option=orjson.OPT_INDENT_2)) 

150 

151 # Print summary 

152 metadata = export_data.get("metadata", {}) 

153 entity_counts = metadata.get("entity_counts", {}) 

154 total_entities = sum(entity_counts.values()) 

155 

156 print("✅ Export completed successfully!") 

157 print(f"📁 Output file: {output_file}") 

158 print(f"📊 Exported {total_entities} total entities:") 

159 for entity_type, count in entity_counts.items(): 

160 if count > 0: 

161 print(f"{entity_type}: {count}") 

162 

163 if args.verbose: 

164 print("\n🔍 Export details:") 

165 print(f" • Version: {export_data.get('version')}") 

166 print(f" • Exported at: {export_data.get('exported_at')}") 

167 print(f" • Exported by: {export_data.get('exported_by')}") 

168 print(f" • Source: {export_data.get('source_gateway')}") 

169 

170 except Exception as e: 

171 print(f"❌ Export failed: {str(e)}", file=sys.stderr) 

172 sys.exit(1) 

173 

174 

175async def import_command(args: argparse.Namespace) -> None: 

176 """Execute the import command. 

177 

178 Args: 

179 args: Parsed command line arguments 

180 """ 

181 try: 

182 input_file = Path(args.input_file) 

183 if not input_file.exists(): 

184 print(f"❌ Input file not found: {input_file}", file=sys.stderr) 

185 sys.exit(1) 

186 

187 print(f"Importing configuration from {input_file}") 

188 

189 # Load import data 

190 content = await asyncio.to_thread(input_file.read_bytes) 

191 import_data = orjson.loads(content) 

192 

193 # Build request data 

194 request_data = { 

195 "import_data": import_data, 

196 "conflict_strategy": args.conflict_strategy, 

197 "dry_run": args.dry_run, 

198 } 

199 

200 if args.rekey_secret: 

201 request_data["rekey_secret"] = args.rekey_secret 

202 

203 if args.include: 

204 # Parse include parameter: "tool:tool1,tool2;server:server1" 

205 selected_entities = {} 

206 for selection in args.include.split(";"): 

207 if ":" in selection: 207 ↛ 206line 207 didn't jump to line 206 because the condition on line 207 was always true

208 entity_type, entity_list = selection.split(":", 1) 

209 entities = [e.strip() for e in entity_list.split(",") if e.strip()] 

210 selected_entities[entity_type] = entities 

211 request_data["selected_entities"] = selected_entities 

212 

213 # Make import request 

214 result = await make_authenticated_request("POST", "/import", json_data=request_data) 

215 

216 # Print results 

217 status = result.get("status", "unknown") 

218 progress = result.get("progress", {}) 

219 

220 if args.dry_run: 

221 print("🔍 Dry-run validation completed!") 

222 else: 

223 print(f"✅ Import {status}!") 

224 

225 print("📊 Results:") 

226 print(f" • Total entities: {progress.get('total', 0)}") 

227 print(f" • Processed: {progress.get('processed', 0)}") 

228 print(f" • Created: {progress.get('created', 0)}") 

229 print(f" • Updated: {progress.get('updated', 0)}") 

230 print(f" • Skipped: {progress.get('skipped', 0)}") 

231 print(f" • Failed: {progress.get('failed', 0)}") 

232 

233 # Show warnings if any 

234 warnings = result.get("warnings", []) 

235 if warnings: 

236 print(f"\n⚠️ Warnings ({len(warnings)}):") 

237 for warning in warnings[:5]: # Show first 5 warnings 

238 print(f"{warning}") 

239 if len(warnings) > 5: 

240 print(f" • ... and {len(warnings) - 5} more warnings") 

241 

242 # Show errors if any 

243 errors = result.get("errors", []) 

244 if errors: 

245 print(f"\n❌ Errors ({len(errors)}):") 

246 for error in errors[:5]: # Show first 5 errors 

247 print(f"{error}") 

248 if len(errors) > 5: 248 ↛ 251line 248 didn't jump to line 251 because the condition on line 248 was always true

249 print(f" • ... and {len(errors) - 5} more errors") 

250 

251 if args.verbose: 

252 print("\n🔍 Import details:") 

253 print(f" • Import ID: {result.get('import_id')}") 

254 print(f" • Started at: {result.get('started_at')}") 

255 print(f" • Completed at: {result.get('completed_at')}") 

256 

257 # Exit with error code if there were failures 

258 if progress.get("failed", 0) > 0: 

259 sys.exit(1) 

260 

261 except Exception as e: 

262 print(f"❌ Import failed: {str(e)}", file=sys.stderr) 

263 sys.exit(1) 

264 

265 

266def create_parser() -> argparse.ArgumentParser: 

267 """Create the argument parser for export/import commands. 

268 

269 Returns: 

270 Configured argument parser 

271 """ 

272 parser = argparse.ArgumentParser(prog="mcpgateway", description="MCP Gateway configuration export/import tool") 

273 

274 parser.add_argument("--version", "-V", action="version", version=f"mcpgateway {__version__}") 

275 

276 subparsers = parser.add_subparsers(dest="command", help="Available commands") 

277 

278 # Export command 

279 export_parser = subparsers.add_parser("export", help="Export gateway configuration") 

280 export_parser.add_argument("--output", "--out", "-o", help="Output file path (default: mcpgateway-export-YYYYMMDD-HHMMSS.json)") 

281 export_parser.add_argument("--types", "--type", help="Comma-separated entity types to include (tools,gateways,servers,prompts,resources,roots)") 

282 export_parser.add_argument("--exclude-types", help="Comma-separated entity types to exclude") 

283 export_parser.add_argument("--tags", help="Comma-separated tags to filter by") 

284 export_parser.add_argument("--include-inactive", action="store_true", help="Include inactive entities in export") 

285 export_parser.add_argument("--no-dependencies", action="store_true", help="Don't include dependent entities") 

286 export_parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") 

287 export_parser.set_defaults(func=export_command, include_dependencies=True) 

288 

289 # Import command 

290 import_parser = subparsers.add_parser("import", help="Import gateway configuration") 

291 import_parser.add_argument("input_file", help="Input file containing export data") 

292 import_parser.add_argument("--conflict-strategy", choices=["skip", "update", "rename", "fail"], default="update", help="How to handle naming conflicts (default: update)") 

293 import_parser.add_argument("--dry-run", action="store_true", help="Validate but don't make changes") 

294 import_parser.add_argument("--rekey-secret", help="New encryption secret for cross-environment imports") 

295 import_parser.add_argument("--include", help="Selective import: entity_type:name1,name2;entity_type2:name3") 

296 import_parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") 

297 import_parser.set_defaults(func=import_command) 

298 

299 return parser 

300 

301 

302def main_with_subcommands() -> None: 

303 """Main CLI entry point with export/import subcommands support.""" 

304 parser = create_parser() 

305 

306 # Check if we have export/import commands 

307 if len(sys.argv) > 1 and sys.argv[1] in ["export", "import"]: 

308 args = parser.parse_args() 

309 

310 if hasattr(args, "func"): 

311 # Handle no-dependencies flag 

312 if hasattr(args, "include_dependencies"): 312 ↛ 316line 312 didn't jump to line 316 because the condition on line 312 was always true

313 args.include_dependencies = not getattr(args, "no_dependencies", False) 

314 

315 # Run the async command 

316 try: 

317 asyncio.run(args.func(args)) 

318 except KeyboardInterrupt: 

319 print("\n❌ Operation cancelled by user", file=sys.stderr) 

320 sys.exit(1) 

321 else: 

322 parser.print_help() 

323 sys.exit(1) 

324 else: 

325 # Fall back to the original uvicorn-based CLI 

326 # First-Party 

327 from mcpgateway.cli import main # pylint: disable=import-outside-toplevel,cyclic-import 

328 

329 main() 

330 

331 

332if __name__ == "__main__": 

333 main_with_subcommands()