Coverage for mcpgateway / services / export_service.py: 96%

340 statements  

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2# pylint: disable=import-outside-toplevel,no-name-in-module 

3"""Location: ./mcpgateway/services/export_service.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti 

7 

8Export Service Implementation. 

9This module implements comprehensive configuration export functionality according to the export specification. 

10It handles: 

11- Entity collection from all entity types (Tools, Gateways, Servers, Prompts, Resources, Roots) 

12- Secure authentication data encryption using AES-256-GCM 

13- Dependency resolution and inclusion 

14- Filtering by entity types, tags, and active/inactive status 

15- Export format validation and schema compliance 

16- Only exports locally configured entities (not federated content) 

17""" 

18 
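For orientation, a sketch of the top-level payload this module produces; the field names follow the ExportData shape defined further down, while the values here are placeholders rather than output from a real gateway:

_example_export = {
    "version": "<protocol version from settings>",
    "exported_at": "2025-01-01T00:00:00Z",          # UTC timestamp with trailing "Z"
    "exported_by": "system",
    "source_gateway": "http://localhost:4444",       # placeholder for f"http://{settings.host}:{settings.port}"
    "encryption_method": "AES-256-GCM",
    "entities": {"tools": [], "gateways": [], "servers": [], "prompts": [], "resources": [], "roots": []},
    "metadata": {"entity_counts": {}, "dependencies": {}, "export_options": {}},
}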

19# Standard 

20from datetime import datetime, timezone 

21import logging 

22from typing import Any, cast, Dict, List, Optional, TypedDict 

23 

24# Third-Party 

25from sqlalchemy import or_, select 

26from sqlalchemy.orm import selectinload, Session 

27 

28# First-Party 

29from mcpgateway.config import settings 

30from mcpgateway.db import Gateway as DbGateway 

31from mcpgateway.db import Prompt as DbPrompt 

32from mcpgateway.db import Resource as DbResource 

33from mcpgateway.db import Server as DbServer 

34from mcpgateway.db import Tool as DbTool 

35 

36# Service singletons are imported lazily in __init__ to avoid circular imports 

37 

38logger = logging.getLogger(__name__) 

39 

40 

41class ExportError(Exception): 

42 """Base class for export-related errors. 

43 

44 Examples: 

45 >>> try: 

46 ... raise ExportError("General export error") 

47 ... except ExportError as e: 

48 ... str(e) 

49 'General export error' 

50 >>> try: 

51 ... raise ExportError("Export failed") 

52 ... except Exception as e: 

53 ... isinstance(e, ExportError) 

54 True 

55 """ 

56 

57 

58class ExportValidationError(ExportError): 

59 """Raised when export data validation fails. 

60 

61 Examples: 

62 >>> try: 

63 ... raise ExportValidationError("Invalid export format") 

64 ... except ExportValidationError as e: 

65 ... str(e) 

66 'Invalid export format' 

67 >>> try: 

68 ... raise ExportValidationError("Schema validation failed") 

69 ... except ExportError as e: 

70 ... isinstance(e, ExportError) # Should inherit from ExportError 

71 True 

72 >>> try: 

73 ... raise ExportValidationError("Missing required field") 

74 ... except Exception as e: 

75 ... isinstance(e, ExportValidationError) 

76 True 

77 """ 

78 

79 

80class ExportService: 

81 """Service for exporting MCP Gateway configuration and data. 

82 

83 This service provides comprehensive export functionality including: 

84 - Collection of all entity types (tools, gateways, servers, prompts, resources, roots) 

85 - Secure handling of authentication data with encryption 

86 - Dependency resolution between entities 

87 - Filtering options (by type, tags, status) 

88 - Export format validation 

89 

90 The service only exports locally configured entities, excluding dynamic content 

91 from federated sources to ensure exports contain only configuration data. 

92 

93 Examples: 

94 >>> service = ExportService() 

95 >>> hasattr(service, 'gateway_service') 

96 True 

97 >>> hasattr(service, 'tool_service') 

98 True 

99 >>> hasattr(service, 'resource_service') 

100 True 

101 >>> # Test entity type validation 

102 >>> valid_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"] 

103 >>> "tools" in valid_types 

104 True 

105 >>> "invalid_type" in valid_types 

106 False 

107 >>> # Test filtering logic 

108 >>> include_types = ["tools", "servers"] 

109 >>> exclude_types = ["gateways"] 

110 >>> "tools" in include_types and "tools" not in exclude_types 

111 True 

112 >>> "gateways" in include_types and "gateways" not in exclude_types 

113 False 

114 >>> # Test tag filtering 

115 >>> entity_tags = ["production", "api"] 

116 >>> filter_tags = ["production"] 

117 >>> any(tag in entity_tags for tag in filter_tags) 

118 True 

119 >>> filter_tags = ["development"] 

120 >>> any(tag in entity_tags for tag in filter_tags) 

121 False 

122 """ 

123 

124 def __init__(self): 

125 """Initialize the export service with required dependencies.""" 

126 # Use globally-exported singletons from service modules so they 

127 # share initialized EventService/Redis clients created at app startup. 

128 # Import lazily to avoid circular import at module load time. 

129 # First-Party 

130 from mcpgateway.services.gateway_service import gateway_service 

131 from mcpgateway.services.prompt_service import prompt_service 

132 from mcpgateway.services.resource_service import resource_service 

133 from mcpgateway.services.root_service import root_service 

134 from mcpgateway.services.server_service import server_service 

135 from mcpgateway.services.tool_service import tool_service 

136 

137 self.gateway_service = gateway_service 

138 self.tool_service = tool_service 

139 self.resource_service = resource_service 

140 self.prompt_service = prompt_service 

141 self.server_service = server_service 

142 self.root_service = root_service 

143 

144 async def initialize(self) -> None: 

145 """Initialize the export service.""" 

146 logger.info("Export service initialized") 

147 

148 async def shutdown(self) -> None: 

149 """Shutdown the export service.""" 

150 logger.info("Export service shutdown") 

151 

152 async def _fetch_all_tools(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]: 

153 """Fetch all tools by following pagination cursors. 

154 

155 Args: 

156 db: Database session 

157 tags: Filter by tags 

158 include_inactive: Include inactive tools 

159 

160 Returns: 

161 List of all tools across all pages 

162 """ 

163 all_tools = [] 

164 cursor = None 

165 while True: 

166 tools, next_cursor = await self.tool_service.list_tools(db, tags=tags, include_inactive=include_inactive, cursor=cursor) 

167 all_tools.extend(tools) 

168 if not next_cursor: 

169 break 

170 cursor = next_cursor 

171 return all_tools 

172 

173 async def _fetch_all_prompts(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]: 

174 """Fetch all prompts by following pagination cursors. 

175 

176 Args: 

177 db: Database session 

178 tags: Filter by tags 

179 include_inactive: Include inactive prompts 

180 

181 Returns: 

182 List of all prompts across all pages 

183 """ 

184 all_prompts = [] 

185 cursor = None 

186 while True: 

187 prompts, next_cursor = await self.prompt_service.list_prompts(db, tags=tags, include_inactive=include_inactive, cursor=cursor) 

188 all_prompts.extend(prompts) 

189 if not next_cursor: 

190 break 

191 cursor = next_cursor 

192 return all_prompts 

193 

194 async def _fetch_all_resources(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]: 

195 """Fetch all resources by following pagination cursors. 

196 

197 Args: 

198 db: Database session 

199 tags: Filter by tags 

200 include_inactive: Include inactive resources 

201 

202 Returns: 

203 List of all resources across all pages 

204 """ 

205 all_resources = [] 

206 cursor = None 

207 while True: 

208 resources, next_cursor = await self.resource_service.list_resources(db, tags=tags, include_inactive=include_inactive, cursor=cursor) 

209 all_resources.extend(resources) 

210 if not next_cursor: 

211 break 

212 cursor = next_cursor 

213 return all_resources 

214 

215 async def _fetch_all_gateways(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]: 

216 """Fetch all gateways by following pagination cursors. 

217 

218 Args: 

219 db: Database session 

220 tags: Filter by tags 

221 include_inactive: Include inactive gateways 

222 

223 Returns: 

224 List of all gateways across all pages 

225 """ 

226 all_gateways = [] 

227 cursor = None 

228 while True: 

229 gateways, next_cursor = await self.gateway_service.list_gateways(db, tags=tags, include_inactive=include_inactive, cursor=cursor) 

230 all_gateways.extend(gateways) 

231 if not next_cursor: 

232 break 

233 cursor = next_cursor 

234 return all_gateways 

235 

236 async def _fetch_all_servers(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]: 

237 """Fetch all servers by following pagination cursors. 

238 

239 Args: 

240 db: Database session 

241 tags: Filter by tags 

242 include_inactive: Include inactive servers 

243 

244 Returns: 

245 List of all servers across all pages 

246 """ 

247 all_servers = [] 

248 cursor = None 

249 while True: 

250 servers, next_cursor = await self.server_service.list_servers(db, tags=tags, include_inactive=include_inactive, cursor=cursor) 

251 all_servers.extend(servers) 

252 if not next_cursor: 

253 break 

254 cursor = next_cursor 

255 return all_servers 

256 

257 async def export_configuration( 

258 self, 

259 db: Session, 

260 include_types: Optional[List[str]] = None, 

261 exclude_types: Optional[List[str]] = None, 

262 tags: Optional[List[str]] = None, 

263 include_inactive: bool = False, 

264 include_dependencies: bool = True, 

265 exported_by: str = "system", 

266 root_path: str = "", 

267 ) -> Dict[str, Any]: 

268 """Export complete gateway configuration to a standardized format. 

269 

270 Args: 

271 db: Database session 

272 include_types: List of entity types to include (tools, gateways, servers, prompts, resources, roots) 

273 exclude_types: List of entity types to exclude 

274 tags: Filter entities by tags (only export entities with these tags) 

275 include_inactive: Whether to include inactive entities 

276 include_dependencies: Whether to include dependent entities automatically 

277 exported_by: Username of the person performing the export 

278 root_path: Root path for constructing API endpoints 

279 

280 Returns: 

281 Dict containing the complete export data in the specified schema format 

282 

283 Raises: 

284 ExportError: If export fails 

285 ExportValidationError: If validation fails 

286 """ 

287 try: 

288 logger.info(f"Starting configuration export by {exported_by}") 

289 

290 # Determine which entity types to include 

291 all_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"] 

292 if include_types: 

293 entity_types = [t.lower() for t in include_types if t.lower() in all_types] 

294 else: 

295 entity_types = all_types 

296 

297 if exclude_types: 

298 entity_types = [t for t in entity_types if t.lower() not in [e.lower() for e in exclude_types]] 

299 

300 class ExportOptions(TypedDict, total=False): 

301 """Options that control export behavior (full export).""" 

302 

303 include_inactive: bool 

304 include_dependencies: bool 

305 selected_types: List[str] 

306 filter_tags: List[str] 

307 

308 class ExportMetadata(TypedDict): 

309 """Metadata for full export including counts, dependencies, and options.""" 

310 

311 entity_counts: Dict[str, int] 

312 dependencies: Dict[str, Any] 

313 export_options: ExportOptions 

314 

315 class ExportData(TypedDict): 

316 """Top-level full export payload shape.""" 

317 

318 version: str 

319 exported_at: str 

320 exported_by: str 

321 source_gateway: str 

322 encryption_method: str 

323 entities: Dict[str, List[Dict[str, Any]]] 

324 metadata: ExportMetadata 

325 

326 entities: Dict[str, List[Dict[str, Any]]] = {} 

327 metadata: ExportMetadata = { 

328 "entity_counts": {}, 

329 "dependencies": {}, 

330 "export_options": { 

331 "include_inactive": include_inactive, 

332 "include_dependencies": include_dependencies, 

333 "selected_types": entity_types, 

334 "filter_tags": tags or [], 

335 }, 

336 } 

337 

338 export_data: ExportData = { 

339 "version": settings.protocol_version, 

340 "exported_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), 

341 "exported_by": exported_by, 

342 "source_gateway": f"http://{settings.host}:{settings.port}", 

343 "encryption_method": "AES-256-GCM", 

344 "entities": entities, 

345 "metadata": metadata, 

346 } 

347 

348 # Export each entity type 

349            if "tools" in entity_types: 349 ↛ 352 (line 349 didn't jump to line 352 because the condition on line 349 was always true)

350 export_data["entities"]["tools"] = await self._export_tools(db, tags, include_inactive) 

351 

352            if "gateways" in entity_types: 352 ↛ 355 (line 352 didn't jump to line 355 because the condition on line 352 was always true)

353 export_data["entities"]["gateways"] = await self._export_gateways(db, tags, include_inactive) 

354 

355 if "servers" in entity_types: 

356 export_data["entities"]["servers"] = await self._export_servers(db, tags, include_inactive, root_path) 

357 

358 if "prompts" in entity_types: 

359 export_data["entities"]["prompts"] = await self._export_prompts(db, tags, include_inactive) 

360 

361 if "resources" in entity_types: 

362 export_data["entities"]["resources"] = await self._export_resources(db, tags, include_inactive) 

363 

364 if "roots" in entity_types: 

365 export_data["entities"]["roots"] = await self._export_roots() 

366 

367 # Add dependency information 

368            if include_dependencies: 368 ↛ 372 (line 368 didn't jump to line 372 because the condition on line 368 was always true)

369 export_data["metadata"]["dependencies"] = await self._extract_dependencies(db, export_data["entities"]) 

370 

371 # Calculate entity counts 

372 for entity_type, entities_list in export_data["entities"].items(): 

373 export_data["metadata"]["entity_counts"][entity_type] = len(entities_list) 

374 

375 # Validate export data 

376 self._validate_export_data(cast(Dict[str, Any], export_data)) 

377 

378 logger.info(f"Export completed successfully with {sum(export_data['metadata']['entity_counts'].values())} total entities") 

379 return cast(Dict[str, Any], export_data) 

380 

381 except Exception as e: 

382 logger.error(f"Export failed: {str(e)}") 

383 raise ExportError(f"Failed to export configuration: {str(e)}") 

384 
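A minimal usage sketch for export_configuration, assuming a synchronous SQLAlchemy session factory is available; SessionLocal is a hypothetical name, not something this module confirms:

import asyncio

from mcpgateway.db import SessionLocal  # hypothetical session factory name
from mcpgateway.services.export_service import ExportService

async def run_full_export():
    # Build the service, run one filtered export, and shut it down again.
    service = ExportService()
    await service.initialize()
    with SessionLocal() as db:
        data = await service.export_configuration(
            db,
            include_types=["tools", "servers"],
            tags=["production"],
            include_inactive=False,
            exported_by="admin",
        )
    await service.shutdown()
    return data

# counts = asyncio.run(run_full_export())["metadata"]["entity_counts"]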

385 async def _export_tools(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Dict[str, Any]]: 

386 """Export tools with encrypted authentication data. 

387 

388 Uses batch queries to fetch auth data efficiently, avoiding N+1 query patterns. 

389 

390 Args: 

391 db: Database session 

392 tags: Filter by tags 

393 include_inactive: Include inactive tools 

394 

395 Returns: 

396 List of exported tool dictionaries 

397 """ 

398 # Fetch all tools across all pages (bypasses pagination limit) 

399 tools = await self._fetch_all_tools(db, tags, include_inactive) 

400 

401 # Filter to only exportable tools (local REST tools, not MCP tools from gateways) 

402 exportable_tools = [t for t in tools if not (t.integration_type == "MCP" and t.gateway_id)] 

403 

404 # Batch fetch auth data for tools with masked values (single query instead of N queries) 

405 tool_ids_needing_auth = [ 

406 tool.id for tool in exportable_tools if hasattr(tool, "auth") and tool.auth and hasattr(tool.auth, "auth_value") and tool.auth.auth_value == settings.masked_auth_value 

407 ] 

408 

409 auth_data_map: Dict[Any, tuple] = {} 

410 if tool_ids_needing_auth: 

411 db_tools_with_auth = db.execute(select(DbTool.id, DbTool.auth_type, DbTool.auth_value).where(DbTool.id.in_(tool_ids_needing_auth))).all() 

412 auth_data_map = {row[0]: (row[1], row[2]) for row in db_tools_with_auth} 

413 

414 exported_tools = [] 

415 for tool in exportable_tools: 

416 tool_data = { 

417 "name": tool.original_name, # Use original name, not the slugified version 

418 "displayName": tool.displayName, # Export displayName field from ToolRead 

419 "url": str(tool.url), 

420 "integration_type": tool.integration_type, 

421 "request_type": tool.request_type, 

422 "description": tool.description, 

423 "headers": tool.headers or {}, 

424 "input_schema": tool.input_schema or {"type": "object", "properties": {}}, 

425 "output_schema": tool.output_schema, 

426 "annotations": tool.annotations or {}, 

427 "jsonpath_filter": tool.jsonpath_filter, 

428 "tags": tool.tags or [], 

429 "rate_limit": getattr(tool, "rate_limit", None), 

430 "timeout": getattr(tool, "timeout", None), 

431 "is_active": tool.enabled, 

432 "created_at": tool.created_at.isoformat() if hasattr(tool.created_at, "isoformat") and tool.created_at else None, 

433 "updated_at": tool.updated_at.isoformat() if hasattr(tool.updated_at, "isoformat") and tool.updated_at else None, 

434 } 

435 

436 # Handle authentication data securely - use batch-fetched values 

437 if hasattr(tool, "auth") and tool.auth: 

438 auth = tool.auth 

439                if hasattr(auth, "auth_type") and hasattr(auth, "auth_value"): 439 ↛ 452 (line 439 didn't jump to line 452 because the condition on line 439 was always true)

440 if auth.auth_value == settings.masked_auth_value: 

441 # Use batch-fetched auth data 

442                        if tool.id in auth_data_map: 442 ↛ 452 (line 442 didn't jump to line 452 because the condition on line 442 was always true)

443 auth_type, auth_value = auth_data_map[tool.id] 

444                            if auth_value: 444 ↛ 452 (line 444 didn't jump to line 452 because the condition on line 444 was always true)

445 tool_data["auth_type"] = auth_type 

446 tool_data["auth_value"] = auth_value 

447 else: 

448 # Auth value is not masked, use as-is 

449 tool_data["auth_type"] = auth.auth_type 

450 tool_data["auth_value"] = auth.auth_value 

451 

452 exported_tools.append(tool_data) 

453 

454 return exported_tools 

455 

456 async def _export_gateways(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Dict[str, Any]]: 

457 """Export gateways with encrypted authentication data. 

458 

459 Uses batch queries to fetch auth data efficiently, avoiding N+1 query patterns. 

460 

461 Args: 

462 db: Database session 

463 tags: Filter by tags 

464 include_inactive: Include inactive gateways 

465 

466 Returns: 

467 List of exported gateway dictionaries 

468 """ 

469 # Fetch all gateways across all pages (bypasses pagination limit) 

470 gateways = await self._fetch_all_gateways(db, tags, include_inactive) 

471 

472 # Batch fetch auth data for gateways with masked values (single query instead of N queries) 

473 gateway_ids_needing_auth = [g.id for g in gateways if g.auth_type and g.auth_value == settings.masked_auth_value] 

474 

475 auth_data_map: Dict[Any, tuple] = {} 

476 if gateway_ids_needing_auth: 

477 db_gateways_with_auth = db.execute(select(DbGateway.id, DbGateway.auth_type, DbGateway.auth_value).where(DbGateway.id.in_(gateway_ids_needing_auth))).all() 

478 auth_data_map = {row[0]: (row[1], row[2]) for row in db_gateways_with_auth} 

479 

480 exported_gateways = [] 

481 for gateway in gateways: 

482 gateway_data = { 

483 "name": gateway.name, 

484 "url": str(gateway.url), 

485 "description": gateway.description, 

486 "transport": gateway.transport, 

487 "capabilities": gateway.capabilities or {}, 

488 "health_check": {"url": f"{gateway.url}/health", "interval": 30, "timeout": 10, "retries": 3}, 

489 "is_active": gateway.enabled, 

490 "tags": gateway.tags or [], 

491 "passthrough_headers": gateway.passthrough_headers or [], 

492 } 

493 

494 # Handle authentication data securely - use batch-fetched values 

495 if gateway.auth_type and gateway.auth_value: 

496 if gateway.auth_value == settings.masked_auth_value: 

497 # Use batch-fetched auth data 

498                    if gateway.id in auth_data_map: 498 ↛ 508 (line 498 didn't jump to line 508 because the condition on line 498 was always true)

499 auth_type, auth_value = auth_data_map[gateway.id] 

500                        if auth_value: 500 ↛ 508 (line 500 didn't jump to line 508 because the condition on line 500 was always true)

501 gateway_data["auth_type"] = auth_type 

502 gateway_data["auth_value"] = auth_value 

503 else: 

504 # Auth value is not masked, use as-is 

505 gateway_data["auth_type"] = gateway.auth_type 

506 gateway_data["auth_value"] = gateway.auth_value 

507 

508 exported_gateways.append(gateway_data) 

509 

510 return exported_gateways 

511 

512 async def _export_servers(self, db: Session, tags: Optional[List[str]], include_inactive: bool, root_path: str = "") -> List[Dict[str, Any]]: 

513 """Export virtual servers with their tool associations. 

514 

515 Args: 

516 db: Database session 

517 tags: Filter by tags 

518 include_inactive: Include inactive servers 

519 root_path: Root path for constructing API endpoints 

520 

521 Returns: 

522 List of exported server dictionaries 

523 """ 

524 # Fetch all servers across all pages (bypasses pagination limit) 

525 servers = await self._fetch_all_servers(db, tags, include_inactive) 

526 exported_servers = [] 

527 

528 for server in servers: 

529 server_data = { 

530 "name": server.name, 

531 "description": server.description, 

532 "tool_ids": list(server.associated_tools), 

533 "sse_endpoint": f"{root_path}/servers/{server.id}/sse", 

534 "websocket_endpoint": f"{root_path}/servers/{server.id}/ws", 

535 "jsonrpc_endpoint": f"{root_path}/servers/{server.id}/jsonrpc", 

536 "capabilities": {"tools": {"list_changed": True}, "prompts": {"list_changed": True}}, 

537 "is_active": getattr(server, "enabled", getattr(server, "is_active", False)), 

538 "tags": server.tags or [], 

539 } 

540 

541 exported_servers.append(server_data) 

542 

543 return exported_servers 

544 

545 async def _export_prompts(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Dict[str, Any]]: 

546 """Export prompts with their templates and schemas. 

547 

548 Args: 

549 db: Database session 

550 tags: Filter by tags 

551 include_inactive: Include inactive prompts 

552 

553 Returns: 

554 List of exported prompt dictionaries 

555 """ 

556 # Fetch all prompts across all pages (bypasses pagination limit) 

557 prompts = await self._fetch_all_prompts(db, tags, include_inactive) 

558 exported_prompts = [] 

559 

560 for prompt in prompts: 

561 input_schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []} 

562 prompt_data: Dict[str, Any] = { 

563 "name": getattr(prompt, "original_name", None) or prompt.name, 

564 "original_name": getattr(prompt, "original_name", None) or prompt.name, 

565 "custom_name": getattr(prompt, "custom_name", None) or getattr(prompt, "original_name", None) or prompt.name, 

566 "display_name": getattr(prompt, "display_name", None) or getattr(prompt, "custom_name", None) or getattr(prompt, "original_name", None) or prompt.name, 

567 "template": prompt.template, 

568 "description": prompt.description, 

569 "input_schema": input_schema, 

570 "tags": prompt.tags or [], 

571 # Use the new `enabled` attribute on prompt objects but keep export key `is_active` for compatibility 

572 "is_active": getattr(prompt, "enabled", getattr(prompt, "is_active", False)), 

573 } 

574 

575 # Convert arguments to input schema format 

576            if prompt.arguments: 576 ↛ 586 (line 576 didn't jump to line 586 because the condition on line 576 was always true)

577 properties: Dict[str, Any] = {} 

578 required = [] 

579 for arg in prompt.arguments: 

580 properties[arg.name] = {"type": "string", "description": arg.description or ""} 

581 if arg.required: 

582 required.append(arg.name) 

583 input_schema["properties"] = properties 

584 input_schema["required"] = required 

585 

586 exported_prompts.append(prompt_data) 

587 

588 return exported_prompts 

589 

590 async def _export_resources(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Dict[str, Any]]: 

591 """Export resources with their content metadata. 

592 

593 Args: 

594 db: Database session 

595 tags: Filter by tags 

596 include_inactive: Include inactive resources 

597 

598 Returns: 

599 List of exported resource dictionaries 

600 """ 

601 # Fetch all resources across all pages (bypasses pagination limit) 

602 resources = await self._fetch_all_resources(db, tags, include_inactive) 

603 exported_resources = [] 

604 

605 for resource in resources: 

606 resource_data = { 

607 "name": resource.name, 

608 "uri": resource.uri, 

609 "description": resource.description, 

610 "mime_type": resource.mime_type, 

611 "tags": resource.tags or [], 

612 "is_active": getattr(resource, "enabled", getattr(resource, "is_active", False)), 

613 "last_modified": resource.updated_at.isoformat() if resource.updated_at else None, 

614 } 

615 

616 exported_resources.append(resource_data) 

617 

618 return exported_resources 

619 

620 async def _export_roots(self) -> List[Dict[str, Any]]: 

621 """Export filesystem roots. 

622 

623 Returns: 

624 List of exported root dictionaries 

625 """ 

626 roots = await self.root_service.list_roots() 

627 exported_roots = [] 

628 

629 for root in roots: 

630 root_data = {"uri": str(root.uri), "name": root.name} 

631 exported_roots.append(root_data) 

632 

633 return exported_roots 

634 

635 async def _extract_dependencies(self, db: Session, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: # pylint: disable=unused-argument 

636 """Extract dependency relationships between entities. 

637 

638 Args: 

639 db: Database session 

640 entities: Dictionary of exported entities 

641 

642 Returns: 

643 Dictionary containing dependency mappings 

644 """ 

645 dependencies = {"servers_to_tools": {}, "servers_to_resources": {}, "servers_to_prompts": {}} 

646 

647 # Extract server-to-tool dependencies 

648 if "servers" in entities and "tools" in entities: 

649 for server in entities["servers"]: 

650                if server.get("tool_ids"): 650 ↛ 649 (line 650 didn't jump to line 649 because the condition on line 650 was always true)

651 dependencies["servers_to_tools"][server["name"]] = server["tool_ids"] 

652 

653 return dependencies 

654 
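For illustration, a dependency map in the shape returned above; the server name and tool IDs are placeholders:

example_dependencies = {
    "servers_to_tools": {"billing-server": ["tool-id-1", "tool-id-2"]},
    "servers_to_resources": {},
    "servers_to_prompts": {},
}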

655 def _validate_export_data(self, export_data: Dict[str, Any]) -> None: 

656 """Validate export data against the schema. 

657 

658 Args: 

659 export_data: The export data to validate 

660 

661 Raises: 

662 ExportValidationError: If validation fails 

663 """ 

664 required_fields = ["version", "exported_at", "exported_by", "entities", "metadata"] 

665 

666 for field in required_fields: 

667 if field not in export_data: 

668 raise ExportValidationError(f"Missing required field: {field}") 

669 

670 # Validate version format 

671 if not export_data["version"]: 

672 raise ExportValidationError("Version cannot be empty") 

673 

674 # Validate entities structure 

675 if not isinstance(export_data["entities"], dict): 

676 raise ExportValidationError("Entities must be a dictionary") 

677 

678 # Validate metadata structure 

679 metadata = export_data["metadata"] 

680 if not isinstance(metadata.get("entity_counts"), dict): 

681 raise ExportValidationError("Metadata entity_counts must be a dictionary") 

682 

683 logger.debug("Export data validation passed") 

684 
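A short sketch of the failure path above; since _validate_export_data only reads its argument, the check can be exercised directly, assuming the service constructs cleanly outside a running gateway:

from mcpgateway.services.export_service import ExportService, ExportValidationError

service = ExportService()
try:
    # "exported_at" is the first required field missing from this payload.
    service._validate_export_data({"version": "1.0"})
except ExportValidationError as exc:
    print(exc)  # Missing required field: exported_at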

685 async def export_selective(self, db: Session, entity_selections: Dict[str, List[str]], include_dependencies: bool = True, exported_by: str = "system", root_path: str = "") -> Dict[str, Any]: 

686 """Export specific entities by their IDs/names. 

687 

688 Args: 

689 db: Database session 

690 entity_selections: Dict mapping entity types to lists of IDs/names to export 

691 include_dependencies: Whether to include dependent entities 

692 exported_by: Username of the person performing the export 

693 root_path: Root path for constructing API endpoints 

694 

695 Returns: 

696 Dict containing the selective export data 

697 

698 Example: 

699 entity_selections = { 

700 "tools": ["tool1", "tool2"], 

701 "servers": ["server1"], 

702 "prompts": ["prompt1"] 

703 } 

704 """ 

705 logger.info(f"Starting selective export by {exported_by}") 

706 

707 class SelExportOptions(TypedDict, total=False): 

708 """Options that control behavior for selective export.""" 

709 

710 selective: bool 

711 include_dependencies: bool 

712 selections: Dict[str, List[str]] 

713 

714 class SelExportMetadata(TypedDict): 

715 """Metadata for selective export including counts, dependencies, and options.""" 

716 

717 entity_counts: Dict[str, int] 

718 dependencies: Dict[str, Any] 

719 export_options: SelExportOptions 

720 

721 class SelExportData(TypedDict): 

722 """Top-level selective export payload shape.""" 

723 

724 version: str 

725 exported_at: str 

726 exported_by: str 

727 source_gateway: str 

728 encryption_method: str 

729 entities: Dict[str, List[Dict[str, Any]]] 

730 metadata: SelExportMetadata 

731 

732 sel_entities: Dict[str, List[Dict[str, Any]]] = {} 

733 sel_metadata: SelExportMetadata = { 

734 "entity_counts": {}, 

735 "dependencies": {}, 

736 "export_options": {"selective": True, "include_dependencies": include_dependencies, "selections": entity_selections}, 

737 } 

738 export_data: SelExportData = { 

739 "version": settings.protocol_version, 

740 "exported_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), 

741 "exported_by": exported_by, 

742 "source_gateway": f"http://{settings.host}:{settings.port}", 

743 "encryption_method": "AES-256-GCM", 

744 "entities": sel_entities, 

745 "metadata": sel_metadata, 

746 } 

747 

748 # Export selected entities for each type 

749 for entity_type, selected_ids in entity_selections.items(): 

750 if entity_type == "tools": 

751 export_data["entities"]["tools"] = await self._export_selected_tools(db, selected_ids) 

752 elif entity_type == "gateways": 

753 export_data["entities"]["gateways"] = await self._export_selected_gateways(db, selected_ids) 

754 elif entity_type == "servers": 

755 export_data["entities"]["servers"] = await self._export_selected_servers(db, selected_ids, root_path) 

756 elif entity_type == "prompts": 

757 export_data["entities"]["prompts"] = await self._export_selected_prompts(db, selected_ids) 

758 elif entity_type == "resources": 

759 export_data["entities"]["resources"] = await self._export_selected_resources(db, selected_ids) 

760            elif entity_type == "roots": 760 ↛ 749 (line 760 didn't jump to line 749 because the condition on line 760 was always true)

761 export_data["entities"]["roots"] = await self._export_selected_roots(selected_ids) 

762 

763 # Add dependencies if requested 

764        if include_dependencies: 764 ↛ 768 (line 764 didn't jump to line 768 because the condition on line 764 was always true)

765 export_data["metadata"]["dependencies"] = await self._extract_dependencies(db, export_data["entities"]) 

766 

767 # Calculate entity counts 

768 for entity_type, entities_list in export_data["entities"].items(): 

769 export_data["metadata"]["entity_counts"][entity_type] = len(entities_list) 

770 

771 self._validate_export_data(cast(Dict[str, Any], export_data)) 

772 

773 logger.info(f"Selective export completed with {sum(export_data['metadata']['entity_counts'].values())} entities") 

774 return cast(Dict[str, Any], export_data) 

775 
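A hedged sketch of a selective export call, reusing the service and db objects from the earlier sketch; the IDs and names are placeholders:

async def run_selective_export(service: ExportService, db) -> dict:
    # Export only the named tools and server, pulling in their dependencies.
    selections = {"tools": ["tool-id-1", "tool-id-2"], "servers": ["server-id-1"]}
    return await service.export_selective(db, selections, include_dependencies=True, exported_by="admin")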

776 async def _export_selected_tools(self, db: Session, tool_ids: List[str]) -> List[Dict[str, Any]]: 

777 """Export specific tools by their IDs using batch queries. 

778 

779 Uses a single batch query instead of fetching all tools N times. 

780 

781 Args: 

782 db: Database session 

783 tool_ids: List of tool IDs to export 

784 

785 Returns: 

786 List of exported tool dictionaries 

787 """ 

788 if not tool_ids: 

789 return [] 

790 

791 # Batch query for selected tools only 

792 db_tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all() 

793 

794 exported_tools = [] 

795 for db_tool in db_tools: 

796 # Only export local REST tools, not MCP tools from gateways 

797 if db_tool.integration_type == "MCP" and db_tool.gateway_id: 

798 continue 

799 

800 tool_data = { 

801 "name": db_tool.original_name or db_tool.custom_name, 

802 "displayName": db_tool.display_name, 

803 "url": str(db_tool.url) if db_tool.url else None, 

804 "integration_type": db_tool.integration_type, 

805 "request_type": db_tool.request_type, 

806 "description": db_tool.description, 

807 "headers": db_tool.headers or {}, 

808 "input_schema": db_tool.input_schema or {"type": "object", "properties": {}}, 

809 "output_schema": db_tool.output_schema, 

810 "annotations": db_tool.annotations or {}, 

811 "jsonpath_filter": db_tool.jsonpath_filter, 

812 "tags": db_tool.tags or [], 

813 "rate_limit": db_tool.rate_limit, 

814 "timeout": db_tool.timeout, 

815 "is_active": db_tool.is_active, 

816 "created_at": db_tool.created_at.isoformat() if db_tool.created_at else None, 

817 "updated_at": db_tool.updated_at.isoformat() if db_tool.updated_at else None, 

818 } 

819 

820 # Include auth data directly from DB (already have raw values) 

821            if db_tool.auth_type and db_tool.auth_value: 821 ↛ 825 (line 821 didn't jump to line 825 because the condition on line 821 was always true)

822 tool_data["auth_type"] = db_tool.auth_type 

823 tool_data["auth_value"] = db_tool.auth_value 

824 

825 exported_tools.append(tool_data) 

826 

827 return exported_tools 

828 

829 async def _export_selected_gateways(self, db: Session, gateway_ids: List[str]) -> List[Dict[str, Any]]: 

830 """Export specific gateways by their IDs using batch queries. 

831 

832 Uses a single batch query instead of fetching all gateways N times. 

833 

834 Args: 

835 db: Database session 

836 gateway_ids: List of gateway IDs to export 

837 

838 Returns: 

839 List of exported gateway dictionaries 

840 """ 

841 if not gateway_ids: 

842 return [] 

843 

844 # Batch query for selected gateways only 

845 db_gateways = db.execute(select(DbGateway).where(DbGateway.id.in_(gateway_ids))).scalars().all() 

846 

847 exported_gateways = [] 

848 for db_gateway in db_gateways: 

849 gateway_data = { 

850 "name": db_gateway.name, 

851 "url": str(db_gateway.url) if db_gateway.url else None, 

852 "description": db_gateway.description, 

853 "transport": db_gateway.transport, 

854 "capabilities": db_gateway.capabilities or {}, 

855 "health_check": {"url": f"{db_gateway.url}/health", "interval": 30, "timeout": 10, "retries": 3}, 

856 "is_active": db_gateway.is_active, 

857 "tags": db_gateway.tags or [], 

858 "passthrough_headers": db_gateway.passthrough_headers or [], 

859 } 

860 

861 # Include auth data directly from DB (already have raw values) 

862            if db_gateway.auth_type: 862 ↛ 870 (line 862 didn't jump to line 870 because the condition on line 862 was always true)

863 gateway_data["auth_type"] = db_gateway.auth_type 

864                if db_gateway.auth_value: 864 ↛ 867 (line 864 didn't jump to line 867 because the condition on line 864 was always true)

865 gateway_data["auth_value"] = db_gateway.auth_value 

866 # Include query param auth if present 

867                if db_gateway.auth_type == "query_param" and getattr(db_gateway, "auth_query_params", None): 867 ↛ 870 (line 867 didn't jump to line 870 because the condition on line 867 was always true)

868 gateway_data["auth_query_params"] = db_gateway.auth_query_params 

869 

870 exported_gateways.append(gateway_data) 

871 

872 return exported_gateways 

873 

874 async def _export_selected_servers(self, db: Session, server_ids: List[str], root_path: str = "") -> List[Dict[str, Any]]: 

875 """Export specific servers by their IDs using batch queries. 

876 

877 Uses a single batch query instead of fetching all servers N times. 

878 

879 Args: 

880 db: Database session 

881 server_ids: List of server IDs to export 

882 root_path: Root path for constructing API endpoints 

883 

884 Returns: 

885 List of exported server dictionaries 

886 """ 

887 if not server_ids: 

888 return [] 

889 

890 # Batch query for selected servers with eager loading to avoid N+1 queries 

891 db_servers = db.execute(select(DbServer).options(selectinload(DbServer.tools)).where(DbServer.id.in_(server_ids))).scalars().all() 

892 

893 exported_servers = [] 

894 for db_server in db_servers: 

895 # Get associated tool IDs (tools are eagerly loaded) 

896 tool_ids = [str(tool.id) for tool in db_server.tools] if db_server.tools else [] 

897 

898 server_data = { 

899 "name": db_server.name, 

900 "description": db_server.description, 

901 "tool_ids": tool_ids, 

902 "sse_endpoint": f"{root_path}/servers/{db_server.id}/sse", 

903 "websocket_endpoint": f"{root_path}/servers/{db_server.id}/ws", 

904 "jsonrpc_endpoint": f"{root_path}/servers/{db_server.id}/jsonrpc", 

905 "capabilities": {"tools": {"list_changed": True}, "prompts": {"list_changed": True}}, 

906 "is_active": db_server.is_active, 

907 "tags": db_server.tags or [], 

908 } 

909 

910 exported_servers.append(server_data) 

911 

912 return exported_servers 

913 

914 async def _export_selected_prompts(self, db: Session, prompt_names: List[str]) -> List[Dict[str, Any]]: 

915 """Export specific prompts by their identifiers using batch queries. 

916 

917 Uses a single batch query instead of fetching all prompts N times. 

918 

919 Args: 

920 db: Database session 

921 prompt_names: List of prompt IDs or names to export 

922 

923 Returns: 

924 List of exported prompt dictionaries 

925 """ 

926 if not prompt_names: 

927 return [] 

928 

929 # Batch query for selected prompts only 

930 db_prompts = db.execute(select(DbPrompt).where(or_(DbPrompt.id.in_(prompt_names), DbPrompt.name.in_(prompt_names)))).scalars().all() 

931 

932 exported_prompts = [] 

933 for db_prompt in db_prompts: 

934 # Build input schema from argument_schema 

935 input_schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []} 

936            if db_prompt.argument_schema: 936 ↛ 939 (line 936 didn't jump to line 939 because the condition on line 936 was always true)

937 input_schema = db_prompt.argument_schema 

938 

939 prompt_data: Dict[str, Any] = { 

940 "name": db_prompt.original_name or db_prompt.name, 

941 "original_name": db_prompt.original_name or db_prompt.name, 

942 "custom_name": db_prompt.custom_name or db_prompt.original_name or db_prompt.name, 

943 "display_name": db_prompt.display_name or db_prompt.custom_name or db_prompt.original_name or db_prompt.name, 

944 "template": db_prompt.template, 

945 "description": db_prompt.description, 

946 "input_schema": input_schema, 

947 "tags": db_prompt.tags or [], 

948 "is_active": getattr(db_prompt, "enabled", getattr(db_prompt, "is_active", False)), 

949 } 

950 

951 exported_prompts.append(prompt_data) 

952 

953 return exported_prompts 

954 

955 async def _export_selected_resources(self, db: Session, resource_uris: List[str]) -> List[Dict[str, Any]]: 

956 """Export specific resources by their URIs using batch queries. 

957 

958 Uses a single batch query instead of fetching all resources N times. 

959 

960 Args: 

961 db: Database session 

962 resource_uris: List of resource URIs to export 

963 

964 Returns: 

965 List of exported resource dictionaries 

966 """ 

967 if not resource_uris: 

968 return [] 

969 

970 # Batch query for selected resources only 

971 db_resources = db.execute(select(DbResource).where(DbResource.uri.in_(resource_uris))).scalars().all() 

972 

973 exported_resources = [] 

974 for db_resource in db_resources: 

975 resource_data = { 

976 "name": db_resource.name, 

977 "uri": db_resource.uri, 

978 "description": db_resource.description, 

979 "mime_type": db_resource.mime_type, 

980 "tags": db_resource.tags or [], 

981 "is_active": db_resource.is_active, 

982 "last_modified": db_resource.updated_at.isoformat() if db_resource.updated_at else None, 

983 } 

984 

985 exported_resources.append(resource_data) 

986 

987 return exported_resources 

988 

989 async def _export_selected_roots(self, root_uris: List[str]) -> List[Dict[str, Any]]: 

990 """Export specific roots by their URIs. 

991 

992 Args: 

993 root_uris: List of root URIs to export 

994 

995 Returns: 

996 List of exported root dictionaries 

997 """ 

998 all_roots = await self._export_roots() 

999 return [r for r in all_roots if r["uri"] in root_uris]
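A closing sketch that persists an export to disk; the file name, indentation, and the backup-job username are arbitrary choices, not part of the service:

import json

async def export_to_file(service: ExportService, db, path: str = "gateway-export.json") -> None:
    # Run a full export and write it out as pretty-printed JSON.
    data = await service.export_configuration(db, exported_by="backup-job")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2, default=str)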