Coverage for mcpgateway / services / export_service.py: 100%

398 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2# pylint: disable=import-outside-toplevel,no-name-in-module 

3"""Location: ./mcpgateway/services/export_service.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti 

7 

8Export Service Implementation. 

9This module implements comprehensive configuration export functionality according to the export specification. 

10It handles: 

11- Entity collection from all entity types (Tools, Gateways, Servers, Prompts, Resources, Roots) 

12- Secure authentication data encryption using AES-256-GCM 

13- Dependency resolution and inclusion 

14- Filtering by entity types, tags, and active/inactive status 

15- Export format validation and schema compliance 

16- Only exports locally configured entities (not federated content) 

17""" 

18 

19# Standard 

20from datetime import datetime, timezone 

21import logging 

22from typing import Any, cast, Dict, List, Optional, TypedDict 

23 

24# Third-Party 

25from sqlalchemy import or_, select 

26from sqlalchemy.orm import selectinload, Session 

27 

28# First-Party 

29from mcpgateway.config import settings 

30from mcpgateway.db import Gateway as DbGateway 

31from mcpgateway.db import Prompt as DbPrompt 

32from mcpgateway.db import Resource as DbResource 

33from mcpgateway.db import Server as DbServer 

34from mcpgateway.db import Tool as DbTool 

35from mcpgateway.utils.services_auth import encode_auth 

36 

37# Service singletons are imported lazily in __init__ to avoid circular imports 

38 

39logger = logging.getLogger(__name__) 

40 

41 

class ExportError(Exception):
    """Root of the export exception hierarchy.

    Every export-specific failure derives from this class, so a single
    ``except ExportError`` clause handles all of them.

    Examples:
        >>> err = ExportError("General export error")
        >>> str(err)
        'General export error'
        >>> isinstance(err, Exception)
        True
        >>> try:
        ...     raise ExportError("Export failed")
        ... except Exception as caught:
        ...     type(caught).__name__
        'ExportError'
    """

57 

58 

class ExportValidationError(ExportError):
    """Signal that an export payload failed structural validation.

    Being a subclass of ``ExportError``, it is caught by handlers written
    for the base class as well.

    Examples:
        >>> err = ExportValidationError("Invalid export format")
        >>> str(err)
        'Invalid export format'
        >>> isinstance(err, ExportError)
        True
        >>> try:
        ...     raise ExportValidationError("Missing required field")
        ... except ExportError as caught:
        ...     type(caught).__name__
        'ExportValidationError'
    """

79 

80 

81class ExportService: 

82 """Service for exporting ContextForge configuration and data. 

83 

84 This service provides comprehensive export functionality including: 

85 - Collection of all entity types (tools, gateways, servers, prompts, resources, roots) 

86 - Secure handling of authentication data with encryption 

87 - Dependency resolution between entities 

88 - Filtering options (by type, tags, status) 

89 - Export format validation 

90 

91 The service only exports locally configured entities, excluding dynamic content 

92 from federated sources to ensure exports contain only configuration data. 

93 

94 Examples: 

95 >>> service = ExportService() 

96 >>> hasattr(service, 'gateway_service') 

97 True 

98 >>> hasattr(service, 'tool_service') 

99 True 

100 >>> hasattr(service, 'resource_service') 

101 True 

102 >>> # Test entity type validation 

103 >>> valid_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"] 

104 >>> "tools" in valid_types 

105 True 

106 >>> "invalid_type" in valid_types 

107 False 

108 >>> # Test filtering logic 

109 >>> include_types = ["tools", "servers"] 

110 >>> exclude_types = ["gateways"] 

111 >>> "tools" in include_types and "tools" not in exclude_types 

112 True 

113 >>> "gateways" in include_types and "gateways" not in exclude_types 

114 False 

115 >>> # Test tag filtering 

116 >>> entity_tags = ["production", "api"] 

117 >>> filter_tags = ["production"] 

118 >>> any(tag in entity_tags for tag in filter_tags) 

119 True 

120 >>> filter_tags = ["development"] 

121 >>> any(tag in entity_tags for tag in filter_tags) 

122 False 

123 """ 

124 

def __init__(self):
    """Bind the shared per-entity service singletons to this instance."""
    # The module-level singletons are reused (rather than constructing new
    # services) so they share the EventService/Redis clients created at app
    # startup. The imports live inside the constructor because importing
    # these modules at load time would create a circular import.
    # First-Party
    from mcpgateway.services.gateway_service import gateway_service as _gateways
    from mcpgateway.services.prompt_service import prompt_service as _prompts
    from mcpgateway.services.resource_service import resource_service as _resources
    from mcpgateway.services.root_service import root_service as _roots
    from mcpgateway.services.server_service import server_service as _servers
    from mcpgateway.services.tool_service import tool_service as _tools

    self.gateway_service, self.tool_service, self.resource_service = _gateways, _tools, _resources
    self.prompt_service, self.server_service, self.root_service = _prompts, _servers, _roots

144 

async def initialize(self) -> None:
    """Record that the export service has started.

    Nothing is allocated here; the method only emits an info-level log line.
    """
    startup_message = "Export service initialized"
    logger.info(startup_message)

148 

async def shutdown(self) -> None:
    """Record that the export service has stopped.

    Nothing is released here; the method only emits an info-level log line.
    """
    shutdown_message = "Export service shutdown"
    logger.info(shutdown_message)

152 

153 async def _fetch_all_tools(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]: 

154 """Fetch all tools by following pagination cursors with team scoping. 

155 

156 Args: 

157 db: Database session 

158 tags: Filter by tags 

159 include_inactive: Include inactive tools 

160 user_email: Requesting user's email for visibility filtering 

161 token_teams: Token team scope for visibility filtering 

162 

163 Returns: 

164 List of all tools across all pages 

165 """ 

166 all_tools = [] 

167 cursor = None 

168 while True: 

169 tools, next_cursor = await self.tool_service.list_tools(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams) 

170 all_tools.extend(tools) 

171 if not next_cursor: 

172 break 

173 cursor = next_cursor 

174 return all_tools 

175 

176 async def _fetch_all_prompts(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]: 

177 """Fetch all prompts by following pagination cursors with team scoping. 

178 

179 Args: 

180 db: Database session 

181 tags: Filter by tags 

182 include_inactive: Include inactive prompts 

183 user_email: Requesting user's email for visibility filtering 

184 token_teams: Token team scope for visibility filtering 

185 

186 Returns: 

187 List of all prompts across all pages 

188 """ 

189 all_prompts = [] 

190 cursor = None 

191 while True: 

192 prompts, next_cursor = await self.prompt_service.list_prompts(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams) 

193 all_prompts.extend(prompts) 

194 if not next_cursor: 

195 break 

196 cursor = next_cursor 

197 return all_prompts 

198 

199 async def _fetch_all_resources(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]: 

200 """Fetch all resources by following pagination cursors with team scoping. 

201 

202 Args: 

203 db: Database session 

204 tags: Filter by tags 

205 include_inactive: Include inactive resources 

206 user_email: Requesting user's email for visibility filtering 

207 token_teams: Token team scope for visibility filtering 

208 

209 Returns: 

210 List of all resources across all pages 

211 """ 

212 all_resources = [] 

213 cursor = None 

214 while True: 

215 resources, next_cursor = await self.resource_service.list_resources(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams) 

216 all_resources.extend(resources) 

217 if not next_cursor: 

218 break 

219 cursor = next_cursor 

220 return all_resources 

221 

222 async def _fetch_all_gateways(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]: 

223 """Fetch all gateways by following pagination cursors with team scoping. 

224 

225 Args: 

226 db: Database session 

227 tags: Filter by tags 

228 include_inactive: Include inactive gateways 

229 user_email: Requesting user's email for visibility filtering 

230 token_teams: Token team scope for visibility filtering 

231 

232 Returns: 

233 List of all gateways across all pages 

234 """ 

235 all_gateways = [] 

236 cursor = None 

237 while True: 

238 gateways, next_cursor = await self.gateway_service.list_gateways(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams) 

239 all_gateways.extend(gateways) 

240 if not next_cursor: 

241 break 

242 cursor = next_cursor 

243 return all_gateways 

244 

245 async def _fetch_all_servers(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]: 

246 """Fetch all servers by following pagination cursors with team scoping. 

247 

248 Args: 

249 db: Database session 

250 tags: Filter by tags 

251 include_inactive: Include inactive servers 

252 user_email: Requesting user's email for visibility filtering 

253 token_teams: Token team scope for visibility filtering 

254 

255 Returns: 

256 List of all servers across all pages 

257 """ 

258 all_servers = [] 

259 cursor = None 

260 while True: 

261 servers, next_cursor = await self.server_service.list_servers(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams) 

262 all_servers.extend(servers) 

263 if not next_cursor: 

264 break 

265 cursor = next_cursor 

266 return all_servers 

267 

async def export_configuration(
    self,
    db: Session,
    include_types: Optional[List[str]] = None,
    exclude_types: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    include_inactive: bool = False,
    include_dependencies: bool = True,
    exported_by: str = "system",
    root_path: str = "",
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Export complete gateway configuration to a standardized format.

    Args:
        db: Database session
        include_types: List of entity types to include (tools, gateways, servers, prompts, resources, roots).
            Matching is case-insensitive; unknown names are dropped. When empty or None, all types are selected.
        exclude_types: List of entity types to exclude (case-insensitive), applied after ``include_types``
        tags: Filter entities by tags (only export entities with these tags)
        include_inactive: Whether to include inactive entities
        include_dependencies: Whether to include dependent entities automatically
        exported_by: Username of the person performing the export
        root_path: Root path for constructing API endpoints
        user_email: Requesting user's email for team-scoped visibility filtering
        token_teams: Token team scope for visibility filtering (None=admin bypass, []=public-only)

    Returns:
        Dict containing the complete export data in the specified schema format

    Raises:
        ExportError: If any step of the export fails. Every exception raised inside
            the export, including ``ExportValidationError`` from the final validation,
            is wrapped in a new ``ExportError`` whose ``__cause__`` is the original.
    """
    try:
        logger.info(f"Starting configuration export by {exported_by}")

        # Determine which entity types to include
        all_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"]
        if include_types:
            entity_types = [t.lower() for t in include_types if t.lower() in all_types]
        else:
            entity_types = all_types

        if exclude_types:
            # Normalise the exclusions once instead of once per candidate type.
            excluded = {e.lower() for e in exclude_types}
            entity_types = [t for t in entity_types if t.lower() not in excluded]

        class ExportOptions(TypedDict, total=False):
            """Options that control export behavior (full export)."""

            include_inactive: bool
            include_dependencies: bool
            selected_types: List[str]
            filter_tags: List[str]

        class ExportMetadata(TypedDict):
            """Metadata for full export including counts, dependencies, and options."""

            entity_counts: Dict[str, int]
            dependencies: Dict[str, Any]
            export_options: ExportOptions

        class ExportData(TypedDict):
            """Top-level full export payload shape."""

            version: str
            exported_at: str
            exported_by: str
            source_gateway: str
            encryption_method: str
            entities: Dict[str, List[Dict[str, Any]]]
            metadata: ExportMetadata

        entities: Dict[str, List[Dict[str, Any]]] = {}
        metadata: ExportMetadata = {
            "entity_counts": {},
            "dependencies": {},
            "export_options": {
                "include_inactive": include_inactive,
                "include_dependencies": include_dependencies,
                "selected_types": entity_types,
                "filter_tags": tags or [],
            },
        }

        export_data: ExportData = {
            "version": settings.protocol_version,
            "exported_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "exported_by": exported_by,
            "source_gateway": f"http://{settings.host}:{settings.port}",
            "encryption_method": "AES-256-GCM",
            "entities": entities,
            "metadata": metadata,
        }

        # Export each entity type (with team-scoped visibility filtering)
        if "tools" in entity_types:
            export_data["entities"]["tools"] = await self._export_tools(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)

        if "gateways" in entity_types:
            export_data["entities"]["gateways"] = await self._export_gateways(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)

        if "servers" in entity_types:
            export_data["entities"]["servers"] = await self._export_servers(db, tags, include_inactive, root_path, user_email=user_email, token_teams=token_teams)

        if "prompts" in entity_types:
            export_data["entities"]["prompts"] = await self._export_prompts(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)

        if "resources" in entity_types:
            export_data["entities"]["resources"] = await self._export_resources(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)

        if "roots" in entity_types:
            # Roots are listed without db, tag or visibility arguments.
            export_data["entities"]["roots"] = await self._export_roots()

        # Add dependency information
        if include_dependencies:
            export_data["metadata"]["dependencies"] = await self._extract_dependencies(db, export_data["entities"])

        # Calculate entity counts
        for entity_type, entities_list in export_data["entities"].items():
            export_data["metadata"]["entity_counts"][entity_type] = len(entities_list)

        # Validate export data
        self._validate_export_data(cast(Dict[str, Any], export_data))

        logger.info(f"Export completed successfully with {sum(export_data['metadata']['entity_counts'].values())} total entities")
        return cast(Dict[str, Any], export_data)

    except Exception as e:
        logger.error(f"Export failed: {str(e)}")
        # Chain explicitly so the original failure is kept as __cause__.
        raise ExportError(f"Failed to export configuration: {str(e)}") from e

399 

400 async def _export_tools(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]: 

401 """Export tools with encrypted authentication data. 

402 

403 Uses batch queries to fetch auth data efficiently, avoiding N+1 query patterns. 

404 

405 Args: 

406 db: Database session 

407 tags: Filter by tags 

408 include_inactive: Include inactive tools 

409 user_email: Requesting user's email for visibility filtering 

410 token_teams: Token team scope for visibility filtering 

411 

412 Returns: 

413 List of exported tool dictionaries 

414 """ 

415 # Fetch all tools across all pages (with team-scoped visibility) 

416 tools = await self._fetch_all_tools(db, tags, include_inactive, user_email=user_email, token_teams=token_teams) 

417 

418 # Filter to only exportable tools (local REST tools, not MCP tools from gateways) 

419 exportable_tools = [t for t in tools if not (t.integration_type == "MCP" and t.gateway_id)] 

420 

421 # Batch fetch auth data for tools with masked values (single query instead of N queries) 

422 tool_ids_needing_auth = [ 

423 tool.id for tool in exportable_tools if hasattr(tool, "auth") and tool.auth and hasattr(tool.auth, "auth_value") and tool.auth.auth_value == settings.masked_auth_value 

424 ] 

425 

426 auth_data_map: Dict[Any, tuple] = {} 

427 if tool_ids_needing_auth: 

428 db_tools_with_auth = db.execute(select(DbTool.id, DbTool.auth_type, DbTool.auth_value).where(DbTool.id.in_(tool_ids_needing_auth))).all() 

429 auth_data_map = {row[0]: (row[1], row[2]) for row in db_tools_with_auth} 

430 

431 exported_tools = [] 

432 for tool in exportable_tools: 

433 tool_data = { 

434 "name": tool.original_name, # Use original name, not the slugified version 

435 "displayName": tool.displayName, # Export displayName field from ToolRead 

436 "url": str(tool.url), 

437 "integration_type": tool.integration_type, 

438 "request_type": tool.request_type, 

439 "description": tool.description, 

440 "original_description": tool.original_description, 

441 "headers": tool.headers or {}, 

442 "input_schema": tool.input_schema or {"type": "object", "properties": {}}, 

443 "output_schema": tool.output_schema, 

444 "annotations": tool.annotations or {}, 

445 "jsonpath_filter": tool.jsonpath_filter, 

446 "tags": tool.tags or [], 

447 "rate_limit": getattr(tool, "rate_limit", None), 

448 "timeout": getattr(tool, "timeout", None), 

449 "is_active": tool.enabled, 

450 "created_at": tool.created_at.isoformat() if hasattr(tool.created_at, "isoformat") and tool.created_at else None, 

451 "updated_at": tool.updated_at.isoformat() if hasattr(tool.updated_at, "isoformat") and tool.updated_at else None, 

452 } 

453 

454 # Handle authentication data securely - use batch-fetched values 

455 if hasattr(tool, "auth") and tool.auth: 

456 auth = tool.auth 

457 if hasattr(auth, "auth_type") and hasattr(auth, "auth_value"): 

458 if auth.auth_value == settings.masked_auth_value: 

459 # Use batch-fetched auth data 

460 if tool.id in auth_data_map: 

461 auth_type, auth_value = auth_data_map[tool.id] 

462 if auth_value: 

463 tool_data["auth_type"] = auth_type 

464 tool_data["auth_value"] = auth_value 

465 else: 

466 # Auth value is not masked, use as-is 

467 tool_data["auth_type"] = auth.auth_type 

468 tool_data["auth_value"] = auth.auth_value 

469 

470 exported_tools.append(tool_data) 

471 

472 return exported_tools 

473 

474 async def _export_gateways(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]: 

475 """Export gateways with encrypted authentication data. 

476 

477 Uses batch queries to fetch auth data efficiently, avoiding N+1 query patterns. 

478 

479 Args: 

480 db: Database session 

481 tags: Filter by tags 

482 include_inactive: Include inactive gateways 

483 user_email: Requesting user's email for visibility filtering 

484 token_teams: Token team scope for visibility filtering 

485 

486 Returns: 

487 List of exported gateway dictionaries 

488 """ 

489 # Fetch all gateways across all pages (with team-scoped visibility) 

490 gateways = await self._fetch_all_gateways(db, tags, include_inactive, user_email=user_email, token_teams=token_teams) 

491 

492 # Batch fetch auth data for gateways with masked values (single query instead of N queries) 

493 gateway_ids_needing_auth = [g.id for g in gateways if g.auth_type and g.auth_value == settings.masked_auth_value] 

494 

495 auth_data_map: Dict[Any, tuple] = {} 

496 if gateway_ids_needing_auth: 

497 db_gateways_with_auth = db.execute(select(DbGateway.id, DbGateway.auth_type, DbGateway.auth_value).where(DbGateway.id.in_(gateway_ids_needing_auth))).all() 

498 auth_data_map = {row[0]: (row[1], row[2]) for row in db_gateways_with_auth} 

499 

500 exported_gateways = [] 

501 for gateway in gateways: 

502 gateway_data = { 

503 "name": gateway.name, 

504 "url": str(gateway.url), 

505 "description": gateway.description, 

506 "transport": gateway.transport, 

507 "capabilities": gateway.capabilities or {}, 

508 "health_check": {"url": f"{gateway.url}/health", "interval": 30, "timeout": 10, "retries": 3}, 

509 "is_active": gateway.enabled, 

510 "tags": gateway.tags or [], 

511 "passthrough_headers": gateway.passthrough_headers or [], 

512 } 

513 

514 # Handle authentication data securely - use batch-fetched values 

515 if gateway.auth_type and gateway.auth_value: 

516 if gateway.auth_value == settings.masked_auth_value: 

517 # Use batch-fetched auth data 

518 if gateway.id in auth_data_map: 

519 auth_type, auth_value = auth_data_map[gateway.id] 

520 if auth_value: 

521 gateway_data["auth_type"] = auth_type 

522 # DbGateway.auth_value is JSON (dict); export format expects encoded string. 

523 gateway_data["auth_value"] = encode_auth(auth_value) if isinstance(auth_value, dict) else auth_value 

524 else: 

525 # Auth value is not masked, use as-is 

526 gateway_data["auth_type"] = gateway.auth_type 

527 gateway_data["auth_value"] = gateway.auth_value 

528 

529 exported_gateways.append(gateway_data) 

530 

531 return exported_gateways 

532 

533 async def _export_servers( 

534 self, db: Session, tags: Optional[List[str]], include_inactive: bool, root_path: str = "", user_email: Optional[str] = None, token_teams: Optional[List[str]] = None 

535 ) -> List[Dict[str, Any]]: 

536 """Export virtual servers with their tool associations. 

537 

538 Args: 

539 db: Database session 

540 tags: Filter by tags 

541 include_inactive: Include inactive servers 

542 root_path: Root path for constructing API endpoints 

543 user_email: Requesting user's email for visibility filtering 

544 token_teams: Token team scope for visibility filtering 

545 

546 Returns: 

547 List of exported server dictionaries 

548 """ 

549 # Fetch all servers across all pages (with team-scoped visibility) 

550 servers = await self._fetch_all_servers(db, tags, include_inactive, user_email=user_email, token_teams=token_teams) 

551 exported_servers = [] 

552 

553 for server in servers: 

554 server_data = { 

555 "name": server.name, 

556 "description": server.description, 

557 "tool_ids": list(server.associated_tools), 

558 "sse_endpoint": f"{root_path}/servers/{server.id}/sse", 

559 "websocket_endpoint": f"{root_path}/servers/{server.id}/ws", 

560 "jsonrpc_endpoint": f"{root_path}/servers/{server.id}/jsonrpc", 

561 "capabilities": {"tools": {"list_changed": True}, "prompts": {"list_changed": True}}, 

562 "is_active": getattr(server, "enabled", getattr(server, "is_active", False)), 

563 "tags": server.tags or [], 

564 } 

565 

566 exported_servers.append(server_data) 

567 

568 return exported_servers 

569 

570 async def _export_prompts(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]: 

571 """Export prompts with their templates and schemas. 

572 

573 Args: 

574 db: Database session 

575 tags: Filter by tags 

576 include_inactive: Include inactive prompts 

577 user_email: Requesting user's email for visibility filtering 

578 token_teams: Token team scope for visibility filtering 

579 

580 Returns: 

581 List of exported prompt dictionaries 

582 """ 

583 # Fetch all prompts across all pages (with team-scoped visibility) 

584 prompts = await self._fetch_all_prompts(db, tags, include_inactive, user_email=user_email, token_teams=token_teams) 

585 exported_prompts = [] 

586 

587 for prompt in prompts: 

588 input_schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []} 

589 prompt_data: Dict[str, Any] = { 

590 "name": getattr(prompt, "original_name", None) or prompt.name, 

591 "original_name": getattr(prompt, "original_name", None) or prompt.name, 

592 "custom_name": getattr(prompt, "custom_name", None) or getattr(prompt, "original_name", None) or prompt.name, 

593 "display_name": getattr(prompt, "display_name", None) or getattr(prompt, "custom_name", None) or getattr(prompt, "original_name", None) or prompt.name, 

594 "template": prompt.template, 

595 "description": prompt.description, 

596 "input_schema": input_schema, 

597 "tags": prompt.tags or [], 

598 # Use the new `enabled` attribute on prompt objects but keep export key `is_active` for compatibility 

599 "is_active": getattr(prompt, "enabled", getattr(prompt, "is_active", False)), 

600 } 

601 

602 # Convert arguments to input schema format 

603 if prompt.arguments: 

604 properties: Dict[str, Any] = {} 

605 required = [] 

606 for arg in prompt.arguments: 

607 properties[arg.name] = {"type": "string", "description": arg.description or ""} 

608 if arg.required: 

609 required.append(arg.name) 

610 input_schema["properties"] = properties 

611 input_schema["required"] = required 

612 

613 exported_prompts.append(prompt_data) 

614 

615 return exported_prompts 

616 

617 async def _export_resources( 

618 self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None 

619 ) -> List[Dict[str, Any]]: 

620 """Export resources with their content metadata. 

621 

622 Args: 

623 db: Database session 

624 tags: Filter by tags 

625 include_inactive: Include inactive resources 

626 user_email: Requesting user's email for visibility filtering 

627 token_teams: Token team scope for visibility filtering 

628 

629 Returns: 

630 List of exported resource dictionaries 

631 """ 

632 # Fetch all resources across all pages (with team-scoped visibility) 

633 resources = await self._fetch_all_resources(db, tags, include_inactive, user_email=user_email, token_teams=token_teams) 

634 exported_resources = [] 

635 

636 for resource in resources: 

637 resource_data = { 

638 "name": resource.name, 

639 "uri": resource.uri, 

640 "description": resource.description, 

641 "mime_type": resource.mime_type, 

642 "tags": resource.tags or [], 

643 "is_active": getattr(resource, "enabled", getattr(resource, "is_active", False)), 

644 "last_modified": resource.updated_at.isoformat() if resource.updated_at else None, 

645 } 

646 

647 exported_resources.append(resource_data) 

648 

649 return exported_resources 

650 

651 async def _export_roots(self) -> List[Dict[str, Any]]: 

652 """Export filesystem roots. 

653 

654 Returns: 

655 List of exported root dictionaries 

656 """ 

657 roots = await self.root_service.list_roots() 

658 exported_roots = [] 

659 

660 for root in roots: 

661 root_data = {"uri": str(root.uri), "name": root.name} 

662 exported_roots.append(root_data) 

663 

664 return exported_roots 

665 

666 async def _extract_dependencies(self, db: Session, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: # pylint: disable=unused-argument 

667 """Extract dependency relationships between entities. 

668 

669 Args: 

670 db: Database session 

671 entities: Dictionary of exported entities 

672 

673 Returns: 

674 Dictionary containing dependency mappings 

675 """ 

676 dependencies = {"servers_to_tools": {}, "servers_to_resources": {}, "servers_to_prompts": {}} 

677 

678 # Extract server-to-tool dependencies 

679 if "servers" in entities and "tools" in entities: 

680 for server in entities["servers"]: 

681 if server.get("tool_ids"): 

682 dependencies["servers_to_tools"][server["name"]] = server["tool_ids"] 

683 

684 return dependencies 

685 

def _validate_export_data(self, export_data: Dict[str, Any]) -> None:
    """Check that an export payload has the expected top-level structure.

    The checks are: all required top-level keys are present, ``version`` is
    non-empty, ``entities`` is a dict, and ``metadata`` is a dict whose
    ``entity_counts`` entry is a dict.

    Args:
        export_data: The export data to validate

    Raises:
        ExportValidationError: If validation fails
    """
    for field in ("version", "exported_at", "exported_by", "entities", "metadata"):
        if field not in export_data:
            raise ExportValidationError(f"Missing required field: {field}")

    # Validate version format
    if not export_data["version"]:
        raise ExportValidationError("Version cannot be empty")

    # Validate entities structure
    if not isinstance(export_data["entities"], dict):
        raise ExportValidationError("Entities must be a dictionary")

    # Validate metadata structure. The isinstance guard on metadata keeps a
    # non-dict value (e.g. None) from escaping as AttributeError on `.get`.
    metadata = export_data["metadata"]
    if not isinstance(metadata, dict) or not isinstance(metadata.get("entity_counts"), dict):
        raise ExportValidationError("Metadata entity_counts must be a dictionary")

    logger.debug("Export data validation passed")

715 

716 async def export_selective( 

717 self, 

718 db: Session, 

719 entity_selections: Dict[str, List[str]], 

720 include_dependencies: bool = True, 

721 exported_by: str = "system", 

722 root_path: str = "", 

723 user_email: Optional[str] = None, 

724 token_teams: Optional[List[str]] = None, 

725 ) -> Dict[str, Any]: 

726 """Export specific entities by their IDs/names. 

727 

728 Args: 

729 db: Database session 

730 entity_selections: Dict mapping entity types to lists of IDs/names to export 

731 include_dependencies: Whether to include dependent entities 

732 exported_by: Username of the person performing the export 

733 root_path: Root path for constructing API endpoints 

734 user_email: Requesting user's email for team-scoped visibility filtering 

735 token_teams: Token team scope for visibility filtering (None=admin bypass, []=public-only) 

736 

737 Returns: 

738 Dict containing the selective export data 

739 

740 Example: 

741 entity_selections = { 

742 "tools": ["tool1", "tool2"], 

743 "servers": ["server1"], 

744 "prompts": ["prompt1"] 

745 } 

746 """ 

747 logger.info(f"Starting selective export by {exported_by}") 

748 

749 class SelExportOptions(TypedDict, total=False): 

750 """Options that control behavior for selective export.""" 

751 

752 selective: bool 

753 include_dependencies: bool 

754 selections: Dict[str, List[str]] 

755 

756 class SelExportMetadata(TypedDict): 

757 """Metadata for selective export including counts, dependencies, and options.""" 

758 

759 entity_counts: Dict[str, int] 

760 dependencies: Dict[str, Any] 

761 export_options: SelExportOptions 

762 

763 class SelExportData(TypedDict): 

764 """Top-level selective export payload shape.""" 

765 

766 version: str 

767 exported_at: str 

768 exported_by: str 

769 source_gateway: str 

770 encryption_method: str 

771 entities: Dict[str, List[Dict[str, Any]]] 

772 metadata: SelExportMetadata 

773 

774 sel_entities: Dict[str, List[Dict[str, Any]]] = {} 

775 sel_metadata: SelExportMetadata = { 

776 "entity_counts": {}, 

777 "dependencies": {}, 

778 "export_options": {"selective": True, "include_dependencies": include_dependencies, "selections": entity_selections}, 

779 } 

780 export_data: SelExportData = { 

781 "version": settings.protocol_version, 

782 "exported_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), 

783 "exported_by": exported_by, 

784 "source_gateway": f"http://{settings.host}:{settings.port}", 

785 "encryption_method": "AES-256-GCM", 

786 "entities": sel_entities, 

787 "metadata": sel_metadata, 

788 } 

789 

790 # Export selected entities for each type 

791 for entity_type, selected_ids in entity_selections.items(): 

792 if entity_type == "tools": 

793 export_data["entities"]["tools"] = await self._export_selected_tools(db, selected_ids, user_email=user_email, token_teams=token_teams) 

794 elif entity_type == "gateways": 

795 export_data["entities"]["gateways"] = await self._export_selected_gateways(db, selected_ids, user_email=user_email, token_teams=token_teams) 

796 elif entity_type == "servers": 

797 export_data["entities"]["servers"] = await self._export_selected_servers(db, selected_ids, root_path, user_email=user_email, token_teams=token_teams) 

798 elif entity_type == "prompts": 

799 export_data["entities"]["prompts"] = await self._export_selected_prompts(db, selected_ids, user_email=user_email, token_teams=token_teams) 

800 elif entity_type == "resources": 

801 export_data["entities"]["resources"] = await self._export_selected_resources(db, selected_ids, user_email=user_email, token_teams=token_teams) 

802 elif entity_type == "roots": 

803 export_data["entities"]["roots"] = await self._export_selected_roots(selected_ids) 

804 

805 # Add dependencies if requested 

806 if include_dependencies: 

807 export_data["metadata"]["dependencies"] = await self._extract_dependencies(db, export_data["entities"]) 

808 

809 # Calculate entity counts 

810 for entity_type, entities_list in export_data["entities"].items(): 

811 export_data["metadata"]["entity_counts"][entity_type] = len(entities_list) 

812 

813 self._validate_export_data(cast(Dict[str, Any], export_data)) 

814 

815 logger.info(f"Selective export completed with {sum(export_data['metadata']['entity_counts'].values())} entities") 

816 return cast(Dict[str, Any], export_data) 

817 

818 @staticmethod 

819 def _is_scoped_selective_export(user_email: Optional[str], token_teams: Optional[List[str]]) -> bool: 

820 """Return whether selective export should apply visibility filtering. 

821 

822 Args: 

823 user_email: Requesting user's email. 

824 token_teams: Token team scope for visibility filtering. 

825 

826 Returns: 

827 ``True`` when selective export should apply team/public filtering. 

828 """ 

829 return user_email is not None or token_teams is not None 

830 

async def _export_selected_tools(self, db: Session, tool_ids: List[str], user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Build export dictionaries for an explicit selection of tool IDs.

    For a scoped export (see ``_is_scoped_selective_export``) the requested
    IDs are first narrowed to the tools ``_fetch_all_tools`` returns for the
    caller. The remaining IDs are loaded with a single ``IN`` query. Tools
    whose integration type is ``"MCP"`` and that carry a ``gateway_id`` are
    left out of the result.

    Args:
        db: Database session
        tool_ids: List of tool IDs to export
        user_email: Requesting user's email for visibility filtering
        token_teams: Token team scope for visibility filtering

    Returns:
        List of exported tool dictionaries
    """

    def _iso(moment: Any) -> Optional[str]:
        """Return the ISO-8601 form of a timestamp, or ``None`` when it is unset."""
        return moment.isoformat() if moment else None

    if not tool_ids:
        return []

    allowed_ids: Optional[set[str]] = None
    if self._is_scoped_selective_export(user_email, token_teams):
        readable = await self._fetch_all_tools(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
        allowed_ids = {str(item.id) for item in readable}
        tool_ids = [requested for requested in tool_ids if requested in allowed_ids]
        if not tool_ids:
            return []

    # One IN query for the requested rows instead of a lookup per ID.
    statement = select(DbTool).where(DbTool.id.in_(tool_ids))
    rows = db.execute(statement).scalars().all()
    if allowed_ids is not None:
        # Re-check the loaded rows against the visible set.
        rows = [row for row in rows if str(row.id) in allowed_ids]

    exported: List[Dict[str, Any]] = []
    for row in rows:
        # Only export local REST tools, not MCP tools from gateways.
        comes_from_gateway = row.integration_type == "MCP" and row.gateway_id
        if comes_from_gateway:
            continue

        entry = {
            "name": row.original_name or row.custom_name,
            "displayName": row.display_name,
            "url": str(row.url) if row.url else None,
            "integration_type": row.integration_type,
            "request_type": row.request_type,
            "description": row.description,
            "headers": row.headers or {},
            "input_schema": row.input_schema or {"type": "object", "properties": {}},
            "output_schema": row.output_schema,
            "annotations": row.annotations or {},
            "jsonpath_filter": row.jsonpath_filter,
            "tags": row.tags or [],
            "rate_limit": getattr(row, "rate_limit", None),
            "timeout": getattr(row, "timeout", None),
            "is_active": row.enabled,
            "created_at": _iso(row.created_at),
            "updated_at": _iso(row.updated_at),
        }

        # Auth fields are copied as stored, and only when both are set.
        if row.auth_type and row.auth_value:
            entry.update(auth_type=row.auth_type, auth_value=row.auth_value)

        exported.append(entry)

    return exported

895 

async def _export_selected_gateways(self, db: Session, gateway_ids: List[str], user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Build export dictionaries for an explicit selection of gateway IDs.

    For a scoped export (see ``_is_scoped_selective_export``) the requested
    IDs are first narrowed to the gateways ``_fetch_all_gateways`` returns for
    the caller. The remaining IDs are loaded with a single ``IN`` query.

    Args:
        db: Database session
        gateway_ids: List of gateway IDs to export
        user_email: Requesting user's email for visibility filtering
        token_teams: Token team scope for visibility filtering

    Returns:
        List of exported gateway dictionaries
    """
    if not gateway_ids:
        return []

    allowed_ids: Optional[set[str]] = None
    if self._is_scoped_selective_export(user_email, token_teams):
        readable = await self._fetch_all_gateways(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
        allowed_ids = {str(item.id) for item in readable}
        gateway_ids = [requested for requested in gateway_ids if requested in allowed_ids]
        if not gateway_ids:
            return []

    # One IN query for the requested rows instead of a lookup per ID.
    statement = select(DbGateway).where(DbGateway.id.in_(gateway_ids))
    rows = db.execute(statement).scalars().all()
    if allowed_ids is not None:
        # Re-check the loaded rows against the visible set.
        rows = [row for row in rows if str(row.id) in allowed_ids]

    exported: List[Dict[str, Any]] = []
    for row in rows:
        entry = {
            "name": row.name,
            "url": str(row.url) if row.url else None,
            "description": row.description,
            "transport": row.transport,
            "capabilities": row.capabilities or {},
            "health_check": {"url": f"{row.url}/health", "interval": 30, "timeout": 10, "retries": 3},
            "is_active": row.enabled,
            "tags": row.tags or [],
            "passthrough_headers": row.passthrough_headers or [],
        }

        # Auth details are only emitted when the gateway declares an auth type.
        auth_type = row.auth_type
        if auth_type:
            entry["auth_type"] = auth_type

            stored_value = row.auth_value
            if stored_value:
                # A dict value is passed through encode_auth; any other value is exported unchanged.
                entry["auth_value"] = encode_auth(stored_value) if isinstance(stored_value, dict) else stored_value

            # Query-parameter auth carries its parameters in a separate attribute.
            if auth_type == "query_param":
                query_params = getattr(row, "auth_query_params", None)
                if query_params:
                    entry["auth_query_params"] = query_params

        exported.append(entry)

    return exported

954 

async def _export_selected_servers(
    self, db: Session, server_ids: List[str], root_path: str = "", user_email: Optional[str] = None, token_teams: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Build export dictionaries for an explicit selection of server IDs.

    For a scoped export (see ``_is_scoped_selective_export``) the requested
    IDs are first narrowed to the servers ``_fetch_all_servers`` returns for
    the caller. The remaining IDs are loaded with a single ``IN`` query that
    also eagerly loads each server's tools.

    Args:
        db: Database session
        server_ids: List of server IDs to export
        root_path: Root path for constructing API endpoints
        user_email: Requesting user's email for visibility filtering
        token_teams: Token team scope for visibility filtering

    Returns:
        List of exported server dictionaries
    """

    def _serialize(row: Any) -> Dict[str, Any]:
        """Convert one loaded server row into its export dictionary."""
        endpoint_base = f"{root_path}/servers/{row.id}"
        return {
            "name": row.name,
            "description": row.description,
            # Tools were eagerly loaded, so this does not trigger extra queries.
            "tool_ids": [str(linked.id) for linked in (row.tools or [])],
            "sse_endpoint": f"{endpoint_base}/sse",
            "websocket_endpoint": f"{endpoint_base}/ws",
            "jsonrpc_endpoint": f"{endpoint_base}/jsonrpc",
            "capabilities": {"tools": {"list_changed": True}, "prompts": {"list_changed": True}},
            "is_active": row.enabled,
            "tags": row.tags or [],
        }

    if not server_ids:
        return []

    allowed_ids: Optional[set[str]] = None
    if self._is_scoped_selective_export(user_email, token_teams):
        readable = await self._fetch_all_servers(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
        allowed_ids = {str(item.id) for item in readable}
        server_ids = [requested for requested in server_ids if requested in allowed_ids]
        if not server_ids:
            return []

    # One IN query, with selectinload on the tools relationship to avoid N+1 queries.
    statement = select(DbServer).options(selectinload(DbServer.tools)).where(DbServer.id.in_(server_ids))
    rows = db.execute(statement).scalars().all()
    if allowed_ids is not None:
        # Re-check the loaded rows against the visible set.
        rows = [row for row in rows if str(row.id) in allowed_ids]

    return [_serialize(row) for row in rows]

1008 

async def _export_selected_prompts(self, db: Session, prompt_names: List[str], user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Export specific prompts by their identifiers using a batch query.

    Each entry of ``prompt_names`` may be a prompt ID or a prompt name; rows
    are loaded with one query matching either column.

    For a scoped export (see ``_is_scoped_selective_export``) the requested
    identifiers are first narrowed to those belonging to prompts returned by
    ``_fetch_all_prompts`` for the caller. After the query, rows are kept only
    if their *ID* is in the visible set. Names are deliberately not used for
    that final check: the query matches by name, so a non-visible prompt that
    shares a name with a visible one would otherwise be exported.

    Args:
        db: Database session
        prompt_names: List of prompt IDs or names to export
        user_email: Requesting user's email for visibility filtering
        token_teams: Token team scope for visibility filtering

    Returns:
        List of exported prompt dictionaries
    """
    if not prompt_names:
        return []

    visible_prompt_ids: Optional[set[str]] = None
    if self._is_scoped_selective_export(user_email, token_teams):
        visible_prompts = await self._fetch_all_prompts(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
        visible_prompt_ids = set()
        # Identifiers a caller may use to *request* a prompt: its ID or any of its names.
        selectable_identifiers: set[str] = set()
        for prompt in visible_prompts:
            prompt_id = str(prompt.id)
            visible_prompt_ids.add(prompt_id)
            selectable_identifiers.add(prompt_id)
            for attribute in ("name", "original_name", "custom_name"):
                alias = getattr(prompt, attribute, None)
                if alias:
                    selectable_identifiers.add(alias)
        prompt_names = [prompt_name for prompt_name in prompt_names if prompt_name in selectable_identifiers]
        if not prompt_names:
            return []

    # Batch query for selected prompts only (matches on ID or name).
    db_prompts = db.execute(select(DbPrompt).where(or_(DbPrompt.id.in_(prompt_names), DbPrompt.name.in_(prompt_names)))).scalars().all()
    if visible_prompt_ids is not None:
        # Authorise by ID only; a matching name does not prove the row is visible.
        db_prompts = [db_prompt for db_prompt in db_prompts if str(db_prompt.id) in visible_prompt_ids]

    exported_prompts = []
    for db_prompt in db_prompts:
        # Use the stored argument schema when present, otherwise an empty object schema.
        input_schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}
        if db_prompt.argument_schema:
            input_schema = db_prompt.argument_schema

        prompt_data: Dict[str, Any] = {
            "name": db_prompt.original_name or db_prompt.name,
            "original_name": db_prompt.original_name or db_prompt.name,
            "custom_name": db_prompt.custom_name or db_prompt.original_name or db_prompt.name,
            "display_name": db_prompt.display_name or db_prompt.custom_name or db_prompt.original_name or db_prompt.name,
            "template": db_prompt.template,
            "description": db_prompt.description,
            "input_schema": input_schema,
            "tags": db_prompt.tags or [],
            # Prefer ``enabled``; fall back to ``is_active``, then ``False``.
            "is_active": getattr(db_prompt, "enabled", getattr(db_prompt, "is_active", False)),
        }

        exported_prompts.append(prompt_data)

    return exported_prompts

1076 

async def _export_selected_resources(self, db: Session, resource_uris: List[str], user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Build export dictionaries for an explicit selection of resource URIs.

    For a scoped export (see ``_is_scoped_selective_export``) the requested
    URIs are first narrowed to those of the resources ``_fetch_all_resources``
    returns for the caller. The remaining URIs are loaded with a single ``IN``
    query.

    Args:
        db: Database session
        resource_uris: List of resource URIs to export
        user_email: Requesting user's email for visibility filtering
        token_teams: Token team scope for visibility filtering

    Returns:
        List of exported resource dictionaries
    """

    def _serialize(row: Any) -> Dict[str, Any]:
        """Convert one loaded resource row into its export dictionary."""
        modified = row.updated_at
        return {
            "name": row.name,
            "uri": row.uri,
            "description": row.description,
            "mime_type": row.mime_type,
            "tags": row.tags or [],
            "is_active": row.enabled,
            "last_modified": modified.isoformat() if modified else None,
        }

    if not resource_uris:
        return []

    allowed_uris: Optional[set[str]] = None
    if self._is_scoped_selective_export(user_email, token_teams):
        readable = await self._fetch_all_resources(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
        allowed_uris = {item.uri for item in readable}
        resource_uris = [requested for requested in resource_uris if requested in allowed_uris]
        if not resource_uris:
            return []

    # One IN query for the requested rows instead of a lookup per URI.
    statement = select(DbResource).where(DbResource.uri.in_(resource_uris))
    rows = db.execute(statement).scalars().all()
    if allowed_uris is not None:
        # NOTE(review): visibility is checked by URI only. If URIs are not unique
        # across owners/teams, a non-visible row sharing a URI with a visible one
        # would pass this filter - confirm uniqueness or filter by ID instead.
        rows = [row for row in rows if row.uri in allowed_uris]

    return [_serialize(row) for row in rows]

1122 

1123 async def _export_selected_roots(self, root_uris: List[str]) -> List[Dict[str, Any]]: 

1124 """Export specific roots by their URIs. 

1125 

1126 Args: 

1127 root_uris: List of root URIs to export 

1128 

1129 Returns: 

1130 List of exported root dictionaries 

1131 """ 

1132 all_roots = await self._export_roots() 

1133 return [r for r in all_roots if r["uri"] in root_uris]