Coverage for mcpgateway / services / export_service.py: 96%
340 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2# pylint: disable=import-outside-toplevel,no-name-in-module
3"""Location: ./mcpgateway/services/export_service.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti
8Export Service Implementation.
9This module implements comprehensive configuration export functionality according to the export specification.
10It handles:
11- Entity collection from all entity types (Tools, Gateways, Servers, Prompts, Resources, Roots)
12- Secure authentication data encryption using AES-256-GCM
13- Dependency resolution and inclusion
14- Filtering by entity types, tags, and active/inactive status
15- Export format validation and schema compliance
16- Only exports locally configured entities (not federated content)
17"""
19# Standard
20from datetime import datetime, timezone
21import logging
22from typing import Any, cast, Dict, List, Optional, TypedDict
24# Third-Party
25from sqlalchemy import or_, select
26from sqlalchemy.orm import selectinload, Session
28# First-Party
29from mcpgateway.config import settings
30from mcpgateway.db import Gateway as DbGateway
31from mcpgateway.db import Prompt as DbPrompt
32from mcpgateway.db import Resource as DbResource
33from mcpgateway.db import Server as DbServer
34from mcpgateway.db import Tool as DbTool
36# Service singletons are imported lazily in __init__ to avoid circular imports
38logger = logging.getLogger(__name__)
41class ExportError(Exception):
42 """Base class for export-related errors.
44 Examples:
45 >>> try:
46 ... raise ExportError("General export error")
47 ... except ExportError as e:
48 ... str(e)
49 'General export error'
50 >>> try:
51 ... raise ExportError("Export failed")
52 ... except Exception as e:
53 ... isinstance(e, ExportError)
54 True
55 """
58class ExportValidationError(ExportError):
59 """Raised when export data validation fails.
61 Examples:
62 >>> try:
63 ... raise ExportValidationError("Invalid export format")
64 ... except ExportValidationError as e:
65 ... str(e)
66 'Invalid export format'
67 >>> try:
68 ... raise ExportValidationError("Schema validation failed")
69 ... except ExportError as e:
70 ... isinstance(e, ExportError) # Should inherit from ExportError
71 True
72 >>> try:
73 ... raise ExportValidationError("Missing required field")
74 ... except Exception as e:
75 ... isinstance(e, ExportValidationError)
76 True
77 """
80class ExportService:
81 """Service for exporting MCP Gateway configuration and data.
83 This service provides comprehensive export functionality including:
84 - Collection of all entity types (tools, gateways, servers, prompts, resources, roots)
85 - Secure handling of authentication data with encryption
86 - Dependency resolution between entities
87 - Filtering options (by type, tags, status)
88 - Export format validation
90 The service only exports locally configured entities, excluding dynamic content
91 from federated sources to ensure exports contain only configuration data.
93 Examples:
94 >>> service = ExportService()
95 >>> hasattr(service, 'gateway_service')
96 True
97 >>> hasattr(service, 'tool_service')
98 True
99 >>> hasattr(service, 'resource_service')
100 True
101 >>> # Test entity type validation
102 >>> valid_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"]
103 >>> "tools" in valid_types
104 True
105 >>> "invalid_type" in valid_types
106 False
107 >>> # Test filtering logic
108 >>> include_types = ["tools", "servers"]
109 >>> exclude_types = ["gateways"]
110 >>> "tools" in include_types and "tools" not in exclude_types
111 True
112 >>> "gateways" in include_types and "gateways" not in exclude_types
113 False
114 >>> # Test tag filtering
115 >>> entity_tags = ["production", "api"]
116 >>> filter_tags = ["production"]
117 >>> any(tag in entity_tags for tag in filter_tags)
118 True
119 >>> filter_tags = ["development"]
120 >>> any(tag in entity_tags for tag in filter_tags)
121 False
122 """
124 def __init__(self):
125 """Initialize the export service with required dependencies."""
126 # Use globally-exported singletons from service modules so they
127 # share initialized EventService/Redis clients created at app startup.
128 # Import lazily to avoid circular import at module load time.
129 # First-Party
130 from mcpgateway.services.gateway_service import gateway_service
131 from mcpgateway.services.prompt_service import prompt_service
132 from mcpgateway.services.resource_service import resource_service
133 from mcpgateway.services.root_service import root_service
134 from mcpgateway.services.server_service import server_service
135 from mcpgateway.services.tool_service import tool_service
137 self.gateway_service = gateway_service
138 self.tool_service = tool_service
139 self.resource_service = resource_service
140 self.prompt_service = prompt_service
141 self.server_service = server_service
142 self.root_service = root_service
144 async def initialize(self) -> None:
145 """Initialize the export service."""
146 logger.info("Export service initialized")
148 async def shutdown(self) -> None:
149 """Shutdown the export service."""
150 logger.info("Export service shutdown")
152 async def _fetch_all_tools(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]:
153 """Fetch all tools by following pagination cursors.
155 Args:
156 db: Database session
157 tags: Filter by tags
158 include_inactive: Include inactive tools
160 Returns:
161 List of all tools across all pages
162 """
163 all_tools = []
164 cursor = None
165 while True:
166 tools, next_cursor = await self.tool_service.list_tools(db, tags=tags, include_inactive=include_inactive, cursor=cursor)
167 all_tools.extend(tools)
168 if not next_cursor:
169 break
170 cursor = next_cursor
171 return all_tools
173 async def _fetch_all_prompts(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]:
174 """Fetch all prompts by following pagination cursors.
176 Args:
177 db: Database session
178 tags: Filter by tags
179 include_inactive: Include inactive prompts
181 Returns:
182 List of all prompts across all pages
183 """
184 all_prompts = []
185 cursor = None
186 while True:
187 prompts, next_cursor = await self.prompt_service.list_prompts(db, tags=tags, include_inactive=include_inactive, cursor=cursor)
188 all_prompts.extend(prompts)
189 if not next_cursor:
190 break
191 cursor = next_cursor
192 return all_prompts
194 async def _fetch_all_resources(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]:
195 """Fetch all resources by following pagination cursors.
197 Args:
198 db: Database session
199 tags: Filter by tags
200 include_inactive: Include inactive resources
202 Returns:
203 List of all resources across all pages
204 """
205 all_resources = []
206 cursor = None
207 while True:
208 resources, next_cursor = await self.resource_service.list_resources(db, tags=tags, include_inactive=include_inactive, cursor=cursor)
209 all_resources.extend(resources)
210 if not next_cursor:
211 break
212 cursor = next_cursor
213 return all_resources
215 async def _fetch_all_gateways(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]:
216 """Fetch all gateways by following pagination cursors.
218 Args:
219 db: Database session
220 tags: Filter by tags
221 include_inactive: Include inactive gateways
223 Returns:
224 List of all gateways across all pages
225 """
226 all_gateways = []
227 cursor = None
228 while True:
229 gateways, next_cursor = await self.gateway_service.list_gateways(db, tags=tags, include_inactive=include_inactive, cursor=cursor)
230 all_gateways.extend(gateways)
231 if not next_cursor:
232 break
233 cursor = next_cursor
234 return all_gateways
236 async def _fetch_all_servers(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Any]:
237 """Fetch all servers by following pagination cursors.
239 Args:
240 db: Database session
241 tags: Filter by tags
242 include_inactive: Include inactive servers
244 Returns:
245 List of all servers across all pages
246 """
247 all_servers = []
248 cursor = None
249 while True:
250 servers, next_cursor = await self.server_service.list_servers(db, tags=tags, include_inactive=include_inactive, cursor=cursor)
251 all_servers.extend(servers)
252 if not next_cursor:
253 break
254 cursor = next_cursor
255 return all_servers
257 async def export_configuration(
258 self,
259 db: Session,
260 include_types: Optional[List[str]] = None,
261 exclude_types: Optional[List[str]] = None,
262 tags: Optional[List[str]] = None,
263 include_inactive: bool = False,
264 include_dependencies: bool = True,
265 exported_by: str = "system",
266 root_path: str = "",
267 ) -> Dict[str, Any]:
268 """Export complete gateway configuration to a standardized format.
270 Args:
271 db: Database session
272 include_types: List of entity types to include (tools, gateways, servers, prompts, resources, roots)
273 exclude_types: List of entity types to exclude
274 tags: Filter entities by tags (only export entities with these tags)
275 include_inactive: Whether to include inactive entities
276 include_dependencies: Whether to include dependent entities automatically
277 exported_by: Username of the person performing the export
278 root_path: Root path for constructing API endpoints
280 Returns:
281 Dict containing the complete export data in the specified schema format
283 Raises:
284 ExportError: If export fails
285 ExportValidationError: If validation fails
286 """
287 try:
288 logger.info(f"Starting configuration export by {exported_by}")
290 # Determine which entity types to include
291 all_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"]
292 if include_types:
293 entity_types = [t.lower() for t in include_types if t.lower() in all_types]
294 else:
295 entity_types = all_types
297 if exclude_types:
298 entity_types = [t for t in entity_types if t.lower() not in [e.lower() for e in exclude_types]]
300 class ExportOptions(TypedDict, total=False):
301 """Options that control export behavior (full export)."""
303 include_inactive: bool
304 include_dependencies: bool
305 selected_types: List[str]
306 filter_tags: List[str]
308 class ExportMetadata(TypedDict):
309 """Metadata for full export including counts, dependencies, and options."""
311 entity_counts: Dict[str, int]
312 dependencies: Dict[str, Any]
313 export_options: ExportOptions
315 class ExportData(TypedDict):
316 """Top-level full export payload shape."""
318 version: str
319 exported_at: str
320 exported_by: str
321 source_gateway: str
322 encryption_method: str
323 entities: Dict[str, List[Dict[str, Any]]]
324 metadata: ExportMetadata
326 entities: Dict[str, List[Dict[str, Any]]] = {}
327 metadata: ExportMetadata = {
328 "entity_counts": {},
329 "dependencies": {},
330 "export_options": {
331 "include_inactive": include_inactive,
332 "include_dependencies": include_dependencies,
333 "selected_types": entity_types,
334 "filter_tags": tags or [],
335 },
336 }
338 export_data: ExportData = {
339 "version": settings.protocol_version,
340 "exported_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
341 "exported_by": exported_by,
342 "source_gateway": f"http://{settings.host}:{settings.port}",
343 "encryption_method": "AES-256-GCM",
344 "entities": entities,
345 "metadata": metadata,
346 }
348 # Export each entity type
349 if "tools" in entity_types: 349 ↛ 352line 349 didn't jump to line 352 because the condition on line 349 was always true
350 export_data["entities"]["tools"] = await self._export_tools(db, tags, include_inactive)
352 if "gateways" in entity_types: 352 ↛ 355line 352 didn't jump to line 355 because the condition on line 352 was always true
353 export_data["entities"]["gateways"] = await self._export_gateways(db, tags, include_inactive)
355 if "servers" in entity_types:
356 export_data["entities"]["servers"] = await self._export_servers(db, tags, include_inactive, root_path)
358 if "prompts" in entity_types:
359 export_data["entities"]["prompts"] = await self._export_prompts(db, tags, include_inactive)
361 if "resources" in entity_types:
362 export_data["entities"]["resources"] = await self._export_resources(db, tags, include_inactive)
364 if "roots" in entity_types:
365 export_data["entities"]["roots"] = await self._export_roots()
367 # Add dependency information
368 if include_dependencies: 368 ↛ 372line 368 didn't jump to line 372 because the condition on line 368 was always true
369 export_data["metadata"]["dependencies"] = await self._extract_dependencies(db, export_data["entities"])
371 # Calculate entity counts
372 for entity_type, entities_list in export_data["entities"].items():
373 export_data["metadata"]["entity_counts"][entity_type] = len(entities_list)
375 # Validate export data
376 self._validate_export_data(cast(Dict[str, Any], export_data))
378 logger.info(f"Export completed successfully with {sum(export_data['metadata']['entity_counts'].values())} total entities")
379 return cast(Dict[str, Any], export_data)
381 except Exception as e:
382 logger.error(f"Export failed: {str(e)}")
383 raise ExportError(f"Failed to export configuration: {str(e)}")
385 async def _export_tools(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Dict[str, Any]]:
386 """Export tools with encrypted authentication data.
388 Uses batch queries to fetch auth data efficiently, avoiding N+1 query patterns.
390 Args:
391 db: Database session
392 tags: Filter by tags
393 include_inactive: Include inactive tools
395 Returns:
396 List of exported tool dictionaries
397 """
398 # Fetch all tools across all pages (bypasses pagination limit)
399 tools = await self._fetch_all_tools(db, tags, include_inactive)
401 # Filter to only exportable tools (local REST tools, not MCP tools from gateways)
402 exportable_tools = [t for t in tools if not (t.integration_type == "MCP" and t.gateway_id)]
404 # Batch fetch auth data for tools with masked values (single query instead of N queries)
405 tool_ids_needing_auth = [
406 tool.id for tool in exportable_tools if hasattr(tool, "auth") and tool.auth and hasattr(tool.auth, "auth_value") and tool.auth.auth_value == settings.masked_auth_value
407 ]
409 auth_data_map: Dict[Any, tuple] = {}
410 if tool_ids_needing_auth:
411 db_tools_with_auth = db.execute(select(DbTool.id, DbTool.auth_type, DbTool.auth_value).where(DbTool.id.in_(tool_ids_needing_auth))).all()
412 auth_data_map = {row[0]: (row[1], row[2]) for row in db_tools_with_auth}
414 exported_tools = []
415 for tool in exportable_tools:
416 tool_data = {
417 "name": tool.original_name, # Use original name, not the slugified version
418 "displayName": tool.displayName, # Export displayName field from ToolRead
419 "url": str(tool.url),
420 "integration_type": tool.integration_type,
421 "request_type": tool.request_type,
422 "description": tool.description,
423 "headers": tool.headers or {},
424 "input_schema": tool.input_schema or {"type": "object", "properties": {}},
425 "output_schema": tool.output_schema,
426 "annotations": tool.annotations or {},
427 "jsonpath_filter": tool.jsonpath_filter,
428 "tags": tool.tags or [],
429 "rate_limit": getattr(tool, "rate_limit", None),
430 "timeout": getattr(tool, "timeout", None),
431 "is_active": tool.enabled,
432 "created_at": tool.created_at.isoformat() if hasattr(tool.created_at, "isoformat") and tool.created_at else None,
433 "updated_at": tool.updated_at.isoformat() if hasattr(tool.updated_at, "isoformat") and tool.updated_at else None,
434 }
436 # Handle authentication data securely - use batch-fetched values
437 if hasattr(tool, "auth") and tool.auth:
438 auth = tool.auth
439 if hasattr(auth, "auth_type") and hasattr(auth, "auth_value"): 439 ↛ 452line 439 didn't jump to line 452 because the condition on line 439 was always true
440 if auth.auth_value == settings.masked_auth_value:
441 # Use batch-fetched auth data
442 if tool.id in auth_data_map: 442 ↛ 452line 442 didn't jump to line 452 because the condition on line 442 was always true
443 auth_type, auth_value = auth_data_map[tool.id]
444 if auth_value: 444 ↛ 452line 444 didn't jump to line 452 because the condition on line 444 was always true
445 tool_data["auth_type"] = auth_type
446 tool_data["auth_value"] = auth_value
447 else:
448 # Auth value is not masked, use as-is
449 tool_data["auth_type"] = auth.auth_type
450 tool_data["auth_value"] = auth.auth_value
452 exported_tools.append(tool_data)
454 return exported_tools
456 async def _export_gateways(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Dict[str, Any]]:
457 """Export gateways with encrypted authentication data.
459 Uses batch queries to fetch auth data efficiently, avoiding N+1 query patterns.
461 Args:
462 db: Database session
463 tags: Filter by tags
464 include_inactive: Include inactive gateways
466 Returns:
467 List of exported gateway dictionaries
468 """
469 # Fetch all gateways across all pages (bypasses pagination limit)
470 gateways = await self._fetch_all_gateways(db, tags, include_inactive)
472 # Batch fetch auth data for gateways with masked values (single query instead of N queries)
473 gateway_ids_needing_auth = [g.id for g in gateways if g.auth_type and g.auth_value == settings.masked_auth_value]
475 auth_data_map: Dict[Any, tuple] = {}
476 if gateway_ids_needing_auth:
477 db_gateways_with_auth = db.execute(select(DbGateway.id, DbGateway.auth_type, DbGateway.auth_value).where(DbGateway.id.in_(gateway_ids_needing_auth))).all()
478 auth_data_map = {row[0]: (row[1], row[2]) for row in db_gateways_with_auth}
480 exported_gateways = []
481 for gateway in gateways:
482 gateway_data = {
483 "name": gateway.name,
484 "url": str(gateway.url),
485 "description": gateway.description,
486 "transport": gateway.transport,
487 "capabilities": gateway.capabilities or {},
488 "health_check": {"url": f"{gateway.url}/health", "interval": 30, "timeout": 10, "retries": 3},
489 "is_active": gateway.enabled,
490 "tags": gateway.tags or [],
491 "passthrough_headers": gateway.passthrough_headers or [],
492 }
494 # Handle authentication data securely - use batch-fetched values
495 if gateway.auth_type and gateway.auth_value:
496 if gateway.auth_value == settings.masked_auth_value:
497 # Use batch-fetched auth data
498 if gateway.id in auth_data_map: 498 ↛ 508line 498 didn't jump to line 508 because the condition on line 498 was always true
499 auth_type, auth_value = auth_data_map[gateway.id]
500 if auth_value: 500 ↛ 508line 500 didn't jump to line 508 because the condition on line 500 was always true
501 gateway_data["auth_type"] = auth_type
502 gateway_data["auth_value"] = auth_value
503 else:
504 # Auth value is not masked, use as-is
505 gateway_data["auth_type"] = gateway.auth_type
506 gateway_data["auth_value"] = gateway.auth_value
508 exported_gateways.append(gateway_data)
510 return exported_gateways
512 async def _export_servers(self, db: Session, tags: Optional[List[str]], include_inactive: bool, root_path: str = "") -> List[Dict[str, Any]]:
513 """Export virtual servers with their tool associations.
515 Args:
516 db: Database session
517 tags: Filter by tags
518 include_inactive: Include inactive servers
519 root_path: Root path for constructing API endpoints
521 Returns:
522 List of exported server dictionaries
523 """
524 # Fetch all servers across all pages (bypasses pagination limit)
525 servers = await self._fetch_all_servers(db, tags, include_inactive)
526 exported_servers = []
528 for server in servers:
529 server_data = {
530 "name": server.name,
531 "description": server.description,
532 "tool_ids": list(server.associated_tools),
533 "sse_endpoint": f"{root_path}/servers/{server.id}/sse",
534 "websocket_endpoint": f"{root_path}/servers/{server.id}/ws",
535 "jsonrpc_endpoint": f"{root_path}/servers/{server.id}/jsonrpc",
536 "capabilities": {"tools": {"list_changed": True}, "prompts": {"list_changed": True}},
537 "is_active": getattr(server, "enabled", getattr(server, "is_active", False)),
538 "tags": server.tags or [],
539 }
541 exported_servers.append(server_data)
543 return exported_servers
545 async def _export_prompts(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Dict[str, Any]]:
546 """Export prompts with their templates and schemas.
548 Args:
549 db: Database session
550 tags: Filter by tags
551 include_inactive: Include inactive prompts
553 Returns:
554 List of exported prompt dictionaries
555 """
556 # Fetch all prompts across all pages (bypasses pagination limit)
557 prompts = await self._fetch_all_prompts(db, tags, include_inactive)
558 exported_prompts = []
560 for prompt in prompts:
561 input_schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}
562 prompt_data: Dict[str, Any] = {
563 "name": getattr(prompt, "original_name", None) or prompt.name,
564 "original_name": getattr(prompt, "original_name", None) or prompt.name,
565 "custom_name": getattr(prompt, "custom_name", None) or getattr(prompt, "original_name", None) or prompt.name,
566 "display_name": getattr(prompt, "display_name", None) or getattr(prompt, "custom_name", None) or getattr(prompt, "original_name", None) or prompt.name,
567 "template": prompt.template,
568 "description": prompt.description,
569 "input_schema": input_schema,
570 "tags": prompt.tags or [],
571 # Use the new `enabled` attribute on prompt objects but keep export key `is_active` for compatibility
572 "is_active": getattr(prompt, "enabled", getattr(prompt, "is_active", False)),
573 }
575 # Convert arguments to input schema format
576 if prompt.arguments: 576 ↛ 586line 576 didn't jump to line 586 because the condition on line 576 was always true
577 properties: Dict[str, Any] = {}
578 required = []
579 for arg in prompt.arguments:
580 properties[arg.name] = {"type": "string", "description": arg.description or ""}
581 if arg.required:
582 required.append(arg.name)
583 input_schema["properties"] = properties
584 input_schema["required"] = required
586 exported_prompts.append(prompt_data)
588 return exported_prompts
590 async def _export_resources(self, db: Session, tags: Optional[List[str]], include_inactive: bool) -> List[Dict[str, Any]]:
591 """Export resources with their content metadata.
593 Args:
594 db: Database session
595 tags: Filter by tags
596 include_inactive: Include inactive resources
598 Returns:
599 List of exported resource dictionaries
600 """
601 # Fetch all resources across all pages (bypasses pagination limit)
602 resources = await self._fetch_all_resources(db, tags, include_inactive)
603 exported_resources = []
605 for resource in resources:
606 resource_data = {
607 "name": resource.name,
608 "uri": resource.uri,
609 "description": resource.description,
610 "mime_type": resource.mime_type,
611 "tags": resource.tags or [],
612 "is_active": getattr(resource, "enabled", getattr(resource, "is_active", False)),
613 "last_modified": resource.updated_at.isoformat() if resource.updated_at else None,
614 }
616 exported_resources.append(resource_data)
618 return exported_resources
620 async def _export_roots(self) -> List[Dict[str, Any]]:
621 """Export filesystem roots.
623 Returns:
624 List of exported root dictionaries
625 """
626 roots = await self.root_service.list_roots()
627 exported_roots = []
629 for root in roots:
630 root_data = {"uri": str(root.uri), "name": root.name}
631 exported_roots.append(root_data)
633 return exported_roots
635 async def _extract_dependencies(self, db: Session, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: # pylint: disable=unused-argument
636 """Extract dependency relationships between entities.
638 Args:
639 db: Database session
640 entities: Dictionary of exported entities
642 Returns:
643 Dictionary containing dependency mappings
644 """
645 dependencies = {"servers_to_tools": {}, "servers_to_resources": {}, "servers_to_prompts": {}}
647 # Extract server-to-tool dependencies
648 if "servers" in entities and "tools" in entities:
649 for server in entities["servers"]:
650 if server.get("tool_ids"): 650 ↛ 649line 650 didn't jump to line 649 because the condition on line 650 was always true
651 dependencies["servers_to_tools"][server["name"]] = server["tool_ids"]
653 return dependencies
655 def _validate_export_data(self, export_data: Dict[str, Any]) -> None:
656 """Validate export data against the schema.
658 Args:
659 export_data: The export data to validate
661 Raises:
662 ExportValidationError: If validation fails
663 """
664 required_fields = ["version", "exported_at", "exported_by", "entities", "metadata"]
666 for field in required_fields:
667 if field not in export_data:
668 raise ExportValidationError(f"Missing required field: {field}")
670 # Validate version format
671 if not export_data["version"]:
672 raise ExportValidationError("Version cannot be empty")
674 # Validate entities structure
675 if not isinstance(export_data["entities"], dict):
676 raise ExportValidationError("Entities must be a dictionary")
678 # Validate metadata structure
679 metadata = export_data["metadata"]
680 if not isinstance(metadata.get("entity_counts"), dict):
681 raise ExportValidationError("Metadata entity_counts must be a dictionary")
683 logger.debug("Export data validation passed")
685 async def export_selective(self, db: Session, entity_selections: Dict[str, List[str]], include_dependencies: bool = True, exported_by: str = "system", root_path: str = "") -> Dict[str, Any]:
686 """Export specific entities by their IDs/names.
688 Args:
689 db: Database session
690 entity_selections: Dict mapping entity types to lists of IDs/names to export
691 include_dependencies: Whether to include dependent entities
692 exported_by: Username of the person performing the export
693 root_path: Root path for constructing API endpoints
695 Returns:
696 Dict containing the selective export data
698 Example:
699 entity_selections = {
700 "tools": ["tool1", "tool2"],
701 "servers": ["server1"],
702 "prompts": ["prompt1"]
703 }
704 """
705 logger.info(f"Starting selective export by {exported_by}")
707 class SelExportOptions(TypedDict, total=False):
708 """Options that control behavior for selective export."""
710 selective: bool
711 include_dependencies: bool
712 selections: Dict[str, List[str]]
714 class SelExportMetadata(TypedDict):
715 """Metadata for selective export including counts, dependencies, and options."""
717 entity_counts: Dict[str, int]
718 dependencies: Dict[str, Any]
719 export_options: SelExportOptions
721 class SelExportData(TypedDict):
722 """Top-level selective export payload shape."""
724 version: str
725 exported_at: str
726 exported_by: str
727 source_gateway: str
728 encryption_method: str
729 entities: Dict[str, List[Dict[str, Any]]]
730 metadata: SelExportMetadata
732 sel_entities: Dict[str, List[Dict[str, Any]]] = {}
733 sel_metadata: SelExportMetadata = {
734 "entity_counts": {},
735 "dependencies": {},
736 "export_options": {"selective": True, "include_dependencies": include_dependencies, "selections": entity_selections},
737 }
738 export_data: SelExportData = {
739 "version": settings.protocol_version,
740 "exported_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
741 "exported_by": exported_by,
742 "source_gateway": f"http://{settings.host}:{settings.port}",
743 "encryption_method": "AES-256-GCM",
744 "entities": sel_entities,
745 "metadata": sel_metadata,
746 }
748 # Export selected entities for each type
749 for entity_type, selected_ids in entity_selections.items():
750 if entity_type == "tools":
751 export_data["entities"]["tools"] = await self._export_selected_tools(db, selected_ids)
752 elif entity_type == "gateways":
753 export_data["entities"]["gateways"] = await self._export_selected_gateways(db, selected_ids)
754 elif entity_type == "servers":
755 export_data["entities"]["servers"] = await self._export_selected_servers(db, selected_ids, root_path)
756 elif entity_type == "prompts":
757 export_data["entities"]["prompts"] = await self._export_selected_prompts(db, selected_ids)
758 elif entity_type == "resources":
759 export_data["entities"]["resources"] = await self._export_selected_resources(db, selected_ids)
760 elif entity_type == "roots": 760 ↛ 749line 760 didn't jump to line 749 because the condition on line 760 was always true
761 export_data["entities"]["roots"] = await self._export_selected_roots(selected_ids)
763 # Add dependencies if requested
764 if include_dependencies: 764 ↛ 768line 764 didn't jump to line 768 because the condition on line 764 was always true
765 export_data["metadata"]["dependencies"] = await self._extract_dependencies(db, export_data["entities"])
767 # Calculate entity counts
768 for entity_type, entities_list in export_data["entities"].items():
769 export_data["metadata"]["entity_counts"][entity_type] = len(entities_list)
771 self._validate_export_data(cast(Dict[str, Any], export_data))
773 logger.info(f"Selective export completed with {sum(export_data['metadata']['entity_counts'].values())} entities")
774 return cast(Dict[str, Any], export_data)
776 async def _export_selected_tools(self, db: Session, tool_ids: List[str]) -> List[Dict[str, Any]]:
777 """Export specific tools by their IDs using batch queries.
779 Uses a single batch query instead of fetching all tools N times.
781 Args:
782 db: Database session
783 tool_ids: List of tool IDs to export
785 Returns:
786 List of exported tool dictionaries
787 """
788 if not tool_ids:
789 return []
791 # Batch query for selected tools only
792 db_tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all()
794 exported_tools = []
795 for db_tool in db_tools:
796 # Only export local REST tools, not MCP tools from gateways
797 if db_tool.integration_type == "MCP" and db_tool.gateway_id:
798 continue
800 tool_data = {
801 "name": db_tool.original_name or db_tool.custom_name,
802 "displayName": db_tool.display_name,
803 "url": str(db_tool.url) if db_tool.url else None,
804 "integration_type": db_tool.integration_type,
805 "request_type": db_tool.request_type,
806 "description": db_tool.description,
807 "headers": db_tool.headers or {},
808 "input_schema": db_tool.input_schema or {"type": "object", "properties": {}},
809 "output_schema": db_tool.output_schema,
810 "annotations": db_tool.annotations or {},
811 "jsonpath_filter": db_tool.jsonpath_filter,
812 "tags": db_tool.tags or [],
813 "rate_limit": db_tool.rate_limit,
814 "timeout": db_tool.timeout,
815 "is_active": db_tool.is_active,
816 "created_at": db_tool.created_at.isoformat() if db_tool.created_at else None,
817 "updated_at": db_tool.updated_at.isoformat() if db_tool.updated_at else None,
818 }
820 # Include auth data directly from DB (already have raw values)
821 if db_tool.auth_type and db_tool.auth_value: 821 ↛ 825line 821 didn't jump to line 825 because the condition on line 821 was always true
822 tool_data["auth_type"] = db_tool.auth_type
823 tool_data["auth_value"] = db_tool.auth_value
825 exported_tools.append(tool_data)
827 return exported_tools
829 async def _export_selected_gateways(self, db: Session, gateway_ids: List[str]) -> List[Dict[str, Any]]:
830 """Export specific gateways by their IDs using batch queries.
832 Uses a single batch query instead of fetching all gateways N times.
834 Args:
835 db: Database session
836 gateway_ids: List of gateway IDs to export
838 Returns:
839 List of exported gateway dictionaries
840 """
841 if not gateway_ids:
842 return []
844 # Batch query for selected gateways only
845 db_gateways = db.execute(select(DbGateway).where(DbGateway.id.in_(gateway_ids))).scalars().all()
847 exported_gateways = []
848 for db_gateway in db_gateways:
849 gateway_data = {
850 "name": db_gateway.name,
851 "url": str(db_gateway.url) if db_gateway.url else None,
852 "description": db_gateway.description,
853 "transport": db_gateway.transport,
854 "capabilities": db_gateway.capabilities or {},
855 "health_check": {"url": f"{db_gateway.url}/health", "interval": 30, "timeout": 10, "retries": 3},
856 "is_active": db_gateway.is_active,
857 "tags": db_gateway.tags or [],
858 "passthrough_headers": db_gateway.passthrough_headers or [],
859 }
861 # Include auth data directly from DB (already have raw values)
862 if db_gateway.auth_type: 862 ↛ 870line 862 didn't jump to line 870 because the condition on line 862 was always true
863 gateway_data["auth_type"] = db_gateway.auth_type
864 if db_gateway.auth_value: 864 ↛ 867line 864 didn't jump to line 867 because the condition on line 864 was always true
865 gateway_data["auth_value"] = db_gateway.auth_value
866 # Include query param auth if present
867 if db_gateway.auth_type == "query_param" and getattr(db_gateway, "auth_query_params", None): 867 ↛ 870line 867 didn't jump to line 870 because the condition on line 867 was always true
868 gateway_data["auth_query_params"] = db_gateway.auth_query_params
870 exported_gateways.append(gateway_data)
872 return exported_gateways
874 async def _export_selected_servers(self, db: Session, server_ids: List[str], root_path: str = "") -> List[Dict[str, Any]]:
875 """Export specific servers by their IDs using batch queries.
877 Uses a single batch query instead of fetching all servers N times.
879 Args:
880 db: Database session
881 server_ids: List of server IDs to export
882 root_path: Root path for constructing API endpoints
884 Returns:
885 List of exported server dictionaries
886 """
887 if not server_ids:
888 return []
890 # Batch query for selected servers with eager loading to avoid N+1 queries
891 db_servers = db.execute(select(DbServer).options(selectinload(DbServer.tools)).where(DbServer.id.in_(server_ids))).scalars().all()
893 exported_servers = []
894 for db_server in db_servers:
895 # Get associated tool IDs (tools are eagerly loaded)
896 tool_ids = [str(tool.id) for tool in db_server.tools] if db_server.tools else []
898 server_data = {
899 "name": db_server.name,
900 "description": db_server.description,
901 "tool_ids": tool_ids,
902 "sse_endpoint": f"{root_path}/servers/{db_server.id}/sse",
903 "websocket_endpoint": f"{root_path}/servers/{db_server.id}/ws",
904 "jsonrpc_endpoint": f"{root_path}/servers/{db_server.id}/jsonrpc",
905 "capabilities": {"tools": {"list_changed": True}, "prompts": {"list_changed": True}},
906 "is_active": db_server.is_active,
907 "tags": db_server.tags or [],
908 }
910 exported_servers.append(server_data)
912 return exported_servers
914 async def _export_selected_prompts(self, db: Session, prompt_names: List[str]) -> List[Dict[str, Any]]:
915 """Export specific prompts by their identifiers using batch queries.
917 Uses a single batch query instead of fetching all prompts N times.
919 Args:
920 db: Database session
921 prompt_names: List of prompt IDs or names to export
923 Returns:
924 List of exported prompt dictionaries
925 """
926 if not prompt_names:
927 return []
929 # Batch query for selected prompts only
930 db_prompts = db.execute(select(DbPrompt).where(or_(DbPrompt.id.in_(prompt_names), DbPrompt.name.in_(prompt_names)))).scalars().all()
932 exported_prompts = []
933 for db_prompt in db_prompts:
934 # Build input schema from argument_schema
935 input_schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}
936 if db_prompt.argument_schema: 936 ↛ 939line 936 didn't jump to line 939 because the condition on line 936 was always true
937 input_schema = db_prompt.argument_schema
939 prompt_data: Dict[str, Any] = {
940 "name": db_prompt.original_name or db_prompt.name,
941 "original_name": db_prompt.original_name or db_prompt.name,
942 "custom_name": db_prompt.custom_name or db_prompt.original_name or db_prompt.name,
943 "display_name": db_prompt.display_name or db_prompt.custom_name or db_prompt.original_name or db_prompt.name,
944 "template": db_prompt.template,
945 "description": db_prompt.description,
946 "input_schema": input_schema,
947 "tags": db_prompt.tags or [],
948 "is_active": getattr(db_prompt, "enabled", getattr(db_prompt, "is_active", False)),
949 }
951 exported_prompts.append(prompt_data)
953 return exported_prompts
955 async def _export_selected_resources(self, db: Session, resource_uris: List[str]) -> List[Dict[str, Any]]:
956 """Export specific resources by their URIs using batch queries.
958 Uses a single batch query instead of fetching all resources N times.
960 Args:
961 db: Database session
962 resource_uris: List of resource URIs to export
964 Returns:
965 List of exported resource dictionaries
966 """
967 if not resource_uris:
968 return []
970 # Batch query for selected resources only
971 db_resources = db.execute(select(DbResource).where(DbResource.uri.in_(resource_uris))).scalars().all()
973 exported_resources = []
974 for db_resource in db_resources:
975 resource_data = {
976 "name": db_resource.name,
977 "uri": db_resource.uri,
978 "description": db_resource.description,
979 "mime_type": db_resource.mime_type,
980 "tags": db_resource.tags or [],
981 "is_active": db_resource.is_active,
982 "last_modified": db_resource.updated_at.isoformat() if db_resource.updated_at else None,
983 }
985 exported_resources.append(resource_data)
987 return exported_resources
989 async def _export_selected_roots(self, root_uris: List[str]) -> List[Dict[str, Any]]:
990 """Export specific roots by their URIs.
992 Args:
993 root_uris: List of root URIs to export
995 Returns:
996 List of exported root dictionaries
997 """
998 all_roots = await self._export_roots()
999 return [r for r in all_roots if r["uri"] in root_uris]