Coverage for mcpgateway / services / export_service.py: 100%
398 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2# pylint: disable=import-outside-toplevel,no-name-in-module
3"""Location: ./mcpgateway/services/export_service.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti
8Export Service Implementation.
9This module implements comprehensive configuration export functionality according to the export specification.
10It handles:
11- Entity collection from all entity types (Tools, Gateways, Servers, Prompts, Resources, Roots)
12- Secure authentication data encryption using AES-256-GCM
13- Dependency resolution and inclusion
14- Filtering by entity types, tags, and active/inactive status
15- Export format validation and schema compliance
16- Only exports locally configured entities (not federated content)
17"""
19# Standard
20from datetime import datetime, timezone
21import logging
22from typing import Any, cast, Dict, List, Optional, TypedDict
24# Third-Party
25from sqlalchemy import or_, select
26from sqlalchemy.orm import selectinload, Session
28# First-Party
29from mcpgateway.config import settings
30from mcpgateway.db import Gateway as DbGateway
31from mcpgateway.db import Prompt as DbPrompt
32from mcpgateway.db import Resource as DbResource
33from mcpgateway.db import Server as DbServer
34from mcpgateway.db import Tool as DbTool
35from mcpgateway.utils.services_auth import encode_auth
37# Service singletons are imported lazily in __init__ to avoid circular imports
39logger = logging.getLogger(__name__)
class ExportError(Exception):
    """Base class for errors raised during configuration export.

    All export-specific exceptions in this module derive from this class so
    callers can catch a single type to handle any export failure.

    Examples:
        >>> err = ExportError("General export error")
        >>> str(err)
        'General export error'
        >>> isinstance(err, Exception)
        True
        >>> issubclass(ExportError, Exception)
        True
    """
class ExportValidationError(ExportError):
    """Raised when an assembled export payload fails schema validation.

    Inherits from :class:`ExportError`, so handlers that catch the base
    export error also catch validation failures.

    Examples:
        >>> err = ExportValidationError("Invalid export format")
        >>> str(err)
        'Invalid export format'
        >>> isinstance(err, ExportError)
        True
        >>> issubclass(ExportValidationError, ExportError)
        True
    """
class ExportService:
    """Service for exporting ContextForge configuration and data.

    This service provides comprehensive export functionality including:
    - Collection of all entity types (tools, gateways, servers, prompts, resources, roots)
    - Secure handling of authentication data with encryption
    - Dependency resolution between entities
    - Filtering options (by type, tags, status)
    - Export format validation

    The service only exports locally configured entities, excluding dynamic content
    from federated sources, so exports contain only configuration data.

    Examples:
        >>> service = ExportService()
        >>> hasattr(service, 'gateway_service')
        True
        >>> hasattr(service, 'tool_service')
        True
        >>> hasattr(service, 'resource_service')
        True
        >>> # Test entity type validation
        >>> valid_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"]
        >>> "tools" in valid_types
        True
        >>> "invalid_type" in valid_types
        False
        >>> # Test filtering logic
        >>> include_types = ["tools", "servers"]
        >>> exclude_types = ["gateways"]
        >>> "tools" in include_types and "tools" not in exclude_types
        True
        >>> "gateways" in include_types and "gateways" not in exclude_types
        False
        >>> # Test tag filtering
        >>> entity_tags = ["production", "api"]
        >>> filter_tags = ["production"]
        >>> any(tag in entity_tags for tag in filter_tags)
        True
        >>> filter_tags = ["development"]
        >>> any(tag in entity_tags for tag in filter_tags)
        False
    """
    def __init__(self):
        """Initialize the export service with required dependencies.

        The per-entity service singletons are imported lazily, inside the
        constructor, for two reasons:
        - a top-level import of the service modules would create a circular
          import at module load time, and
        - the module-level singletons share the EventService/Redis clients
          that are created once at application startup.
        """
        # First-Party
        from mcpgateway.services.gateway_service import gateway_service
        from mcpgateway.services.prompt_service import prompt_service
        from mcpgateway.services.resource_service import resource_service
        from mcpgateway.services.root_service import root_service
        from mcpgateway.services.server_service import server_service
        from mcpgateway.services.tool_service import tool_service

        # Keep references on the instance so every export helper below uses
        # the same already-initialized singletons.
        self.gateway_service = gateway_service
        self.tool_service = tool_service
        self.resource_service = resource_service
        self.prompt_service = prompt_service
        self.server_service = server_service
        self.root_service = root_service
    async def initialize(self) -> None:
        """Initialize the export service.

        No resources are acquired here; the method only logs. It exists so
        this service matches the async lifecycle interface of the other
        gateway services.
        """
        logger.info("Export service initialized")
    async def shutdown(self) -> None:
        """Shutdown the export service.

        Nothing is released here; the method only logs. It mirrors
        :meth:`initialize` to complete the service lifecycle interface.
        """
        logger.info("Export service shutdown")
153 async def _fetch_all_tools(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]:
154 """Fetch all tools by following pagination cursors with team scoping.
156 Args:
157 db: Database session
158 tags: Filter by tags
159 include_inactive: Include inactive tools
160 user_email: Requesting user's email for visibility filtering
161 token_teams: Token team scope for visibility filtering
163 Returns:
164 List of all tools across all pages
165 """
166 all_tools = []
167 cursor = None
168 while True:
169 tools, next_cursor = await self.tool_service.list_tools(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams)
170 all_tools.extend(tools)
171 if not next_cursor:
172 break
173 cursor = next_cursor
174 return all_tools
176 async def _fetch_all_prompts(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]:
177 """Fetch all prompts by following pagination cursors with team scoping.
179 Args:
180 db: Database session
181 tags: Filter by tags
182 include_inactive: Include inactive prompts
183 user_email: Requesting user's email for visibility filtering
184 token_teams: Token team scope for visibility filtering
186 Returns:
187 List of all prompts across all pages
188 """
189 all_prompts = []
190 cursor = None
191 while True:
192 prompts, next_cursor = await self.prompt_service.list_prompts(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams)
193 all_prompts.extend(prompts)
194 if not next_cursor:
195 break
196 cursor = next_cursor
197 return all_prompts
199 async def _fetch_all_resources(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]:
200 """Fetch all resources by following pagination cursors with team scoping.
202 Args:
203 db: Database session
204 tags: Filter by tags
205 include_inactive: Include inactive resources
206 user_email: Requesting user's email for visibility filtering
207 token_teams: Token team scope for visibility filtering
209 Returns:
210 List of all resources across all pages
211 """
212 all_resources = []
213 cursor = None
214 while True:
215 resources, next_cursor = await self.resource_service.list_resources(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams)
216 all_resources.extend(resources)
217 if not next_cursor:
218 break
219 cursor = next_cursor
220 return all_resources
222 async def _fetch_all_gateways(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]:
223 """Fetch all gateways by following pagination cursors with team scoping.
225 Args:
226 db: Database session
227 tags: Filter by tags
228 include_inactive: Include inactive gateways
229 user_email: Requesting user's email for visibility filtering
230 token_teams: Token team scope for visibility filtering
232 Returns:
233 List of all gateways across all pages
234 """
235 all_gateways = []
236 cursor = None
237 while True:
238 gateways, next_cursor = await self.gateway_service.list_gateways(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams)
239 all_gateways.extend(gateways)
240 if not next_cursor:
241 break
242 cursor = next_cursor
243 return all_gateways
245 async def _fetch_all_servers(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Any]:
246 """Fetch all servers by following pagination cursors with team scoping.
248 Args:
249 db: Database session
250 tags: Filter by tags
251 include_inactive: Include inactive servers
252 user_email: Requesting user's email for visibility filtering
253 token_teams: Token team scope for visibility filtering
255 Returns:
256 List of all servers across all pages
257 """
258 all_servers = []
259 cursor = None
260 while True:
261 servers, next_cursor = await self.server_service.list_servers(db, tags=tags, include_inactive=include_inactive, cursor=cursor, user_email=user_email, token_teams=token_teams)
262 all_servers.extend(servers)
263 if not next_cursor:
264 break
265 cursor = next_cursor
266 return all_servers
268 async def export_configuration(
269 self,
270 db: Session,
271 include_types: Optional[List[str]] = None,
272 exclude_types: Optional[List[str]] = None,
273 tags: Optional[List[str]] = None,
274 include_inactive: bool = False,
275 include_dependencies: bool = True,
276 exported_by: str = "system",
277 root_path: str = "",
278 user_email: Optional[str] = None,
279 token_teams: Optional[List[str]] = None,
280 ) -> Dict[str, Any]:
281 """Export complete gateway configuration to a standardized format.
283 Args:
284 db: Database session
285 include_types: List of entity types to include (tools, gateways, servers, prompts, resources, roots)
286 exclude_types: List of entity types to exclude
287 tags: Filter entities by tags (only export entities with these tags)
288 include_inactive: Whether to include inactive entities
289 include_dependencies: Whether to include dependent entities automatically
290 exported_by: Username of the person performing the export
291 root_path: Root path for constructing API endpoints
292 user_email: Requesting user's email for team-scoped visibility filtering
293 token_teams: Token team scope for visibility filtering (None=admin bypass, []=public-only)
295 Returns:
296 Dict containing the complete export data in the specified schema format
298 Raises:
299 ExportError: If export fails
300 ExportValidationError: If validation fails
301 """
302 try:
303 logger.info(f"Starting configuration export by {exported_by}")
305 # Determine which entity types to include
306 all_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"]
307 if include_types:
308 entity_types = [t.lower() for t in include_types if t.lower() in all_types]
309 else:
310 entity_types = all_types
312 if exclude_types:
313 entity_types = [t for t in entity_types if t.lower() not in [e.lower() for e in exclude_types]]
315 class ExportOptions(TypedDict, total=False):
316 """Options that control export behavior (full export)."""
318 include_inactive: bool
319 include_dependencies: bool
320 selected_types: List[str]
321 filter_tags: List[str]
323 class ExportMetadata(TypedDict):
324 """Metadata for full export including counts, dependencies, and options."""
326 entity_counts: Dict[str, int]
327 dependencies: Dict[str, Any]
328 export_options: ExportOptions
330 class ExportData(TypedDict):
331 """Top-level full export payload shape."""
333 version: str
334 exported_at: str
335 exported_by: str
336 source_gateway: str
337 encryption_method: str
338 entities: Dict[str, List[Dict[str, Any]]]
339 metadata: ExportMetadata
341 entities: Dict[str, List[Dict[str, Any]]] = {}
342 metadata: ExportMetadata = {
343 "entity_counts": {},
344 "dependencies": {},
345 "export_options": {
346 "include_inactive": include_inactive,
347 "include_dependencies": include_dependencies,
348 "selected_types": entity_types,
349 "filter_tags": tags or [],
350 },
351 }
353 export_data: ExportData = {
354 "version": settings.protocol_version,
355 "exported_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
356 "exported_by": exported_by,
357 "source_gateway": f"http://{settings.host}:{settings.port}",
358 "encryption_method": "AES-256-GCM",
359 "entities": entities,
360 "metadata": metadata,
361 }
363 # Export each entity type (with team-scoped visibility filtering)
364 if "tools" in entity_types:
365 export_data["entities"]["tools"] = await self._export_tools(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)
367 if "gateways" in entity_types:
368 export_data["entities"]["gateways"] = await self._export_gateways(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)
370 if "servers" in entity_types:
371 export_data["entities"]["servers"] = await self._export_servers(db, tags, include_inactive, root_path, user_email=user_email, token_teams=token_teams)
373 if "prompts" in entity_types:
374 export_data["entities"]["prompts"] = await self._export_prompts(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)
376 if "resources" in entity_types:
377 export_data["entities"]["resources"] = await self._export_resources(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)
379 if "roots" in entity_types:
380 export_data["entities"]["roots"] = await self._export_roots()
382 # Add dependency information
383 if include_dependencies:
384 export_data["metadata"]["dependencies"] = await self._extract_dependencies(db, export_data["entities"])
386 # Calculate entity counts
387 for entity_type, entities_list in export_data["entities"].items():
388 export_data["metadata"]["entity_counts"][entity_type] = len(entities_list)
390 # Validate export data
391 self._validate_export_data(cast(Dict[str, Any], export_data))
393 logger.info(f"Export completed successfully with {sum(export_data['metadata']['entity_counts'].values())} total entities")
394 return cast(Dict[str, Any], export_data)
396 except Exception as e:
397 logger.error(f"Export failed: {str(e)}")
398 raise ExportError(f"Failed to export configuration: {str(e)}")
    async def _export_tools(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Export tools with encrypted authentication data.

        Uses batch queries to fetch auth data efficiently, avoiding N+1 query patterns.
        Only locally configured tools are exported; MCP tools that belong to a
        federated gateway are skipped.

        Args:
            db: Database session
            tags: Filter by tags
            include_inactive: Include inactive tools
            user_email: Requesting user's email for visibility filtering
            token_teams: Token team scope for visibility filtering

        Returns:
            List of exported tool dictionaries
        """
        # Fetch all tools across all pages (with team-scoped visibility)
        tools = await self._fetch_all_tools(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)

        # Filter to only exportable tools (local REST tools, not MCP tools from gateways)
        exportable_tools = [t for t in tools if not (t.integration_type == "MCP" and t.gateway_id)]

        # Batch fetch auth data for tools with masked values (single query instead of N queries).
        # The service layer masks auth_value in its read models; the real value
        # must be re-read from the DB rows for those tools.
        tool_ids_needing_auth = [
            tool.id for tool in exportable_tools if hasattr(tool, "auth") and tool.auth and hasattr(tool.auth, "auth_value") and tool.auth.auth_value == settings.masked_auth_value
        ]

        auth_data_map: Dict[Any, tuple] = {}
        if tool_ids_needing_auth:
            db_tools_with_auth = db.execute(select(DbTool.id, DbTool.auth_type, DbTool.auth_value).where(DbTool.id.in_(tool_ids_needing_auth))).all()
            auth_data_map = {row[0]: (row[1], row[2]) for row in db_tools_with_auth}

        exported_tools = []
        for tool in exportable_tools:
            tool_data = {
                "name": tool.original_name,  # Use original name, not the slugified version
                "displayName": tool.displayName,  # Export displayName field from ToolRead
                "url": str(tool.url),
                "integration_type": tool.integration_type,
                "request_type": tool.request_type,
                "description": tool.description,
                "original_description": tool.original_description,
                "headers": tool.headers or {},
                "input_schema": tool.input_schema or {"type": "object", "properties": {}},
                "output_schema": tool.output_schema,
                "annotations": tool.annotations or {},
                "jsonpath_filter": tool.jsonpath_filter,
                "tags": tool.tags or [],
                "rate_limit": getattr(tool, "rate_limit", None),
                "timeout": getattr(tool, "timeout", None),
                "is_active": tool.enabled,
                # created_at/updated_at may be absent or non-datetime on some read models,
                # so guard with hasattr before calling isoformat().
                "created_at": tool.created_at.isoformat() if hasattr(tool.created_at, "isoformat") and tool.created_at else None,
                "updated_at": tool.updated_at.isoformat() if hasattr(tool.updated_at, "isoformat") and tool.updated_at else None,
            }

            # Handle authentication data securely - use batch-fetched values
            if hasattr(tool, "auth") and tool.auth:
                auth = tool.auth
                if hasattr(auth, "auth_type") and hasattr(auth, "auth_value"):
                    if auth.auth_value == settings.masked_auth_value:
                        # Use batch-fetched auth data; tools whose DB row was not
                        # found (or has an empty value) are exported without auth.
                        if tool.id in auth_data_map:
                            auth_type, auth_value = auth_data_map[tool.id]
                            if auth_value:
                                tool_data["auth_type"] = auth_type
                                tool_data["auth_value"] = auth_value
                    else:
                        # Auth value is not masked, use as-is
                        tool_data["auth_type"] = auth.auth_type
                        tool_data["auth_value"] = auth.auth_value

            exported_tools.append(tool_data)

        return exported_tools
    async def _export_gateways(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Export gateways with encrypted authentication data.

        Uses batch queries to fetch auth data efficiently, avoiding N+1 query patterns.

        Args:
            db: Database session
            tags: Filter by tags
            include_inactive: Include inactive gateways
            user_email: Requesting user's email for visibility filtering
            token_teams: Token team scope for visibility filtering

        Returns:
            List of exported gateway dictionaries
        """
        # Fetch all gateways across all pages (with team-scoped visibility)
        gateways = await self._fetch_all_gateways(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)

        # Batch fetch auth data for gateways with masked values (single query instead of N queries).
        # The read models mask auth_value; the real value is re-read from the DB rows.
        gateway_ids_needing_auth = [g.id for g in gateways if g.auth_type and g.auth_value == settings.masked_auth_value]

        auth_data_map: Dict[Any, tuple] = {}
        if gateway_ids_needing_auth:
            db_gateways_with_auth = db.execute(select(DbGateway.id, DbGateway.auth_type, DbGateway.auth_value).where(DbGateway.id.in_(gateway_ids_needing_auth))).all()
            auth_data_map = {row[0]: (row[1], row[2]) for row in db_gateways_with_auth}

        exported_gateways = []
        for gateway in gateways:
            gateway_data = {
                "name": gateway.name,
                "url": str(gateway.url),
                "description": gateway.description,
                "transport": gateway.transport,
                "capabilities": gateway.capabilities or {},
                # Fixed health-check defaults; the interval/timeout/retries are
                # not read from the gateway record.
                "health_check": {"url": f"{gateway.url}/health", "interval": 30, "timeout": 10, "retries": 3},
                "is_active": gateway.enabled,
                "tags": gateway.tags or [],
                "passthrough_headers": gateway.passthrough_headers or [],
            }

            # Handle authentication data securely - use batch-fetched values
            if gateway.auth_type and gateway.auth_value:
                if gateway.auth_value == settings.masked_auth_value:
                    # Use batch-fetched auth data; gateways whose DB row was not
                    # found (or has an empty value) are exported without auth.
                    if gateway.id in auth_data_map:
                        auth_type, auth_value = auth_data_map[gateway.id]
                        if auth_value:
                            gateway_data["auth_type"] = auth_type
                            # DbGateway.auth_value is JSON (dict); export format expects encoded string.
                            gateway_data["auth_value"] = encode_auth(auth_value) if isinstance(auth_value, dict) else auth_value
                else:
                    # Auth value is not masked, use as-is
                    gateway_data["auth_type"] = gateway.auth_type
                    gateway_data["auth_value"] = gateway.auth_value

            exported_gateways.append(gateway_data)

        return exported_gateways
533 async def _export_servers(
534 self, db: Session, tags: Optional[List[str]], include_inactive: bool, root_path: str = "", user_email: Optional[str] = None, token_teams: Optional[List[str]] = None
535 ) -> List[Dict[str, Any]]:
536 """Export virtual servers with their tool associations.
538 Args:
539 db: Database session
540 tags: Filter by tags
541 include_inactive: Include inactive servers
542 root_path: Root path for constructing API endpoints
543 user_email: Requesting user's email for visibility filtering
544 token_teams: Token team scope for visibility filtering
546 Returns:
547 List of exported server dictionaries
548 """
549 # Fetch all servers across all pages (with team-scoped visibility)
550 servers = await self._fetch_all_servers(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)
551 exported_servers = []
553 for server in servers:
554 server_data = {
555 "name": server.name,
556 "description": server.description,
557 "tool_ids": list(server.associated_tools),
558 "sse_endpoint": f"{root_path}/servers/{server.id}/sse",
559 "websocket_endpoint": f"{root_path}/servers/{server.id}/ws",
560 "jsonrpc_endpoint": f"{root_path}/servers/{server.id}/jsonrpc",
561 "capabilities": {"tools": {"list_changed": True}, "prompts": {"list_changed": True}},
562 "is_active": getattr(server, "enabled", getattr(server, "is_active", False)),
563 "tags": server.tags or [],
564 }
566 exported_servers.append(server_data)
568 return exported_servers
570 async def _export_prompts(self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
571 """Export prompts with their templates and schemas.
573 Args:
574 db: Database session
575 tags: Filter by tags
576 include_inactive: Include inactive prompts
577 user_email: Requesting user's email for visibility filtering
578 token_teams: Token team scope for visibility filtering
580 Returns:
581 List of exported prompt dictionaries
582 """
583 # Fetch all prompts across all pages (with team-scoped visibility)
584 prompts = await self._fetch_all_prompts(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)
585 exported_prompts = []
587 for prompt in prompts:
588 input_schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}
589 prompt_data: Dict[str, Any] = {
590 "name": getattr(prompt, "original_name", None) or prompt.name,
591 "original_name": getattr(prompt, "original_name", None) or prompt.name,
592 "custom_name": getattr(prompt, "custom_name", None) or getattr(prompt, "original_name", None) or prompt.name,
593 "display_name": getattr(prompt, "display_name", None) or getattr(prompt, "custom_name", None) or getattr(prompt, "original_name", None) or prompt.name,
594 "template": prompt.template,
595 "description": prompt.description,
596 "input_schema": input_schema,
597 "tags": prompt.tags or [],
598 # Use the new `enabled` attribute on prompt objects but keep export key `is_active` for compatibility
599 "is_active": getattr(prompt, "enabled", getattr(prompt, "is_active", False)),
600 }
602 # Convert arguments to input schema format
603 if prompt.arguments:
604 properties: Dict[str, Any] = {}
605 required = []
606 for arg in prompt.arguments:
607 properties[arg.name] = {"type": "string", "description": arg.description or ""}
608 if arg.required:
609 required.append(arg.name)
610 input_schema["properties"] = properties
611 input_schema["required"] = required
613 exported_prompts.append(prompt_data)
615 return exported_prompts
617 async def _export_resources(
618 self, db: Session, tags: Optional[List[str]], include_inactive: bool, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None
619 ) -> List[Dict[str, Any]]:
620 """Export resources with their content metadata.
622 Args:
623 db: Database session
624 tags: Filter by tags
625 include_inactive: Include inactive resources
626 user_email: Requesting user's email for visibility filtering
627 token_teams: Token team scope for visibility filtering
629 Returns:
630 List of exported resource dictionaries
631 """
632 # Fetch all resources across all pages (with team-scoped visibility)
633 resources = await self._fetch_all_resources(db, tags, include_inactive, user_email=user_email, token_teams=token_teams)
634 exported_resources = []
636 for resource in resources:
637 resource_data = {
638 "name": resource.name,
639 "uri": resource.uri,
640 "description": resource.description,
641 "mime_type": resource.mime_type,
642 "tags": resource.tags or [],
643 "is_active": getattr(resource, "enabled", getattr(resource, "is_active", False)),
644 "last_modified": resource.updated_at.isoformat() if resource.updated_at else None,
645 }
647 exported_resources.append(resource_data)
649 return exported_resources
651 async def _export_roots(self) -> List[Dict[str, Any]]:
652 """Export filesystem roots.
654 Returns:
655 List of exported root dictionaries
656 """
657 roots = await self.root_service.list_roots()
658 exported_roots = []
660 for root in roots:
661 root_data = {"uri": str(root.uri), "name": root.name}
662 exported_roots.append(root_data)
664 return exported_roots
666 async def _extract_dependencies(self, db: Session, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: # pylint: disable=unused-argument
667 """Extract dependency relationships between entities.
669 Args:
670 db: Database session
671 entities: Dictionary of exported entities
673 Returns:
674 Dictionary containing dependency mappings
675 """
676 dependencies = {"servers_to_tools": {}, "servers_to_resources": {}, "servers_to_prompts": {}}
678 # Extract server-to-tool dependencies
679 if "servers" in entities and "tools" in entities:
680 for server in entities["servers"]:
681 if server.get("tool_ids"):
682 dependencies["servers_to_tools"][server["name"]] = server["tool_ids"]
684 return dependencies
686 def _validate_export_data(self, export_data: Dict[str, Any]) -> None:
687 """Validate export data against the schema.
689 Args:
690 export_data: The export data to validate
692 Raises:
693 ExportValidationError: If validation fails
694 """
695 required_fields = ["version", "exported_at", "exported_by", "entities", "metadata"]
697 for field in required_fields:
698 if field not in export_data:
699 raise ExportValidationError(f"Missing required field: {field}")
701 # Validate version format
702 if not export_data["version"]:
703 raise ExportValidationError("Version cannot be empty")
705 # Validate entities structure
706 if not isinstance(export_data["entities"], dict):
707 raise ExportValidationError("Entities must be a dictionary")
709 # Validate metadata structure
710 metadata = export_data["metadata"]
711 if not isinstance(metadata.get("entity_counts"), dict):
712 raise ExportValidationError("Metadata entity_counts must be a dictionary")
714 logger.debug("Export data validation passed")
716 async def export_selective(
717 self,
718 db: Session,
719 entity_selections: Dict[str, List[str]],
720 include_dependencies: bool = True,
721 exported_by: str = "system",
722 root_path: str = "",
723 user_email: Optional[str] = None,
724 token_teams: Optional[List[str]] = None,
725 ) -> Dict[str, Any]:
726 """Export specific entities by their IDs/names.
728 Args:
729 db: Database session
730 entity_selections: Dict mapping entity types to lists of IDs/names to export
731 include_dependencies: Whether to include dependent entities
732 exported_by: Username of the person performing the export
733 root_path: Root path for constructing API endpoints
734 user_email: Requesting user's email for team-scoped visibility filtering
735 token_teams: Token team scope for visibility filtering (None=admin bypass, []=public-only)
737 Returns:
738 Dict containing the selective export data
740 Example:
741 entity_selections = {
742 "tools": ["tool1", "tool2"],
743 "servers": ["server1"],
744 "prompts": ["prompt1"]
745 }
746 """
747 logger.info(f"Starting selective export by {exported_by}")
749 class SelExportOptions(TypedDict, total=False):
750 """Options that control behavior for selective export."""
752 selective: bool
753 include_dependencies: bool
754 selections: Dict[str, List[str]]
756 class SelExportMetadata(TypedDict):
757 """Metadata for selective export including counts, dependencies, and options."""
759 entity_counts: Dict[str, int]
760 dependencies: Dict[str, Any]
761 export_options: SelExportOptions
763 class SelExportData(TypedDict):
764 """Top-level selective export payload shape."""
766 version: str
767 exported_at: str
768 exported_by: str
769 source_gateway: str
770 encryption_method: str
771 entities: Dict[str, List[Dict[str, Any]]]
772 metadata: SelExportMetadata
774 sel_entities: Dict[str, List[Dict[str, Any]]] = {}
775 sel_metadata: SelExportMetadata = {
776 "entity_counts": {},
777 "dependencies": {},
778 "export_options": {"selective": True, "include_dependencies": include_dependencies, "selections": entity_selections},
779 }
780 export_data: SelExportData = {
781 "version": settings.protocol_version,
782 "exported_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
783 "exported_by": exported_by,
784 "source_gateway": f"http://{settings.host}:{settings.port}",
785 "encryption_method": "AES-256-GCM",
786 "entities": sel_entities,
787 "metadata": sel_metadata,
788 }
790 # Export selected entities for each type
791 for entity_type, selected_ids in entity_selections.items():
792 if entity_type == "tools":
793 export_data["entities"]["tools"] = await self._export_selected_tools(db, selected_ids, user_email=user_email, token_teams=token_teams)
794 elif entity_type == "gateways":
795 export_data["entities"]["gateways"] = await self._export_selected_gateways(db, selected_ids, user_email=user_email, token_teams=token_teams)
796 elif entity_type == "servers":
797 export_data["entities"]["servers"] = await self._export_selected_servers(db, selected_ids, root_path, user_email=user_email, token_teams=token_teams)
798 elif entity_type == "prompts":
799 export_data["entities"]["prompts"] = await self._export_selected_prompts(db, selected_ids, user_email=user_email, token_teams=token_teams)
800 elif entity_type == "resources":
801 export_data["entities"]["resources"] = await self._export_selected_resources(db, selected_ids, user_email=user_email, token_teams=token_teams)
802 elif entity_type == "roots":
803 export_data["entities"]["roots"] = await self._export_selected_roots(selected_ids)
805 # Add dependencies if requested
806 if include_dependencies:
807 export_data["metadata"]["dependencies"] = await self._extract_dependencies(db, export_data["entities"])
809 # Calculate entity counts
810 for entity_type, entities_list in export_data["entities"].items():
811 export_data["metadata"]["entity_counts"][entity_type] = len(entities_list)
813 self._validate_export_data(cast(Dict[str, Any], export_data))
815 logger.info(f"Selective export completed with {sum(export_data['metadata']['entity_counts'].values())} entities")
816 return cast(Dict[str, Any], export_data)
818 @staticmethod
819 def _is_scoped_selective_export(user_email: Optional[str], token_teams: Optional[List[str]]) -> bool:
820 """Return whether selective export should apply visibility filtering.
822 Args:
823 user_email: Requesting user's email.
824 token_teams: Token team scope for visibility filtering.
826 Returns:
827 ``True`` when selective export should apply team/public filtering.
828 """
829 return user_email is not None or token_teams is not None
831 async def _export_selected_tools(self, db: Session, tool_ids: List[str], user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
832 """Export specific tools by their IDs using batch queries.
834 Uses a single batch query instead of fetching all tools N times.
836 Args:
837 db: Database session
838 tool_ids: List of tool IDs to export
839 user_email: Requesting user's email for visibility filtering
840 token_teams: Token team scope for visibility filtering
842 Returns:
843 List of exported tool dictionaries
844 """
845 if not tool_ids:
846 return []
848 visible_tool_ids: Optional[set[str]] = None
849 if self._is_scoped_selective_export(user_email, token_teams):
850 visible_tools = await self._fetch_all_tools(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
851 visible_tool_ids = {str(tool.id) for tool in visible_tools}
852 tool_ids = [tool_id for tool_id in tool_ids if tool_id in visible_tool_ids]
853 if not tool_ids:
854 return []
856 # Batch query for selected tools only
857 db_tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all()
858 if visible_tool_ids is not None:
859 db_tools = [db_tool for db_tool in db_tools if str(db_tool.id) in visible_tool_ids]
861 exported_tools = []
862 for db_tool in db_tools:
863 # Only export local REST tools, not MCP tools from gateways
864 if db_tool.integration_type == "MCP" and db_tool.gateway_id:
865 continue
867 tool_data = {
868 "name": db_tool.original_name or db_tool.custom_name,
869 "displayName": db_tool.display_name,
870 "url": str(db_tool.url) if db_tool.url else None,
871 "integration_type": db_tool.integration_type,
872 "request_type": db_tool.request_type,
873 "description": db_tool.description,
874 "headers": db_tool.headers or {},
875 "input_schema": db_tool.input_schema or {"type": "object", "properties": {}},
876 "output_schema": db_tool.output_schema,
877 "annotations": db_tool.annotations or {},
878 "jsonpath_filter": db_tool.jsonpath_filter,
879 "tags": db_tool.tags or [],
880 "rate_limit": getattr(db_tool, "rate_limit", None),
881 "timeout": getattr(db_tool, "timeout", None),
882 "is_active": db_tool.enabled,
883 "created_at": db_tool.created_at.isoformat() if db_tool.created_at else None,
884 "updated_at": db_tool.updated_at.isoformat() if db_tool.updated_at else None,
885 }
887 # Include auth data directly from DB (already have raw values)
888 if db_tool.auth_type and db_tool.auth_value:
889 tool_data["auth_type"] = db_tool.auth_type
890 tool_data["auth_value"] = db_tool.auth_value
892 exported_tools.append(tool_data)
894 return exported_tools
896 async def _export_selected_gateways(self, db: Session, gateway_ids: List[str], user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
897 """Export specific gateways by their IDs using batch queries.
899 Uses a single batch query instead of fetching all gateways N times.
901 Args:
902 db: Database session
903 gateway_ids: List of gateway IDs to export
904 user_email: Requesting user's email for visibility filtering
905 token_teams: Token team scope for visibility filtering
907 Returns:
908 List of exported gateway dictionaries
909 """
910 if not gateway_ids:
911 return []
913 visible_gateway_ids: Optional[set[str]] = None
914 if self._is_scoped_selective_export(user_email, token_teams):
915 visible_gateways = await self._fetch_all_gateways(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
916 visible_gateway_ids = {str(gateway.id) for gateway in visible_gateways}
917 gateway_ids = [gateway_id for gateway_id in gateway_ids if gateway_id in visible_gateway_ids]
918 if not gateway_ids:
919 return []
921 # Batch query for selected gateways only
922 db_gateways = db.execute(select(DbGateway).where(DbGateway.id.in_(gateway_ids))).scalars().all()
923 if visible_gateway_ids is not None:
924 db_gateways = [db_gateway for db_gateway in db_gateways if str(db_gateway.id) in visible_gateway_ids]
926 exported_gateways = []
927 for db_gateway in db_gateways:
928 gateway_data = {
929 "name": db_gateway.name,
930 "url": str(db_gateway.url) if db_gateway.url else None,
931 "description": db_gateway.description,
932 "transport": db_gateway.transport,
933 "capabilities": db_gateway.capabilities or {},
934 "health_check": {"url": f"{db_gateway.url}/health", "interval": 30, "timeout": 10, "retries": 3},
935 "is_active": db_gateway.enabled,
936 "tags": db_gateway.tags or [],
937 "passthrough_headers": db_gateway.passthrough_headers or [],
938 }
940 # Include auth data directly from DB (already have raw values)
941 if db_gateway.auth_type:
942 gateway_data["auth_type"] = db_gateway.auth_type
943 if db_gateway.auth_value:
944 # DbGateway.auth_value is JSON (dict); export format expects an encoded string.
945 raw = db_gateway.auth_value
946 gateway_data["auth_value"] = encode_auth(raw) if isinstance(raw, dict) else raw
947 # Include query param auth if present
948 if db_gateway.auth_type == "query_param" and getattr(db_gateway, "auth_query_params", None):
949 gateway_data["auth_query_params"] = db_gateway.auth_query_params
951 exported_gateways.append(gateway_data)
953 return exported_gateways
955 async def _export_selected_servers(
956 self, db: Session, server_ids: List[str], root_path: str = "", user_email: Optional[str] = None, token_teams: Optional[List[str]] = None
957 ) -> List[Dict[str, Any]]:
958 """Export specific servers by their IDs using batch queries.
960 Uses a single batch query instead of fetching all servers N times.
962 Args:
963 db: Database session
964 server_ids: List of server IDs to export
965 root_path: Root path for constructing API endpoints
966 user_email: Requesting user's email for visibility filtering
967 token_teams: Token team scope for visibility filtering
969 Returns:
970 List of exported server dictionaries
971 """
972 if not server_ids:
973 return []
975 visible_server_ids: Optional[set[str]] = None
976 if self._is_scoped_selective_export(user_email, token_teams):
977 visible_servers = await self._fetch_all_servers(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
978 visible_server_ids = {str(server.id) for server in visible_servers}
979 server_ids = [server_id for server_id in server_ids if server_id in visible_server_ids]
980 if not server_ids:
981 return []
983 # Batch query for selected servers with eager loading to avoid N+1 queries
984 db_servers = db.execute(select(DbServer).options(selectinload(DbServer.tools)).where(DbServer.id.in_(server_ids))).scalars().all()
985 if visible_server_ids is not None:
986 db_servers = [db_server for db_server in db_servers if str(db_server.id) in visible_server_ids]
988 exported_servers = []
989 for db_server in db_servers:
990 # Get associated tool IDs (tools are eagerly loaded)
991 tool_ids = [str(tool.id) for tool in db_server.tools] if db_server.tools else []
993 server_data = {
994 "name": db_server.name,
995 "description": db_server.description,
996 "tool_ids": tool_ids,
997 "sse_endpoint": f"{root_path}/servers/{db_server.id}/sse",
998 "websocket_endpoint": f"{root_path}/servers/{db_server.id}/ws",
999 "jsonrpc_endpoint": f"{root_path}/servers/{db_server.id}/jsonrpc",
1000 "capabilities": {"tools": {"list_changed": True}, "prompts": {"list_changed": True}},
1001 "is_active": db_server.enabled,
1002 "tags": db_server.tags or [],
1003 }
1005 exported_servers.append(server_data)
1007 return exported_servers
1009 async def _export_selected_prompts(self, db: Session, prompt_names: List[str], user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
1010 """Export specific prompts by their identifiers using batch queries.
1012 Uses a single batch query instead of fetching all prompts N times.
1014 Args:
1015 db: Database session
1016 prompt_names: List of prompt IDs or names to export
1017 user_email: Requesting user's email for visibility filtering
1018 token_teams: Token team scope for visibility filtering
1020 Returns:
1021 List of exported prompt dictionaries
1022 """
1023 if not prompt_names:
1024 return []
1026 visible_prompt_identifiers: Optional[set[str]] = None
1027 if self._is_scoped_selective_export(user_email, token_teams):
1028 visible_prompts = await self._fetch_all_prompts(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
1029 visible_prompt_identifiers = set()
1030 for prompt in visible_prompts:
1031 visible_prompt_identifiers.add(str(prompt.id))
1032 if getattr(prompt, "name", None):
1033 visible_prompt_identifiers.add(prompt.name)
1034 if getattr(prompt, "original_name", None):
1035 visible_prompt_identifiers.add(prompt.original_name)
1036 if getattr(prompt, "custom_name", None):
1037 visible_prompt_identifiers.add(prompt.custom_name)
1038 prompt_names = [prompt_name for prompt_name in prompt_names if prompt_name in visible_prompt_identifiers]
1039 if not prompt_names:
1040 return []
1042 # Batch query for selected prompts only
1043 db_prompts = db.execute(select(DbPrompt).where(or_(DbPrompt.id.in_(prompt_names), DbPrompt.name.in_(prompt_names)))).scalars().all()
1044 if visible_prompt_identifiers is not None:
1045 db_prompts = [
1046 db_prompt
1047 for db_prompt in db_prompts
1048 if str(db_prompt.id) in visible_prompt_identifiers
1049 or (getattr(db_prompt, "name", None) in visible_prompt_identifiers)
1050 or (getattr(db_prompt, "original_name", None) in visible_prompt_identifiers)
1051 or (getattr(db_prompt, "custom_name", None) in visible_prompt_identifiers)
1052 ]
1054 exported_prompts = []
1055 for db_prompt in db_prompts:
1056 # Build input schema from argument_schema
1057 input_schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}
1058 if db_prompt.argument_schema:
1059 input_schema = db_prompt.argument_schema
1061 prompt_data: Dict[str, Any] = {
1062 "name": db_prompt.original_name or db_prompt.name,
1063 "original_name": db_prompt.original_name or db_prompt.name,
1064 "custom_name": db_prompt.custom_name or db_prompt.original_name or db_prompt.name,
1065 "display_name": db_prompt.display_name or db_prompt.custom_name or db_prompt.original_name or db_prompt.name,
1066 "template": db_prompt.template,
1067 "description": db_prompt.description,
1068 "input_schema": input_schema,
1069 "tags": db_prompt.tags or [],
1070 "is_active": getattr(db_prompt, "enabled", getattr(db_prompt, "is_active", False)),
1071 }
1073 exported_prompts.append(prompt_data)
1075 return exported_prompts
1077 async def _export_selected_resources(self, db: Session, resource_uris: List[str], user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> List[Dict[str, Any]]:
1078 """Export specific resources by their URIs using batch queries.
1080 Uses a single batch query instead of fetching all resources N times.
1082 Args:
1083 db: Database session
1084 resource_uris: List of resource URIs to export
1085 user_email: Requesting user's email for visibility filtering
1086 token_teams: Token team scope for visibility filtering
1088 Returns:
1089 List of exported resource dictionaries
1090 """
1091 if not resource_uris:
1092 return []
1094 visible_resource_uris: Optional[set[str]] = None
1095 if self._is_scoped_selective_export(user_email, token_teams):
1096 visible_resources = await self._fetch_all_resources(db, tags=None, include_inactive=True, user_email=user_email, token_teams=token_teams)
1097 visible_resource_uris = {resource.uri for resource in visible_resources}
1098 resource_uris = [resource_uri for resource_uri in resource_uris if resource_uri in visible_resource_uris]
1099 if not resource_uris:
1100 return []
1102 # Batch query for selected resources only
1103 db_resources = db.execute(select(DbResource).where(DbResource.uri.in_(resource_uris))).scalars().all()
1104 if visible_resource_uris is not None:
1105 db_resources = [db_resource for db_resource in db_resources if db_resource.uri in visible_resource_uris]
1107 exported_resources = []
1108 for db_resource in db_resources:
1109 resource_data = {
1110 "name": db_resource.name,
1111 "uri": db_resource.uri,
1112 "description": db_resource.description,
1113 "mime_type": db_resource.mime_type,
1114 "tags": db_resource.tags or [],
1115 "is_active": db_resource.enabled,
1116 "last_modified": db_resource.updated_at.isoformat() if db_resource.updated_at else None,
1117 }
1119 exported_resources.append(resource_data)
1121 return exported_resources
1123 async def _export_selected_roots(self, root_uris: List[str]) -> List[Dict[str, Any]]:
1124 """Export specific roots by their URIs.
1126 Args:
1127 root_uris: List of root URIs to export
1129 Returns:
1130 List of exported root dictionaries
1131 """
1132 all_roots = await self._export_roots()
1133 return [r for r in all_roots if r["uri"] in root_uris]