Coverage for mcpgateway / services / import_service.py: 97%
800 statements
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2# pylint: disable=import-outside-toplevel,no-name-in-module
3"""Location: ./mcpgateway/services/import_service.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti
8Import Service Implementation.
9This module implements comprehensive configuration import functionality according to the import specification.
10It handles:
11- Import file validation and schema compliance
12- Entity creation and updates with conflict resolution
13- Dependency resolution and processing order
14- Authentication data decryption and re-encryption
15- Dry-run functionality for validation
16- Cross-environment key rotation support
17- Import status tracking and progress reporting
18"""
20# Standard
21import base64
22from datetime import datetime, timedelta, timezone
23from enum import Enum
24import logging
25from typing import Any, Dict, List, Optional
26import uuid
28# Third-Party
29from sqlalchemy.orm import Session
31# First-Party
32from mcpgateway.config import settings
33from mcpgateway.db import A2AAgent, EmailUser, Gateway, Prompt, Resource, Server, Tool
34from mcpgateway.schemas import AuthenticationValues, GatewayCreate, GatewayUpdate, PromptCreate, PromptUpdate, ResourceCreate, ResourceUpdate, ServerCreate, ServerUpdate, ToolCreate, ToolUpdate
35from mcpgateway.services.gateway_service import GatewayNameConflictError
36from mcpgateway.services.prompt_service import PromptNameConflictError
37from mcpgateway.services.resource_service import ResourceURIConflictError
38from mcpgateway.services.server_service import ServerNameConflictError
39from mcpgateway.services.tool_service import ToolNameConflictError
40from mcpgateway.utils.services_auth import decode_auth, encode_auth
42logger = logging.getLogger(__name__)
45class ConflictStrategy(str, Enum):
46 """Strategies for handling conflicts during import.
48 Examples:
49 >>> ConflictStrategy.SKIP.value
50 'skip'
51 >>> ConflictStrategy.UPDATE.value
52 'update'
53 >>> ConflictStrategy.RENAME.value
54 'rename'
55 >>> ConflictStrategy.FAIL.value
56 'fail'
57 >>> ConflictStrategy("update")
58 <ConflictStrategy.UPDATE: 'update'>
59 """
61 SKIP = "skip"
62 UPDATE = "update"
63 RENAME = "rename"
64 FAIL = "fail"
67class ImportError(Exception): # pylint: disable=redefined-builtin
68 """Base class for import-related errors.
70 Examples:
71 >>> error = ImportError("Something went wrong")
72 >>> str(error)
73 'Something went wrong'
74 >>> isinstance(error, Exception)
75 True
76 """
79class ImportValidationError(ImportError):
80 """Raised when import data validation fails.
82 Examples:
83 >>> error = ImportValidationError("Invalid schema")
84 >>> str(error)
85 'Invalid schema'
86 >>> isinstance(error, ImportError)
87 True
88 """
91class ImportConflictError(ImportError):
92 """Raised when import conflicts cannot be resolved.
94 Examples:
95 >>> error = ImportConflictError("Name conflict: tool_name")
96 >>> str(error)
97 'Name conflict: tool_name'
98 >>> isinstance(error, ImportError)
99 True
100 """
103class ImportStatus:
104 """Tracks the status of an import operation."""
106 def __init__(self, import_id: str):
107 """Initialize import status tracking.
109 Args:
110 import_id: Unique identifier for the import operation
112 Examples:
113 >>> status = ImportStatus("import_123")
114 >>> status.import_id
115 'import_123'
116 >>> status.status
117 'pending'
118 >>> status.total_entities
119 0
120 """
121 self.import_id = import_id
122 self.status = "pending"
123 self.total_entities = 0
124 self.processed_entities = 0
125 self.created_entities = 0
126 self.updated_entities = 0
127 self.skipped_entities = 0
128 self.failed_entities = 0
129 self.errors: List[str] = []
130 self.warnings: List[str] = []
131 self.started_at = datetime.now(timezone.utc)
132 self.completed_at: Optional[datetime] = None
134 def to_dict(self) -> Dict[str, Any]:
135 """Convert status to dictionary for API responses.
137 Returns:
138 Dictionary representation of import status
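Examples:
>>> d = ImportStatus("import_123").to_dict()
>>> d["import_id"]
'import_123'
>>> d["status"]
'pending'
>>> d["completed_at"] is None
True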
139 """
140 return {
141 "import_id": self.import_id,
142 "status": self.status,
143 "progress": {
144 "total": self.total_entities,
145 "processed": self.processed_entities,
146 "created": self.created_entities,
147 "updated": self.updated_entities,
148 "skipped": self.skipped_entities,
149 "failed": self.failed_entities,
150 },
151 "errors": self.errors,
152 "warnings": self.warnings,
153 "started_at": self.started_at.isoformat(),
154 "completed_at": self.completed_at.isoformat() if self.completed_at else None,
155 }
158class ImportService:
159 """Service for importing MCP Gateway configuration and data.
161 This service provides comprehensive import functionality including:
162 - Import file validation and schema compliance
163 - Entity creation and updates with conflict resolution
164 - Dependency resolution and correct processing order
165 - Secure authentication data handling with re-encryption
166 - Dry-run capabilities for validation without changes
167 - Progress tracking and status reporting
168 - Cross-environment key rotation support
169 """
171 def __init__(self):
172 """Initialize the import service with required dependencies.
174 Binds the shared entity service singletons and initializes the active imports tracker.
176 Examples:
177 >>> service = ImportService()
178 >>> service.active_imports
179 {}
180 >>> hasattr(service, 'tool_service')
181 True
182 >>> hasattr(service, 'gateway_service')
183 True
184 """
185 # Use the globally-exported service singletons so all services share the
186 # EventService/Redis clients initialized at app startup. Import lazily
187 # to avoid a circular import at module load time.
191 # First-Party
192 from mcpgateway.services.gateway_service import gateway_service
193 from mcpgateway.services.prompt_service import prompt_service
194 from mcpgateway.services.resource_service import resource_service
195 from mcpgateway.services.root_service import root_service
196 from mcpgateway.services.server_service import server_service
197 from mcpgateway.services.tool_service import tool_service
199 self.gateway_service = gateway_service
200 self.tool_service = tool_service
201 self.resource_service = resource_service
202 self.prompt_service = prompt_service
203 self.server_service = server_service
204 self.root_service = root_service
205 self.active_imports: Dict[str, ImportStatus] = {}
207 async def initialize(self) -> None:
208 """Initialize the import service."""
209 logger.info("Import service initialized")
211 async def shutdown(self) -> None:
212 """Shutdown the import service."""
213 logger.info("Import service shutdown")
215 def validate_import_data(self, import_data: Dict[str, Any]) -> None:
216 """Validate import data against the expected schema.
218 Args:
219 import_data: The import data to validate
221 Raises:
222 ImportValidationError: If validation fails
224 Examples:
225 >>> service = ImportService()
226 >>> valid_data = {
227 ... "version": "2025-03-26",
228 ... "exported_at": "2025-01-01T00:00:00Z",
229 ... "entities": {"tools": []}
230 ... }
231 >>> service.validate_import_data(valid_data) # Should not raise
233 >>> invalid_data = {"missing": "version"}
234 >>> try:
235 ... service.validate_import_data(invalid_data)
236 ... except ImportValidationError as e:
237 ... "Missing required field" in str(e)
238 True
239 """
240 logger.debug("Validating import data structure")
242 # Check required top-level fields
243 required_fields = ["version", "exported_at", "entities"]
244 for field in required_fields:
245 if field not in import_data:
246 raise ImportValidationError(f"Missing required field: {field}")
248 # Validate version compatibility
249 if not import_data.get("version"):
250 raise ImportValidationError("Version field cannot be empty")
252 # Validate entities structure
253 entities = import_data.get("entities", {})
254 if not isinstance(entities, dict):
255 raise ImportValidationError("Entities must be a dictionary")
257 # Validate each entity type
258 valid_entity_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"]
259 for entity_type, entity_list in entities.items():
260 if entity_type not in valid_entity_types:
261 raise ImportValidationError(f"Unknown entity type: {entity_type}")
263 if not isinstance(entity_list, list):
264 raise ImportValidationError(f"Entity type '{entity_type}' must be a list")
266 # Validate individual entities
267 for i, entity in enumerate(entity_list):
268 if not isinstance(entity, dict):
269 raise ImportValidationError(f"Entity {i} in '{entity_type}' must be a dictionary")
271 # Check required fields based on entity type
272 self._validate_entity_fields(entity_type, entity, i)
274 logger.debug("Import data validation passed")
276 def _validate_entity_fields(self, entity_type: str, entity: Dict[str, Any], index: int) -> None:
277 """Validate required fields for a specific entity type.
279 Args:
280 entity_type: Type of entity (tools, gateways, etc.)
281 entity: Entity data dictionary
282 index: Index of entity in list for error messages
284 Raises:
285 ImportValidationError: If required fields are missing
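Examples:
>>> service = ImportService()
>>> service._validate_entity_fields("servers", {"name": "srv"}, 0)
>>> try:
...     service._validate_entity_fields("prompts", {"name": "p"}, 0)
... except ImportValidationError as e:
...     "missing required field" in str(e)
True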
286 """
287 required_fields = {
288 "tools": ["name", "url", "integration_type"],
289 "gateways": ["name", "url"],
290 "servers": ["name"],
291 "prompts": ["name", "template"],
292 "resources": ["name", "uri"],
293 "roots": ["uri", "name"],
294 }
296 if entity_type in required_fields:
297 for field in required_fields[entity_type]:
298 if field not in entity:
299 raise ImportValidationError(f"Entity {index} in '{entity_type}' missing required field: {field}")
301 async def import_configuration(
302 self,
303 db: Session,
304 import_data: Dict[str, Any],
305 conflict_strategy: ConflictStrategy = ConflictStrategy.UPDATE,
306 dry_run: bool = False,
307 rekey_secret: Optional[str] = None,
308 imported_by: str = "system",
309 selected_entities: Optional[Dict[str, List[str]]] = None,
310 ) -> ImportStatus:
311 """Import configuration data with conflict resolution.
313 Args:
314 db: Database session
315 import_data: The validated import data
316 conflict_strategy: How to handle naming conflicts
317 dry_run: If True, validate but don't make changes
318 rekey_secret: New encryption secret for cross-environment imports
319 imported_by: Username of the person performing the import
320 selected_entities: Dict of entity types to specific entity names/ids to import
322 Returns:
323 ImportStatus: Status object tracking import progress and results
325 Raises:
326 ImportError: If import fails
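Example (illustrative sketch; ``service``, ``db``, and ``data`` are assumed to exist):
    status = await service.import_configuration(db, data, conflict_strategy=ConflictStrategy.SKIP, dry_run=True)
    status.to_dict()["status"]  # "completed" on success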
327 """
328 import_id = str(uuid.uuid4())
329 status = ImportStatus(import_id)
330 self.active_imports[import_id] = status
332 try:
333 logger.info(f"Starting configuration import {import_id} by {imported_by} (dry_run={dry_run})")
335 # Validate import data
336 self.validate_import_data(import_data)
338 # Calculate total entities to process
339 entities = import_data.get("entities", {})
340 status.total_entities = self._calculate_total_entities(entities, selected_entities)
342 status.status = "running"
344 # Process entities in dependency order
345 processing_order = ["roots", "gateways", "tools", "resources", "prompts", "servers"]
347 for entity_type in processing_order:
348 if entity_type in entities:
349 await self._process_entities(db, entity_type, entities[entity_type], conflict_strategy, dry_run, rekey_secret, status, selected_entities, imported_by)
350 # Flush after each entity type to make records visible for associations
351 if not dry_run:
352 db.flush()
354 # Assign all imported items to user's team with public visibility (after all entities processed)
355 if not dry_run:
356 await self._assign_imported_items_to_team(db, imported_by)
358 # Mark as completed
359 status.status = "completed"
360 status.completed_at = datetime.now(timezone.utc)
362 logger.info(f"Import {import_id} completed: created={status.created_entities}, updated={status.updated_entities}, skipped={status.skipped_entities}, failed={status.failed_entities}")
364 return status
366 except Exception as e:
367 status.status = "failed"
368 status.completed_at = datetime.now(timezone.utc)
369 status.errors.append(f"Import failed: {str(e)}")
370 logger.error(f"Import {import_id} failed: {str(e)}")
371 raise ImportError(f"Import failed: {str(e)}") from e
373 def _get_entity_identifier(self, entity_type: str, entity: Dict[str, Any]) -> str:
374 """Get the unique identifier for an entity based on its type.
376 Args:
377 entity_type: Type of entity
378 entity: Entity data dictionary
380 Returns:
381 Unique identifier string for the entity
383 Examples:
384 >>> service = ImportService()
385 >>> tool_entity = {"name": "my_tool", "url": "https://example.com"}
386 >>> service._get_entity_identifier("tools", tool_entity)
387 'my_tool'
389 >>> resource_entity = {"name": "my_resource", "uri": "/api/data"}
390 >>> service._get_entity_identifier("resources", resource_entity)
391 '/api/data'
393 >>> root_entity = {"name": "workspace", "uri": "file:///workspace"}
394 >>> service._get_entity_identifier("roots", root_entity)
395 'file:///workspace'
397 >>> unknown_entity = {"data": "test"}
398 >>> service._get_entity_identifier("unknown", unknown_entity)
399 ''
400 """
401 if entity_type in ["tools", "gateways", "servers", "prompts"]:
402 return entity.get("name", "")
403 if entity_type == "resources":
404 return entity.get("uri", "")
405 if entity_type == "roots":
406 return entity.get("uri", "")
407 return ""
409 def _calculate_total_entities(self, entities: Dict[str, List[Dict[str, Any]]], selected_entities: Optional[Dict[str, List[str]]]) -> int:
410 """Calculate total entities to process based on selection criteria.
412 Args:
413 entities: Dictionary of entities from import data
414 selected_entities: Optional entity selection filter
416 Returns:
417 Total number of entities to process
419 Examples:
420 No selection counts all entities:
421 >>> svc = ImportService()
422 >>> entities = {
423 ... 'tools': [{"name": "t1"}, {"name": "t2"}],
424 ... 'resources': [{"uri": "/r1"}],
425 ... }
426 >>> svc._calculate_total_entities(entities, selected_entities=None)
427 3
429 Selection for a subset by name/identifier:
430 >>> selected = {'tools': ['t2'], 'resources': ['/r1']}
431 >>> svc._calculate_total_entities(entities, selected)
432 2
434 Selection for only a type (empty list means all of that type):
435 >>> selected = {'tools': []}
436 >>> svc._calculate_total_entities(entities, selected)
437 2
438 """
439 if selected_entities:
440 total = 0
441 for entity_type, entity_list in entities.items():
442 if entity_type in selected_entities:
443 selected_names = selected_entities[entity_type]
444 if selected_names:
445 # Count entities that match selection
446 for entity in entity_list:
447 entity_name = self._get_entity_identifier(entity_type, entity)
448 if entity_name in selected_names:
449 total += 1
450 else:
451 total += len(entity_list)
452 return total
453 return sum(len(entity_list) for entity_list in entities.values())
455 async def _process_entities(
456 self,
457 db: Session,
458 entity_type: str,
459 entity_list: List[Dict[str, Any]],
460 conflict_strategy: ConflictStrategy,
461 dry_run: bool,
462 rekey_secret: Optional[str],
463 status: ImportStatus,
464 selected_entities: Optional[Dict[str, List[str]]],
465 imported_by: str,
466 ) -> None:
467 """Process a list of entities of a specific type using bulk operations.
469 Tools, resources, and prompts are registered via bulk operations, which is
470 roughly 10-50x faster than processing each entity individually.
472 Args:
473 db: Database session
474 entity_type: Type of entities being processed
475 entity_list: List of entity data dictionaries
476 conflict_strategy: How to handle naming conflicts
477 dry_run: Whether this is a dry run
478 rekey_secret: New encryption secret if re-keying
479 status: Import status tracker
480 selected_entities: Optional entity selection filter
481 imported_by: Username of the person performing the import
482 """
483 logger.debug(f"Processing {len(entity_list)} {entity_type} entities")
485 # Filter entities based on selection
486 filtered_entities = []
487 for entity_data in entity_list:
488 # Check if this entity is selected for import
489 if selected_entities and entity_type in selected_entities:
490 selected_names = selected_entities[entity_type]
491 if selected_names: # If specific entities are selected
492 entity_name = self._get_entity_identifier(entity_type, entity_data)
493 if entity_name not in selected_names:
494 continue # Skip this entity
496 # Handle authentication re-encryption if needed
497 if rekey_secret and self._has_auth_data(entity_data):
498 entity_data = self._rekey_auth_data(entity_data, rekey_secret)
500 filtered_entities.append(entity_data)
502 if not filtered_entities:
503 logger.debug(f"No {entity_type} entities to process after filtering")
504 return
506 # Use bulk operations for tools, resources, and prompts
507 if entity_type == "tools":
508 await self._process_tools_bulk(db, filtered_entities, conflict_strategy, dry_run, status, imported_by)
509 elif entity_type == "resources":
510 await self._process_resources_bulk(db, filtered_entities, conflict_strategy, dry_run, status, imported_by)
511 elif entity_type == "prompts":
512 await self._process_prompts_bulk(db, filtered_entities, conflict_strategy, dry_run, status, imported_by)
513 else:
514 # Fall back to individual processing for other entity types
515 for entity_data in filtered_entities:
516 try:
517 await self._process_single_entity(db, entity_type, entity_data, conflict_strategy, dry_run, status, imported_by)
518 status.processed_entities += 1
519 except Exception as e:
520 status.failed_entities += 1
521 status.errors.append(f"Failed to process {entity_type} entity: {str(e)}")
522 logger.error(f"Failed to process {entity_type} entity: {str(e)}")
524 def _has_auth_data(self, entity_data: Dict[str, Any]) -> bool:
525 """Check if entity has authentication data that needs re-encryption.
527 Args:
528 entity_data: Entity data dictionary
530 Returns:
531 True if entity has auth data, False otherwise
533 Examples:
534 >>> service = ImportService()
535 >>> entity_with_auth = {"name": "test", "auth_value": "encrypted_data"}
536 >>> bool(service._has_auth_data(entity_with_auth))
537 True
539 >>> entity_without_auth = {"name": "test"}
540 >>> service._has_auth_data(entity_without_auth)
541 False
543 >>> entity_empty_auth = {"name": "test", "auth_value": ""}
544 >>> bool(service._has_auth_data(entity_empty_auth))
545 False
547 >>> entity_none_auth = {"name": "test", "auth_value": None}
548 >>> bool(service._has_auth_data(entity_none_auth))
549 False
550 """
551 return bool(entity_data.get("auth_value"))
553 def _rekey_auth_data(self, entity_data: Dict[str, Any], new_secret: str) -> Dict[str, Any]:
554 """Re-encrypt authentication data with a new secret key.
556 Args:
557 entity_data: Entity data dictionary
558 new_secret: New encryption secret
560 Returns:
561 Updated entity data with re-encrypted auth
563 Raises:
564 ImportError: If re-encryption fails
566 Examples:
567 Returns original entity when no auth data present:
568 >>> svc = ImportService()
569 >>> svc._has_auth_data({'name': 'x'})
570 False
571 >>> svc._rekey_auth_data({'name': 'x'}, 'new')
572 {'name': 'x'}
574 Rekeys when auth data is present (encode/decode patched):
575 >>> from unittest.mock import patch
576 >>> data = {'name': 'x', 'auth_value': 'enc_old'}
577 >>> with patch('mcpgateway.services.import_service.decode_auth', return_value='plain'):
578 ... with patch('mcpgateway.services.import_service.encode_auth', return_value='enc_new'):
579 ... result = svc._rekey_auth_data(dict(data), 'new-secret')
580 >>> result['auth_value']
581 'enc_new'
582 """
583 if not self._has_auth_data(entity_data):
584 return entity_data
586 try:
587 # Decrypt with old key
588 old_auth_value = entity_data["auth_value"]
589 decrypted_auth = decode_auth(old_auth_value)
591 # Re-encrypt with new key (temporarily change settings)
592 old_secret = settings.auth_encryption_secret
593 settings.auth_encryption_secret = new_secret
594 try:
595 new_auth_value = encode_auth(decrypted_auth)
596 entity_data["auth_value"] = new_auth_value
597 finally:
598 settings.auth_encryption_secret = old_secret
600 logger.debug("Successfully re-keyed authentication data")
601 return entity_data
603 except Exception as e:
604 raise ImportError(f"Failed to re-key authentication data: {str(e)}") from e
606 async def _process_single_entity(
607 self, db: Session, entity_type: str, entity_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str
608 ) -> None:
609 """Process a single entity with conflict resolution.
611 Args:
612 db: Database session
613 entity_type: Type of entity
614 entity_data: Entity data dictionary
615 conflict_strategy: How to handle conflicts
616 dry_run: Whether this is a dry run
617 status: Import status tracker
618 imported_by: Username of the person performing the import
620 Raises:
621 ImportError: If processing fails
622 """
623 try:
624 if entity_type == "tools":
625 await self._process_tool(db, entity_data, conflict_strategy, dry_run, status)
626 elif entity_type == "gateways":
627 await self._process_gateway(db, entity_data, conflict_strategy, dry_run, status)
628 elif entity_type == "servers":
629 await self._process_server(db, entity_data, conflict_strategy, dry_run, status, imported_by)
630 elif entity_type == "prompts":
631 await self._process_prompt(db, entity_data, conflict_strategy, dry_run, status)
632 elif entity_type == "resources":
633 await self._process_resource(db, entity_data, conflict_strategy, dry_run, status)
634 elif entity_type == "roots":
635 await self._process_root(entity_data, conflict_strategy, dry_run, status)
637 except Exception as e:
638 raise ImportError(f"Failed to process {entity_type}: {str(e)}") from e
640 async def _process_tool(self, db: Session, tool_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None:
641 """Process a tool entity.
643 Args:
644 db: Database session
645 tool_data: Tool data dictionary
646 conflict_strategy: How to handle conflicts
647 dry_run: Whether this is a dry run
648 status: Import status tracker
650 Raises:
651 ImportError: If processing fails
652 ImportConflictError: If conflict cannot be resolved
653 """
654 tool_name = tool_data["name"]
656 if dry_run:
657 status.warnings.append(f"Would import tool: {tool_name}")
658 return
660 try:
661 # Convert to ToolCreate schema
662 create_data = self._convert_to_tool_create(tool_data)
664 # Try to create the tool
665 try:
666 await self.tool_service.register_tool(db, create_data)
667 status.created_entities += 1
668 logger.debug(f"Created tool: {tool_name}")
670 except ToolNameConflictError:
671 # Handle conflict based on strategy
672 if conflict_strategy == ConflictStrategy.SKIP:
673 status.skipped_entities += 1
674 status.warnings.append(f"Skipped existing tool: {tool_name}")
675 elif conflict_strategy == ConflictStrategy.UPDATE:
676 # Look up the existing tool by name so it can be updated by ID
678 try:
679 # Try to get tools and find by name
680 tools, _ = await self.tool_service.list_tools(db, include_inactive=True)
681 existing_tool = next((t for t in tools if t.original_name == tool_name), None)
682 if existing_tool:
683 update_data = self._convert_to_tool_update(tool_data)
684 await self.tool_service.update_tool(db, existing_tool.id, update_data)
685 status.updated_entities += 1
686 logger.debug(f"Updated tool: {tool_name}")
687 else:
688 status.warnings.append(f"Could not find existing tool to update: {tool_name}")
689 status.skipped_entities += 1
690 except Exception as update_error:
691 logger.warning(f"Failed to update tool {tool_name}: {str(update_error)}")
692 status.warnings.append(f"Could not update tool {tool_name}: {str(update_error)}")
693 status.skipped_entities += 1
694 elif conflict_strategy == ConflictStrategy.RENAME:
695 # Rename and create
696 new_name = f"{tool_name}_imported_{int(datetime.now().timestamp())}"
697 create_data.name = new_name
698 await self.tool_service.register_tool(db, create_data)
699 status.created_entities += 1
700 status.warnings.append(f"Renamed tool {tool_name} to {new_name}")
701 elif conflict_strategy == ConflictStrategy.FAIL:
702 raise ImportConflictError(f"Tool name conflict: {tool_name}")
704 except Exception as e:
705 raise ImportError(f"Failed to process tool {tool_name}: {str(e)}") from e
707 async def _process_gateway(self, db: Session, gateway_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None:
708 """Process a gateway entity.
710 Args:
711 db: Database session
712 gateway_data: Gateway data dictionary
713 conflict_strategy: How to handle conflicts
714 dry_run: Whether this is a dry run
715 status: Import status tracker
717 Raises:
718 ImportError: If processing fails
719 ImportConflictError: If conflict cannot be resolved
720 """
721 gateway_name = gateway_data["name"]
723 if dry_run:
724 status.warnings.append(f"Would import gateway: {gateway_name}")
725 return
727 try:
728 # Convert to GatewayCreate schema
729 create_data = self._convert_to_gateway_create(gateway_data)
731 try:
732 await self.gateway_service.register_gateway(db, create_data)
733 status.created_entities += 1
734 logger.debug(f"Created gateway: {gateway_name}")
736 except GatewayNameConflictError:
737 if conflict_strategy == ConflictStrategy.SKIP:
738 status.skipped_entities += 1
739 status.warnings.append(f"Skipped existing gateway: {gateway_name}")
740 elif conflict_strategy == ConflictStrategy.UPDATE:
741 try:
742 # Find existing gateway by name
743 gateways, _ = await self.gateway_service.list_gateways(db, include_inactive=True)
744 existing_gateway = next((g for g in gateways if g.name == gateway_name), None)
745 if existing_gateway:
746 update_data = self._convert_to_gateway_update(gateway_data)
747 await self.gateway_service.update_gateway(db, existing_gateway.id, update_data)
748 status.updated_entities += 1
749 logger.debug(f"Updated gateway: {gateway_name}")
750 else:
751 status.warnings.append(f"Could not find existing gateway to update: {gateway_name}")
752 status.skipped_entities += 1
753 except Exception as update_error:
754 logger.warning(f"Failed to update gateway {gateway_name}: {str(update_error)}")
755 status.warnings.append(f"Could not update gateway {gateway_name}: {str(update_error)}")
756 status.skipped_entities += 1
757 elif conflict_strategy == ConflictStrategy.RENAME:
758 new_name = f"{gateway_name}_imported_{int(datetime.now().timestamp())}"
759 create_data.name = new_name
760 await self.gateway_service.register_gateway(db, create_data)
761 status.created_entities += 1
762 status.warnings.append(f"Renamed gateway {gateway_name} to {new_name}")
763 elif conflict_strategy == ConflictStrategy.FAIL:
764 raise ImportConflictError(f"Gateway name conflict: {gateway_name}")
766 except Exception as e:
767 raise ImportError(f"Failed to process gateway {gateway_name}: {str(e)}") from e
769 async def _process_server(self, db: Session, server_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str) -> None:
770 """Process a server entity.
772 Args:
773 db: Database session
774 server_data: Server data dictionary
775 conflict_strategy: How to handle conflicts
776 dry_run: Whether this is a dry run
777 status: Import status tracker
778 imported_by: Username of the person performing the import
780 Raises:
781 ImportError: If processing fails
782 ImportConflictError: If conflict cannot be resolved
783 """
784 server_name = server_data["name"]
786 if dry_run:
787 status.warnings.append(f"Would import server: {server_name}")
788 return
790 try:
791 create_data = await self._convert_to_server_create(db, server_data)
793 try:
794 await self.server_service.register_server(db, create_data)
795 status.created_entities += 1
796 logger.debug(f"Created server: {server_name}")
798 except ServerNameConflictError:
799 if conflict_strategy == ConflictStrategy.SKIP:
800 status.skipped_entities += 1
801 status.warnings.append(f"Skipped existing server: {server_name}")
802 elif conflict_strategy == ConflictStrategy.UPDATE:
803 try:
804 # Find existing server by name
805 servers = await self.server_service.list_servers(db, include_inactive=True)
806 existing_server = next((s for s in servers if s.name == server_name), None)
807 if existing_server:
808 update_data = await self._convert_to_server_update(db, server_data)
809 await self.server_service.update_server(db, existing_server.id, update_data, imported_by)
810 status.updated_entities += 1
811 logger.debug(f"Updated server: {server_name}")
812 else:
813 status.warnings.append(f"Could not find existing server to update: {server_name}")
814 status.skipped_entities += 1
815 except Exception as update_error:
816 logger.warning(f"Failed to update server {server_name}: {str(update_error)}")
817 status.warnings.append(f"Could not update server {server_name}: {str(update_error)}")
818 status.skipped_entities += 1
819 elif conflict_strategy == ConflictStrategy.RENAME:
820 new_name = f"{server_name}_imported_{int(datetime.now().timestamp())}"
821 create_data.name = new_name
822 await self.server_service.register_server(db, create_data)
823 status.created_entities += 1
824 status.warnings.append(f"Renamed server {server_name} to {new_name}")
825 elif conflict_strategy == ConflictStrategy.FAIL:
826 raise ImportConflictError(f"Server name conflict: {server_name}")
828 except Exception as e:
829 raise ImportError(f"Failed to process server {server_name}: {str(e)}") from e
831 async def _process_prompt(self, db: Session, prompt_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None:
832 """Process a prompt entity.
834 Args:
835 db: Database session
836 prompt_data: Prompt data dictionary
837 conflict_strategy: How to handle conflicts
838 dry_run: Whether this is a dry run
839 status: Import status tracker
841 Raises:
842 ImportError: If processing fails
843 ImportConflictError: If conflict cannot be resolved
844 """
845 prompt_name = prompt_data["name"]
847 if dry_run:
848 status.warnings.append(f"Would import prompt: {prompt_name}")
849 return
851 try:
852 create_data = self._convert_to_prompt_create(prompt_data)
854 try:
855 await self.prompt_service.register_prompt(db, create_data)
856 status.created_entities += 1
857 logger.debug(f"Created prompt: {prompt_name}")
859 except PromptNameConflictError:
860 if conflict_strategy == ConflictStrategy.SKIP:
861 status.skipped_entities += 1
862 status.warnings.append(f"Skipped existing prompt: {prompt_name}")
863 elif conflict_strategy == ConflictStrategy.UPDATE:
864 update_data = self._convert_to_prompt_update(prompt_data)
865 await self.prompt_service.update_prompt(db, prompt_name, update_data)
866 status.updated_entities += 1
867 logger.debug(f"Updated prompt: {prompt_name}")
868 elif conflict_strategy == ConflictStrategy.RENAME:
869 new_name = f"{prompt_name}_imported_{int(datetime.now().timestamp())}"
870 create_data.name = new_name
871 await self.prompt_service.register_prompt(db, create_data)
872 status.created_entities += 1
873 status.warnings.append(f"Renamed prompt {prompt_name} to {new_name}")
874 elif conflict_strategy == ConflictStrategy.FAIL:
875 raise ImportConflictError(f"Prompt name conflict: {prompt_name}")
877 except Exception as e:
878 raise ImportError(f"Failed to process prompt {prompt_name}: {str(e)}") from e
880 async def _process_resource(self, db: Session, resource_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None:
881 """Process a resource entity.
883 Args:
884 db: Database session
885 resource_data: Resource data dictionary
886 conflict_strategy: How to handle conflicts
887 dry_run: Whether this is a dry run
888 status: Import status tracker
890 Raises:
891 ImportError: If processing fails
892 ImportConflictError: If conflict cannot be resolved
893 """
894 resource_uri = resource_data["uri"]
896 if dry_run:
897 status.warnings.append(f"Would import resource: {resource_uri}")
898 return
900 try:
901 create_data = self._convert_to_resource_create(resource_data)
903 try:
904 await self.resource_service.register_resource(db, create_data)
905 status.created_entities += 1
906 logger.debug(f"Created resource: {resource_uri}")
908 except ResourceURIConflictError:
909 if conflict_strategy == ConflictStrategy.SKIP:
910 status.skipped_entities += 1
911 status.warnings.append(f"Skipped existing resource: {resource_uri}")
912 elif conflict_strategy == ConflictStrategy.UPDATE:
913 update_data = self._convert_to_resource_update(resource_data)
914 await self.resource_service.update_resource(db, resource_uri, update_data)
915 status.updated_entities += 1
916 logger.debug(f"Updated resource: {resource_uri}")
917 elif conflict_strategy == ConflictStrategy.RENAME:
918 new_uri = f"{resource_uri}_imported_{int(datetime.now().timestamp())}"
919 create_data.uri = new_uri
920 await self.resource_service.register_resource(db, create_data)
921 status.created_entities += 1
922 status.warnings.append(f"Renamed resource {resource_uri} to {new_uri}")
923 elif conflict_strategy == ConflictStrategy.FAIL:
924 raise ImportConflictError(f"Resource URI conflict: {resource_uri}")
926 except Exception as e:
927 raise ImportError(f"Failed to process resource {resource_uri}: {str(e)}") from e
929 async def _process_tools_bulk(self, db: Session, tools_data: List[Dict[str, Any]], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str) -> None:
930 """Process multiple tools using bulk operations.
932 Args:
933 db: Database session
934 tools_data: List of tool data dictionaries
935 conflict_strategy: How to handle conflicts
936 dry_run: Whether this is a dry run
937 status: Import status tracker
938 imported_by: Username of the person performing the import
939 """
940 if dry_run:
941 for tool_data in tools_data:
942 status.warnings.append(f"Would import tool: {tool_data.get('name', 'unknown')}")
943 return
945 try:
946 # Convert all tool data to ToolCreate schemas
947 tools_to_register = []
948 for tool_data in tools_data:
949 try:
950 create_data = self._convert_to_tool_create(tool_data)
951 tools_to_register.append(create_data)
952 except Exception as e:
953 status.failed_entities += 1
954 status.errors.append(f"Failed to convert tool {tool_data.get('name', 'unknown')}: {str(e)}")
955 logger.warning(f"Failed to convert tool data: {str(e)}")
957 if not tools_to_register:
958 return
960 # Use bulk registration
961 result = await self.tool_service.register_tools_bulk(
962 db=db,
963 tools=tools_to_register,
964 created_by=imported_by,
965 created_via="import",
966 conflict_strategy=conflict_strategy.value,
967 )
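# result is a summary dict consumed below:
# {"created": int, "updated": int, "skipped": int, "failed": int, "errors": [str, ...]}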
969 # Update status based on results
970 status.created_entities += result["created"]
971 status.updated_entities += result["updated"]
972 status.skipped_entities += result["skipped"]
973 status.failed_entities += result["failed"]
974 status.processed_entities += result["created"] + result["updated"] + result["skipped"]
976 # Add any errors to status
977 for error in result.get("errors", []):
978 status.errors.append(error)
980 logger.info(f"Bulk processed {len(tools_data)} tools: {result['created']} created, {result['updated']} updated, {result['skipped']} skipped, {result['failed']} failed")
982 except Exception as e:
983 status.failed_entities += len(tools_data)
984 status.errors.append(f"Bulk tool processing failed: {str(e)}")
985 logger.error(f"Failed to bulk process tools: {str(e)}")
986 # Don't raise - allow import to continue with other entities
988 async def _process_resources_bulk(self, db: Session, resources_data: List[Dict[str, Any]], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str) -> None:
989 """Process multiple resources using bulk operations.
991 Args:
992 db: Database session
993 resources_data: List of resource data dictionaries
994 conflict_strategy: How to handle conflicts
995 dry_run: Whether this is a dry run
996 status: Import status tracker
997 imported_by: Username of the person performing the import
998 """
999 if dry_run:
1000 for resource_data in resources_data:
1001 status.warnings.append(f"Would import resource: {resource_data.get('uri', 'unknown')}")
1002 return
1004 try:
1005 # Convert all resource data to ResourceCreate schemas
1006 resources_to_register = []
1007 for resource_data in resources_data:
1008 try:
1009 create_data = self._convert_to_resource_create(resource_data)
1010 resources_to_register.append(create_data)
1011 except Exception as e:
1012 status.failed_entities += 1
1013 status.errors.append(f"Failed to convert resource {resource_data.get('uri', 'unknown')}: {str(e)}")
1014 logger.warning(f"Failed to convert resource data: {str(e)}")
1016 if not resources_to_register:
1017 return
1019 # Use bulk registration
1020 result = await self.resource_service.register_resources_bulk(
1021 db=db,
1022 resources=resources_to_register,
1023 created_by=imported_by,
1024 created_via="import",
1025 conflict_strategy=conflict_strategy.value,
1026 )
1028 # Update status based on results
1029 status.created_entities += result["created"]
1030 status.updated_entities += result["updated"]
1031 status.skipped_entities += result["skipped"]
1032 status.failed_entities += result["failed"]
1033 status.processed_entities += result["created"] + result["updated"] + result["skipped"]
1035 # Add any errors to status
1036 for error in result.get("errors", []):
1037 status.errors.append(error)
1039 logger.info(f"Bulk processed {len(resources_data)} resources: {result['created']} created, {result['updated']} updated, {result['skipped']} skipped, {result['failed']} failed")
1041 except Exception as e:
1042 status.failed_entities += len(resources_data)
1043 status.errors.append(f"Bulk resource processing failed: {str(e)}")
1044 logger.error(f"Failed to bulk process resources: {str(e)}")
1045 # Don't raise - allow import to continue with other entities
1047 async def _process_prompts_bulk(self, db: Session, prompts_data: List[Dict[str, Any]], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str) -> None:
1048 """Process multiple prompts using bulk operations.
1050 Args:
1051 db: Database session
1052 prompts_data: List of prompt data dictionaries
1053 conflict_strategy: How to handle conflicts
1054 dry_run: Whether this is a dry run
1055 status: Import status tracker
1056 imported_by: Username of the person performing the import
1057 """
1058 if dry_run:
1059 for prompt_data in prompts_data:
1060 status.warnings.append(f"Would import prompt: {prompt_data.get('name', 'unknown')}")
1061 return
1063 try:
1064 # Convert all prompt data to PromptCreate schemas
1065 prompts_to_register = []
1066 for prompt_data in prompts_data:
1067 try:
1068 create_data = self._convert_to_prompt_create(prompt_data)
1069 prompts_to_register.append(create_data)
1070 except Exception as e:
1071 status.failed_entities += 1
1072 status.errors.append(f"Failed to convert prompt {prompt_data.get('name', 'unknown')}: {str(e)}")
1073 logger.warning(f"Failed to convert prompt data: {str(e)}")
1075 if not prompts_to_register:
1076 return
1078 # Use bulk registration
1079 result = await self.prompt_service.register_prompts_bulk(
1080 db=db,
1081 prompts=prompts_to_register,
1082 created_by=imported_by,
1083 created_via="import",
1084 conflict_strategy=conflict_strategy.value,
1085 )
1087 # Update status based on results
1088 status.created_entities += result["created"]
1089 status.updated_entities += result["updated"]
1090 status.skipped_entities += result["skipped"]
1091 status.failed_entities += result["failed"]
1092 status.processed_entities += result["created"] + result["updated"] + result["skipped"]
1094 # Add any errors to status
1095 for error in result.get("errors", []):
1096 status.errors.append(error)
1098 logger.info(f"Bulk processed {len(prompts_data)} prompts: {result['created']} created, {result['updated']} updated, {result['skipped']} skipped, {result['failed']} failed")
1100 except Exception as e:
1101 status.failed_entities += len(prompts_data)
1102 status.errors.append(f"Bulk prompt processing failed: {str(e)}")
1103 logger.error(f"Failed to bulk process prompts: {str(e)}")
1104 # Don't raise - allow import to continue with other entities
1106 async def _process_root(self, root_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None:
1107 """Process a root entity.
1109 Args:
1110 root_data: Root data dictionary
1111 conflict_strategy: How to handle conflicts
1112 dry_run: Whether this is a dry run
1113 status: Import status tracker
1115 Raises:
1116 ImportError: If processing fails
1117 ImportConflictError: If conflict cannot be resolved
1118 """
1119 root_uri = root_data["uri"]
1121 if dry_run:
1122 status.warnings.append(f"Would import root: {root_uri}")
1123 return
1125 try:
1126 await self.root_service.add_root(root_uri, root_data.get("name"))
1127 status.created_entities += 1
1128 logger.debug(f"Created root: {root_uri}")
1130 except Exception as e:
1131 if conflict_strategy == ConflictStrategy.SKIP:
1132 status.skipped_entities += 1
1133 status.warnings.append(f"Skipped existing root: {root_uri}")
1134 elif conflict_strategy == ConflictStrategy.FAIL:
1135 raise ImportConflictError(f"Root URI conflict: {root_uri}")
1136 else:
1137 raise ImportError(f"Failed to process root {root_uri}: {str(e)}") from e
1139 def _convert_to_tool_create(self, tool_data: Dict[str, Any]) -> ToolCreate:
1140 """Convert import data to ToolCreate schema.
1142 Args:
1143 tool_data: Tool data dictionary from import
1145 Returns:
1146 ToolCreate schema object
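Example (illustrative sketch; field values are assumptions):
    data = {"name": "my_tool", "url": "https://example.com", "integration_type": "REST"}
    create = service._convert_to_tool_create(data)
    # create.request_type falls back to "GET" when the import omits it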
1147 """
1148 # Extract auth information if present
1149 auth_info = None
1150 if tool_data.get("auth_type") and tool_data.get("auth_value"):
1151 auth_info = AuthenticationValues(auth_type=tool_data["auth_type"], auth_value=tool_data["auth_value"])
1153 return ToolCreate(
1154 name=tool_data["name"],
1155 displayName=tool_data.get("displayName"),
1156 url=tool_data["url"],
1157 description=tool_data.get("description"),
1158 integration_type=tool_data.get("integration_type", "REST"),
1159 request_type=tool_data.get("request_type", "GET"),
1160 headers=tool_data.get("headers"),
1161 input_schema=tool_data.get("input_schema"),
1162 output_schema=tool_data.get("output_schema"),
1163 annotations=tool_data.get("annotations"),
1164 jsonpath_filter=tool_data.get("jsonpath_filter"),
1165 auth=auth_info,
1166 tags=tool_data.get("tags", []),
1167 )
1169 def _convert_to_tool_update(self, tool_data: Dict[str, Any]) -> ToolUpdate:
1170 """Convert import data to ToolUpdate schema.
1172 Args:
1173 tool_data: Tool data dictionary from import
1175 Returns:
1176 ToolUpdate schema object
1177 """
1178 auth_info = None
1179 if tool_data.get("auth_type") and tool_data.get("auth_value"):
1180 auth_info = AuthenticationValues(auth_type=tool_data["auth_type"], auth_value=tool_data["auth_value"])
1182 return ToolUpdate(
1183 name=tool_data.get("name"),
1184 displayName=tool_data.get("displayName"),
1185 url=tool_data.get("url"),
1186 description=tool_data.get("description"),
1187 integration_type=tool_data.get("integration_type"),
1188 request_type=tool_data.get("request_type"),
1189 headers=tool_data.get("headers"),
1190 input_schema=tool_data.get("input_schema"),
1191 output_schema=tool_data.get("output_schema"),
1192 annotations=tool_data.get("annotations"),
1193 jsonpath_filter=tool_data.get("jsonpath_filter"),
1194 auth=auth_info,
1195 tags=tool_data.get("tags"),
1196 )
1198 def _convert_to_gateway_create(self, gateway_data: Dict[str, Any]) -> GatewayCreate:
1199 """Convert import data to GatewayCreate schema.
1201 Args:
1202 gateway_data: Gateway data dictionary from import
1204 Returns:
1205 GatewayCreate schema object
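Example (illustrative sketch; the encrypted ``auth_value`` is a placeholder):
    data = {"name": "gw", "url": "https://gw.example.com", "auth_type": "basic", "auth_value": "<encrypted>"}
    create = service._convert_to_gateway_create(data)
    # auth_username/auth_password are recovered from the decoded Basic header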
1206 """
1207 # Handle auth data
1208 auth_kwargs = {}
1209 if gateway_data.get("auth_type"):
1210 auth_kwargs["auth_type"] = gateway_data["auth_type"]
1212 # Handle query_param auth type
1213 if gateway_data["auth_type"] == "query_param" and gateway_data.get("auth_query_params"):
1214 try:
1215 auth_query_params = gateway_data["auth_query_params"]
1216 if auth_query_params:
1217 # Get the first key-value pair (schema supports single param)
1218 param_key = next(iter(auth_query_params.keys()))
1219 encrypted_value = auth_query_params[param_key]
1220 # Decode the encrypted value - returns dict like {param_key: value}
1221 decrypted_dict = decode_auth(encrypted_value)
1222 # Extract the actual value from the dict
1223 decrypted_value = decrypted_dict.get(param_key, "") if isinstance(decrypted_dict, dict) else str(decrypted_dict)
1224 auth_kwargs["auth_query_param_key"] = param_key
1225 auth_kwargs["auth_query_param_value"] = decrypted_value
1226 logger.debug(f"Importing gateway with query_param auth, key: {param_key}")
1227 except Exception as e:
1228 logger.warning(f"Failed to decode query param auth for gateway: {str(e)}")
1229 # Decode auth_value to get original credentials
1230 elif gateway_data.get("auth_value"):
1231 try:
1232 decoded_auth = decode_auth(gateway_data["auth_value"])
1233 if gateway_data["auth_type"] == "basic":
1234 # Extract username and password from Basic auth
1235 auth_header = decoded_auth.get("Authorization", "")
1236 if auth_header.startswith("Basic "):
1237 creds = base64.b64decode(auth_header[6:]).decode("utf-8")
1238 username, password = creds.split(":", 1)
1239 auth_kwargs.update({"auth_username": username, "auth_password": password})
1240 elif gateway_data["auth_type"] == "bearer":
1241 # Extract token from Bearer auth
1242 auth_header = decoded_auth.get("Authorization", "")
1243 if auth_header.startswith("Bearer "):
1244 auth_kwargs["auth_token"] = auth_header[7:]
1245 elif gateway_data["auth_type"] == "authheaders":
1246 # Handle custom headers
1247 if len(decoded_auth) == 1:
1248 key, value = next(iter(decoded_auth.items()))
1249 auth_kwargs.update({"auth_header_key": key, "auth_header_value": value})
1250 else:
1251 # Multiple headers - use the new format
1252 headers_list = [{"key": k, "value": v} for k, v in decoded_auth.items()]
1253 auth_kwargs["auth_headers"] = headers_list
1254 except Exception as e:
1255 logger.warning(f"Failed to decode auth data for gateway: {str(e)}")
1257 return GatewayCreate(
1258 name=gateway_data["name"],
1259 url=gateway_data["url"],
1260 description=gateway_data.get("description"),
1261 transport=gateway_data.get("transport", "SSE"),
1262 passthrough_headers=gateway_data.get("passthrough_headers"),
1263 tags=gateway_data.get("tags", []),
1264 **auth_kwargs,
1265 )
1267 def _convert_to_gateway_update(self, gateway_data: Dict[str, Any]) -> GatewayUpdate:
1268 """Convert import data to GatewayUpdate schema.
1270 Args:
1271 gateway_data: Gateway data dictionary from import
1273 Returns:
1274 GatewayUpdate schema object
1275 """
1276 # Similar to create but all fields optional
1277 auth_kwargs = {}
1278 if gateway_data.get("auth_type"):
1279 auth_kwargs["auth_type"] = gateway_data["auth_type"]
1281 # Handle query_param auth type
1282 if gateway_data["auth_type"] == "query_param" and gateway_data.get("auth_query_params"):
1283 try:
1284 auth_query_params = gateway_data["auth_query_params"]
1285 if auth_query_params:
1286 # Get the first key-value pair (schema supports single param)
1287 param_key = next(iter(auth_query_params.keys()))
1288 encrypted_value = auth_query_params[param_key]
1289 # Decode the encrypted value - returns dict like {param_key: value}
1290 decrypted_dict = decode_auth(encrypted_value)
1291 # Extract the actual value from the dict
1292 decrypted_value = decrypted_dict.get(param_key, "") if isinstance(decrypted_dict, dict) else str(decrypted_dict)
1293 auth_kwargs["auth_query_param_key"] = param_key
1294 auth_kwargs["auth_query_param_value"] = decrypted_value
1295 logger.debug(f"Importing gateway update with query_param auth, key: {param_key}")
1296 except Exception as e:
1297 logger.warning(f"Failed to decode query param auth for gateway update: {str(e)}")
1298 elif gateway_data.get("auth_value"):
1299 try:
1300 decoded_auth = decode_auth(gateway_data["auth_value"])
1301 if gateway_data["auth_type"] == "basic":
1302 auth_header = decoded_auth.get("Authorization", "")
1303 if auth_header.startswith("Basic "):
1304 creds = base64.b64decode(auth_header[6:]).decode("utf-8")
1305 username, password = creds.split(":", 1)
1306 auth_kwargs.update({"auth_username": username, "auth_password": password})
1307 elif gateway_data["auth_type"] == "bearer":
1308 auth_header = decoded_auth.get("Authorization", "")
1309 if auth_header.startswith("Bearer "):
1310 auth_kwargs["auth_token"] = auth_header[7:]
1311 elif gateway_data["auth_type"] == "authheaders":
1312 if len(decoded_auth) == 1:
1313 key, value = next(iter(decoded_auth.items()))
1314 auth_kwargs.update({"auth_header_key": key, "auth_header_value": value})
1315 else:
1316 headers_list = [{"key": k, "value": v} for k, v in decoded_auth.items()]
1317 auth_kwargs["auth_headers"] = headers_list
1318 except Exception as e:
1319 logger.warning(f"Failed to decode auth data for gateway update: {str(e)}")
1321 return GatewayUpdate(
1322 name=gateway_data.get("name"),
1323 url=gateway_data.get("url"),
1324 description=gateway_data.get("description"),
1325 transport=gateway_data.get("transport"),
1326 passthrough_headers=gateway_data.get("passthrough_headers"),
1327 tags=gateway_data.get("tags"),
1328 **auth_kwargs,
1329 )
1331 async def _convert_to_server_create(self, db: Session, server_data: Dict[str, Any]) -> ServerCreate:
1332 """Convert import data to ServerCreate schema, resolving tool references.
1334 Args:
1335 db: Database session
1336 server_data: Server data dictionary from import
1338 Returns:
1339 ServerCreate schema object with resolved tool IDs
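Example (illustrative sketch; assumes tools named "t1" and "t2" are already registered):
    create = await service._convert_to_server_create(db, {"name": "srv", "associated_tools": ["t1", "t2"]})
    # create.associated_tools holds the resolved tool IDs, not the names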
1340 """
1341 # Resolve tool references (could be names or IDs) to current tool IDs
1342 tool_references = server_data.get("tool_ids", []) or server_data.get("associated_tools", [])
1343 resolved_tool_ids = []
1345 if tool_references:
1346 # Get all tools to resolve references
1347 all_tools, _ = await self.tool_service.list_tools(db, include_inactive=True)
1349 for tool_ref in tool_references:
1350 # Try to find tool by ID first, then by name
1353 # Try exact ID match first
1354 found_tool = next((t for t in all_tools if t.id == tool_ref), None)
1356 # If not found, try by original_name or name
1357 if not found_tool:
1358 found_tool = next((t for t in all_tools if t.original_name == tool_ref), None)
1360 if not found_tool:
1361 found_tool = next((t for t in all_tools if hasattr(t, "name") and t.name == tool_ref), None)
1363 if found_tool:
1364 resolved_tool_ids.append(found_tool.id)
1365 logger.debug(f"Resolved tool reference '{tool_ref}' to ID {found_tool.id}")
1366 else:
1367 logger.warning(f"Could not resolve tool reference: {tool_ref}")
1368 # Don't include unresolvable references
1370 return ServerCreate(name=server_data["name"], description=server_data.get("description"), associated_tools=resolved_tool_ids, tags=server_data.get("tags", []))
1372 async def _convert_to_server_update(self, db: Session, server_data: Dict[str, Any]) -> ServerUpdate:
1373 """Convert import data to ServerUpdate schema, resolving tool references.
1375 Args:
1376 db: Database session
1377 server_data: Server data dictionary from import
1379 Returns:
1380 ServerUpdate schema object with resolved tool IDs
1381 """
1382 # Resolve tool references same as create method
1383 tool_references = server_data.get("tool_ids", []) or server_data.get("associated_tools", [])
1384 resolved_tool_ids = []
1386 if tool_references:
1387 all_tools, _ = await self.tool_service.list_tools(db, include_inactive=True)
1389 for tool_ref in tool_references:
1390 found_tool = next((t for t in all_tools if t.id == tool_ref), None)
1391 if not found_tool:
1392 found_tool = next((t for t in all_tools if t.original_name == tool_ref), None)
1393 if not found_tool:
1394 found_tool = next((t for t in all_tools if hasattr(t, "name") and t.name == tool_ref), None)
1396 if found_tool:
1397 resolved_tool_ids.append(found_tool.id)
1398 else:
1399 logger.warning(f"Could not resolve tool reference for update: {tool_ref}")
1401 return ServerUpdate(name=server_data.get("name"), description=server_data.get("description"), associated_tools=resolved_tool_ids if resolved_tool_ids else None, tags=server_data.get("tags"))
1403 def _convert_to_prompt_create(self, prompt_data: Dict[str, Any]) -> PromptCreate:
1404 """Convert import data to PromptCreate schema.
1406 Args:
1407 prompt_data: Prompt data dictionary from import
1409 Returns:
1410 PromptCreate schema object
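Example (illustrative sketch):
    data = {"name": "greet", "template": "Hello {{ name }}",
            "input_schema": {"properties": {"name": {"description": "Who to greet"}}, "required": ["name"]}}
    create = service._convert_to_prompt_create(data)
    # arguments == [{"name": "name", "description": "Who to greet", "required": True}]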
1411 """
1412 # Convert input_schema back to arguments format
1413 arguments = []
1414 input_schema = prompt_data.get("input_schema", {})
1415 if isinstance(input_schema, dict):
1416 properties = input_schema.get("properties", {})
1417 required_fields = input_schema.get("required", [])
1419 for prop_name, prop_data in properties.items():
1420 arguments.append({"name": prop_name, "description": prop_data.get("description", ""), "required": prop_name in required_fields})
1422 original_name = prompt_data.get("original_name") or prompt_data["name"]
1423 return PromptCreate(
1424 name=original_name,
1425 custom_name=prompt_data.get("custom_name"),
1426 display_name=prompt_data.get("display_name"),
1427 template=prompt_data["template"],
1428 description=prompt_data.get("description"),
1429 arguments=arguments,
1430 tags=prompt_data.get("tags", []),
1431 )
1433 def _convert_to_prompt_update(self, prompt_data: Dict[str, Any]) -> PromptUpdate:
1434 """Convert import data to PromptUpdate schema.
1436 Args:
1437 prompt_data: Prompt data dictionary from import
1439 Returns:
1440 PromptUpdate schema object
1441 """
1442 arguments = []
1443 input_schema = prompt_data.get("input_schema", {})
1444 if isinstance(input_schema, dict):
1445 properties = input_schema.get("properties", {})
1446 required_fields = input_schema.get("required", [])
1448 for prop_name, prop_data in properties.items():
1449 arguments.append({"name": prop_name, "description": prop_data.get("description", ""), "required": prop_name in required_fields})
1451 original_name = prompt_data.get("original_name") or prompt_data.get("name")
1452 return PromptUpdate(
1453 name=original_name,
1454 custom_name=prompt_data.get("custom_name"),
1455 display_name=prompt_data.get("display_name"),
1456 template=prompt_data.get("template"),
1457 description=prompt_data.get("description"),
1458 arguments=arguments if arguments else None,
1459 tags=prompt_data.get("tags"),
1460 )
1462 def _convert_to_resource_create(self, resource_data: Dict[str, Any]) -> ResourceCreate:
1463 """Convert import data to ResourceCreate schema.
1465 Args:
1466 resource_data: Resource data dictionary from import
1468 Returns:
1469 ResourceCreate schema object
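Examples:
Illustrative only (editor's addition); assumes ``service = ImportService()`` and hypothetical field values:
>>> res = service._convert_to_resource_create({"uri": "file:///notes.txt", "name": "notes"})  # doctest: +SKIP
>>> # res.content == "" because a missing content field defaults to an empty string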
1470 """
1471 return ResourceCreate(
1472 uri=resource_data["uri"],
1473 name=resource_data["name"],
1474 description=resource_data.get("description"),
1475 mime_type=resource_data.get("mime_type"),
1476 content=resource_data.get("content", ""), # Default empty content
1477 tags=resource_data.get("tags", []),
1478 )
1480 def _convert_to_resource_update(self, resource_data: Dict[str, Any]) -> ResourceUpdate:
1481 """Convert import data to ResourceUpdate schema.
1483 Args:
1484 resource_data: Resource data dictionary from import
1486 Returns:
1487 ResourceUpdate schema object
1488 """
1489 return ResourceUpdate(
1490 name=resource_data.get("name"), description=resource_data.get("description"), mime_type=resource_data.get("mime_type"), content=resource_data.get("content"), tags=resource_data.get("tags")
1491 )
1493 def get_import_status(self, import_id: str) -> Optional[ImportStatus]:
1494 """Get the status of an import operation.
1496 Args:
1497 import_id: Import operation ID
1499 Returns:
1500 Import status object or None if not found
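Examples:
A lookup sketch (editor's addition); assumes a freshly constructed service:
>>> ImportService().get_import_status("unknown-id") is None  # doctest: +SKIP
True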
1501 """
1502 return self.active_imports.get(import_id)
1504 def list_import_statuses(self) -> List[ImportStatus]:
1505 """List all import statuses.
1507 Returns:
1508 List of all import status objects
1509 """
1510 return list(self.active_imports.values())
1512 def cleanup_completed_imports(self, max_age_hours: int = 24) -> int:
1513 """Clean up completed import statuses older than max_age_hours.
1515 Args:
1516 max_age_hours: Maximum age in hours for keeping completed imports
1518 Returns:
1519 Number of import statuses removed
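Examples:
Editor's sketch of the retention rule (comment only, not executed):
>>> # An entry is removed when its status is "completed" or "failed" and
>>> # completed_at is older than now - max_age_hours; the count of removed
>>> # entries is returned.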
1520 """
1521 cutoff_time = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
1522 removed = 0
1524 to_remove = []
1525 for import_id, status in self.active_imports.items():
1526 if status.status in ["completed", "failed"] and status.completed_at and status.completed_at < cutoff_time:  # coverage: 1526 ↛ 1525 (condition always true)
1527 to_remove.append(import_id)
1529 for import_id in to_remove:
1530 del self.active_imports[import_id]
1531 removed += 1
1533 return removed
1535 async def preview_import(self, db: Session, import_data: Dict[str, Any]) -> Dict[str, Any]:
1536 """Preview import file to show what would be imported with smart categorization.
1538 Args:
1539 db: Database session
1540 import_data: The validated import data
1542 Returns:
1543 Dictionary with categorized items for selective import UI
1545 Examples:
1546 >>> service = ImportService()
1547 >>> # Returns a structure the UI can use to build a selection interface
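>>> # Editor's sketch of the returned shape (keys as built by this method):
>>> # {"summary": {"total_items": ..., "by_type": {...}},
>>> #  "items": {...}, "bundles": {...}, "conflicts": {...}, "dependencies": {...}}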
1548 """
1549 self.validate_import_data(import_data)
1551 entities = import_data.get("entities", {})
1552 preview = {
1553 "summary": {"total_items": sum(len(items) for items in entities.values()), "by_type": {entity_type: len(items) for entity_type, items in entities.items()}},
1554 "items": {},
1555 "bundles": {},
1556 "conflicts": {},
1557 "dependencies": {},
1558 }
1560 # Categorize each entity type
1561 for entity_type, entity_list in entities.items():
1562 preview["items"][entity_type] = []
1564 for entity in entity_list:
1565 item_info = await self._analyze_import_item(db, entity_type, entity)
1566 preview["items"][entity_type].append(item_info)
1568 # Find gateway bundles (gateways + their tools/resources/prompts)
1569 if "gateways" in entities: 1569 ↛ 1573line 1569 didn't jump to line 1573 because the condition on line 1569 was always true
1570 preview["bundles"] = self._find_gateway_bundles(entities)
1572 # Find server dependencies
1573 if "servers" in entities: 1573 ↛ 1577line 1573 didn't jump to line 1577 because the condition on line 1573 was always true
1574 preview["dependencies"] = self._find_server_dependencies(entities)
1576 # Detect conflicts with existing items
1577 preview["conflicts"] = await self._detect_import_conflicts(db, entities)
1579 return preview
1581 async def _analyze_import_item(self, db: Session, entity_type: str, entity: Dict[str, Any]) -> Dict[str, Any]:
1582 """Analyze a single import item for the preview.
1584 Args:
1585 db: Database session
1586 entity_type: Type of entity
1587 entity: Entity data
1589 Returns:
1590 Item analysis with metadata for UI selection
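Examples:
Editor's sketch of one analyzed item (values hypothetical):
>>> # {"id": "time_tool", "name": "time_tool", "type": "tools",
>>> #  "is_gateway_item": False, "is_custom": True, "description": "",
>>> #  "conflicts_with": False}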
1591 """
1592 item_name = self._get_entity_identifier(entity_type, entity)
1594 # Basic item info
1595 item_info = {
1596 "id": item_name,
1597 "name": entity.get("name", item_name),
1598 "type": entity_type,
1599 "is_gateway_item": bool(entity.get("gateway_name") or entity.get("gateway_id")),
1600 "is_custom": not bool(entity.get("gateway_name") or entity.get("gateway_id")),
1601 "description": entity.get("description", ""),
1602 }
1604 # Check if it conflicts with existing items
1605 try:
1606 if entity_type == "tools":
1607 existing, _ = await self.tool_service.list_tools(db)
1608 item_info["conflicts_with"] = any(t.original_name == item_name for t in existing)
1609 elif entity_type == "gateways":
1610 existing, _ = await self.gateway_service.list_gateways(db)
1611 item_info["conflicts_with"] = any(g.name == item_name for g in existing)
1612 elif entity_type == "servers":
1613 existing = await self.server_service.list_servers(db)
1614 item_info["conflicts_with"] = any(s.name == item_name for s in existing)
1615 elif entity_type == "prompts":
1616 existing, _ = await self.prompt_service.list_prompts(db)
1617 item_info["conflicts_with"] = any(p.name == item_name for p in existing)
1618 elif entity_type == "resources":
1619 existing, _ = await self.resource_service.list_resources(db)
1620 item_info["conflicts_with"] = any(r.uri == item_name for r in existing)
1621 else:
1622 item_info["conflicts_with"] = False
1623 except Exception:
1624 item_info["conflicts_with"] = False  # If the lookup fails, assume no conflict rather than failing the preview
1626 # Add metadata for smart selection
1627 if entity_type == "servers":
1628 item_info["dependencies"] = {"tools": entity.get("associated_tools", []), "resources": entity.get("associated_resources", []), "prompts": entity.get("associated_prompts", [])}
1630 return item_info
1632 def _find_gateway_bundles(self, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
1633 """Find gateway bundles (gateway + associated tools/resources/prompts).
1635 Args:
1636 entities: All entities from import data
1638 Returns:
1639 Gateway bundle information for UI
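Examples:
Editor's sketch of the bundle shape for one gateway (values hypothetical):
>>> # {"gw1": {"gateway": {"name": "gw1", "description": ""},
>>> #          "items": {"tools": [...], "resources": [], "prompts": []},
>>> #          "total_items": 1}}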
1640 """
1641 bundles = {}
1643 if "gateways" not in entities:
1644 return bundles
1646 for gateway in entities["gateways"]:
1647 gateway_name = gateway.get("name", "")
1648 bundle_items = {"tools": [], "resources": [], "prompts": []}
1650 # Find items that belong to this gateway
1651 for entity_type in ["tools", "resources", "prompts"]:
1652 if entity_type in entities:  # coverage: 1652 ↛ 1651 (condition always true)
1653 for item in entities[entity_type]:
1654 item_gateway = item.get("gateway_name") or item.get("gateway_id")
1655 if item_gateway == gateway_name:  # coverage: 1655 ↛ 1653 (condition always true)
1656 item_name = self._get_entity_identifier(entity_type, item)
1657 bundle_items[entity_type].append({"id": item_name, "name": item.get("name", item_name), "description": item.get("description", "")})
1659 if any(bundle_items.values()):  # Only add if gateway has items; coverage: 1659 ↛ 1646 (condition always true)
1660 bundles[gateway_name] = {
1661 "gateway": {"name": gateway_name, "description": gateway.get("description", "")},
1662 "items": bundle_items,
1663 "total_items": sum(len(items) for items in bundle_items.values()),
1664 }
1666 return bundles
1668 def _find_server_dependencies(self, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
1669 """Find server dependencies for smart selection.
1671 Args:
1672 entities: All entities from import data
1674 Returns:
1675 Server dependency information for UI
1676 """
1677 dependencies = {}
1679 if "servers" not in entities:
1680 return dependencies
1682 for server in entities["servers"]:
1683 server_name = server.get("name", "")
1684 deps = {"tools": server.get("associated_tools", []), "resources": server.get("associated_resources", []), "prompts": server.get("associated_prompts", [])}
1686 if any(deps.values()):  # Only add if server has dependencies; coverage: 1686 ↛ 1682 (condition always true)
1687 dependencies[server_name] = {
1688 "server": {"name": server_name, "description": server.get("description", "")},
1689 "requires": deps,
1690 "total_dependencies": sum(len(items) for items in deps.values()),
1691 }
1693 return dependencies
1695 async def _detect_import_conflicts(self, db: Session, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, List[Dict[str, Any]]]:
1696 """Detect conflicts between import items and existing database items.
1698 Args:
1699 db: Database session
1700 entities: Import entities
1702 Returns:
1703 Dictionary of conflicts by entity type
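Examples:
Editor's sketch of the conflict shape (values hypothetical):
>>> # {"tools": [{"name": "time_tool", "type": "name_conflict", "description": ""}],
>>> #  "gateways": [{"name": "gw1", "type": "name_conflict", "description": ""}]}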
1704 """
1705 conflicts = {}
1707 try:
1708 # Check tool conflicts
1709 if "tools" in entities: 1709 ↛ 1723line 1709 didn't jump to line 1723 because the condition on line 1709 was always true
1710 existing_tools, _ = await self.tool_service.list_tools(db)
1711 existing_names = {t.original_name for t in existing_tools}
1713 tool_conflicts = []
1714 for tool in entities["tools"]:
1715 tool_name = tool.get("name", "")
1716 if tool_name in existing_names:  # coverage: 1716 ↛ 1714 (condition always true)
1717 tool_conflicts.append({"name": tool_name, "type": "name_conflict", "description": tool.get("description", "")})
1719 if tool_conflicts:  # coverage: 1719 ↛ 1723 (condition always true)
1720 conflicts["tools"] = tool_conflicts
1722 # Check gateway conflicts
1723 if "gateways" in entities: 1723 ↛ 1741line 1723 didn't jump to line 1741 because the condition on line 1723 was always true
1724 existing_gateways, _ = await self.gateway_service.list_gateways(db)
1725 existing_names = {g.name for g in existing_gateways}
1727 gateway_conflicts = []
1728 for gateway in entities["gateways"]:
1729 gateway_name = gateway.get("name", "")
1730 if gateway_name in existing_names:  # coverage: 1730 ↛ 1728 (condition always true)
1731 gateway_conflicts.append({"name": gateway_name, "type": "name_conflict", "description": gateway.get("description", "")})
1733 if gateway_conflicts:  # coverage: 1733 ↛ 1741 (condition always true)
1734 conflicts["gateways"] = gateway_conflicts
1736 # Add other entity types as needed...
1738 except Exception as e:
1739 logger.warning(f"Could not detect all conflicts: {e}")
1741 return conflicts
1743 async def _get_user_context(self, db: Session, imported_by: str) -> Optional[Dict[str, Any]]:
1744 """Get user context for import team assignment.
1746 Args:
1747 db: Database session
1748 imported_by: Email of importing user
1750 Returns:
1751 User context dict or None if not found
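Examples:
Editor's sketch of a successful lookup (values hypothetical):
>>> # {"user_email": "admin@example.com", "team_id": "team-123", "team_name": "Personal Team"}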
1752 """
1753 try:
1754 user = db.query(EmailUser).filter(EmailUser.email == imported_by).first()
1755 if not user:
1756 logger.warning(f"Could not find importing user: {imported_by}")
1757 return None
1759 personal_team = user.get_personal_team()
1760 if not personal_team:
1761 logger.warning(f"User {imported_by} has no personal team")
1762 return None
1764 return {"user_email": user.email, "team_id": personal_team.id, "team_name": personal_team.name}
1765 except Exception as e:
1766 logger.error(f"Failed to get user context: {e}")
1767 return None
1769 def _add_multitenancy_context(self, entity_data: Dict[str, Any], user_context: Dict[str, Any]) -> Dict[str, Any]:
1770 """Add team and visibility context to entity data for import.
1772 Args:
1773 entity_data: Original entity data
1774 user_context: User context with team information
1776 Returns:
1777 Entity data enhanced with multitenancy fields
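Examples:
A minimal sketch (editor's addition) of the defaulting behaviour; the context values are hypothetical:
>>> service = ImportService()
>>> ctx = {"user_email": "admin@example.com", "team_id": "team-1", "team_name": "Personal"}
>>> out = service._add_multitenancy_context({"name": "t1"}, ctx)
>>> out["team_id"], out["owner_email"], out["visibility"]
('team-1', 'admin@example.com', 'public')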
1778 """
1779 # Create copy to avoid modifying original
1780 enhanced_data = dict(entity_data)
1782 # Add team assignment (assign to importing user's personal team)
1783 if not enhanced_data.get("team_id"): 1783 ↛ 1786line 1783 didn't jump to line 1786 because the condition on line 1783 was always true
1784 enhanced_data["team_id"] = user_context["team_id"]
1786 if not enhanced_data.get("owner_email"): 1786 ↛ 1791line 1786 didn't jump to line 1791 because the condition on line 1786 was always true
1787 enhanced_data["owner_email"] = user_context["user_email"]
1789 # Set visibility: use export value if present, otherwise default to 'public'
1790 # This supports pre-0.7.0 exports that don't have a visibility field
1791 if not enhanced_data.get("visibility"): 1791 ↛ 1795line 1791 didn't jump to line 1795 because the condition on line 1791 was always true
1792 enhanced_data["visibility"] = "public" # Default to public for backward compatibility
1794 # Add import tracking
1795 if not enhanced_data.get("federation_source"): 1795 ↛ 1798line 1795 didn't jump to line 1798 because the condition on line 1795 was always true
1796 enhanced_data["federation_source"] = f"imported-by-{user_context['user_email']}"
1798 logger.debug(f"Enhanced entity with multitenancy: team_id={enhanced_data['team_id']}, visibility={enhanced_data['visibility']}")
1799 return enhanced_data
1801 async def _assign_imported_items_to_team(self, db: Session, imported_by: str) -> None:
1802 """Assign imported items without team assignment to the importer's personal team.
1804 Args:
1805 db: Database session
1806 imported_by: Email of user who performed the import
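Examples:
Editor's sketch of the side effects (comment only; requires a live session):
>>> # await service._assign_imported_items_to_team(db, "admin@example.com")
>>> # Rows with team_id IS NULL or owner_email IS NULL are claimed by the
>>> # importer's personal team and set to public visibility, then committed.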
1807 """
1808 try:
1809 # Find the importing user and their personal team
1810 user = db.query(EmailUser).filter(EmailUser.email == imported_by).first()
1811 if not user:
1812 logger.warning(f"Could not find importing user {imported_by} - skipping team assignment")
1813 return
1815 personal_team = user.get_personal_team()
1816 if not personal_team:
1817 logger.warning(f"User {imported_by} has no personal team - skipping team assignment")
1818 return
1820 logger.info(f"Assigning orphaned imported items to {imported_by}'s team: {personal_team.name}")
1822 # Resource types to check
1823 resource_types = [("servers", Server), ("tools", Tool), ("resources", Resource), ("prompts", Prompt), ("gateways", Gateway), ("a2a_agents", A2AAgent)]
1825 total_assigned = 0
1827 for resource_name, resource_model in resource_types:
1828 try:
1829 # Find items without a team assignment (assumed to be the just-imported ones)
1830 unassigned = db.query(resource_model).filter((resource_model.team_id.is_(None)) | (resource_model.owner_email.is_(None))).all()
1832 if unassigned:
1833 logger.info(f"Assigning {len(unassigned)} orphaned {resource_name} to user team")
1835 for item in unassigned:
1836 item.team_id = personal_team.id
1837 item.owner_email = user.email
1838 # Set imported items to public for better visibility
1839 item.visibility = "public"
1840 if hasattr(item, "federation_source") and not item.federation_source:  # coverage: 1840 ↛ 1835 (condition always true)
1841 item.federation_source = f"imported-by-{imported_by}"
1843 total_assigned += len(unassigned)
1845 except Exception as e:
1846 logger.error(f"Failed to assign {resource_name} to team: {e}")
1847 continue
1849 if total_assigned > 0:
1850 db.commit()
1851 logger.info(f"Assigned {total_assigned} imported items to {personal_team.name} with public visibility")
1852 else:
1853 logger.debug("No orphaned imported items found")
1855 except Exception as e:
1856 logger.error(f"Failed to assign imported items to team: {e}")
1857 # Don't fail the import for team assignment issues