Coverage for mcpgateway / services / import_service.py: 97%

800 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2# pylint: disable=import-outside-toplevel,no-name-in-module 

3"""Location: ./mcpgateway/services/import_service.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti 

7 

8Import Service Implementation. 

9This module implements comprehensive configuration import functionality according to the import specification. 

10It handles: 

11- Import file validation and schema compliance 

12- Entity creation and updates with conflict resolution 

13- Dependency resolution and processing order 

14- Authentication data decryption and re-encryption 

15- Dry-run functionality for validation 

16- Cross-environment key rotation support 

17- Import status tracking and progress reporting 

18""" 

19 

20# Standard 

21import base64 

22from datetime import datetime, timedelta, timezone 

23from enum import Enum 

24import logging 

25from typing import Any, Dict, List, Optional 

26import uuid 

27 

28# Third-Party 

29from sqlalchemy.orm import Session 

30 

31# First-Party 

32from mcpgateway.config import settings 

33from mcpgateway.db import A2AAgent, EmailUser, Gateway, Prompt, Resource, Server, Tool 

34from mcpgateway.schemas import AuthenticationValues, GatewayCreate, GatewayUpdate, PromptCreate, PromptUpdate, ResourceCreate, ResourceUpdate, ServerCreate, ServerUpdate, ToolCreate, ToolUpdate 

35from mcpgateway.services.gateway_service import GatewayNameConflictError 

36from mcpgateway.services.prompt_service import PromptNameConflictError 

37from mcpgateway.services.resource_service import ResourceURIConflictError 

38from mcpgateway.services.server_service import ServerNameConflictError 

39from mcpgateway.services.tool_service import ToolNameConflictError 

40from mcpgateway.utils.services_auth import decode_auth, encode_auth 

41 

42logger = logging.getLogger(__name__) 

43 

44 

45class ConflictStrategy(str, Enum): 

46 """Strategies for handling conflicts during import. 

47 

48 Examples: 

49 >>> ConflictStrategy.SKIP.value 

50 'skip' 

51 >>> ConflictStrategy.UPDATE.value 

52 'update' 

53 >>> ConflictStrategy.RENAME.value 

54 'rename' 

55 >>> ConflictStrategy.FAIL.value 

56 'fail' 

57 >>> ConflictStrategy("update") 

58 <ConflictStrategy.UPDATE: 'update'> 

59 """ 

60 

61 SKIP = "skip" 

62 UPDATE = "update" 

63 RENAME = "rename" 

64 FAIL = "fail" 

65 

66 

67class ImportError(Exception): # pylint: disable=redefined-builtin 

68 """Base class for import-related errors. 

69 

70 Examples: 

71 >>> error = ImportError("Something went wrong") 

72 >>> str(error) 

73 'Something went wrong' 

74 >>> isinstance(error, Exception) 

75 True 

76 """ 

77 

78 

79class ImportValidationError(ImportError): 

80 """Raised when import data validation fails. 

81 

82 Examples: 

83 >>> error = ImportValidationError("Invalid schema") 

84 >>> str(error) 

85 'Invalid schema' 

86 >>> isinstance(error, ImportError) 

87 True 

88 """ 

89 

90 

91class ImportConflictError(ImportError): 

92 """Raised when import conflicts cannot be resolved. 

93 

94 Examples: 

95 >>> error = ImportConflictError("Name conflict: tool_name") 

96 >>> str(error) 

97 'Name conflict: tool_name' 

98 >>> isinstance(error, ImportError) 

99 True 

100 """ 

101 

102 

103class ImportStatus: 

104 """Tracks the status of an import operation.""" 

105 

106 def __init__(self, import_id: str): 

107 """Initialize import status tracking. 

108 

109 Args: 

110 import_id: Unique identifier for the import operation 

111 

112 Examples: 

113 >>> status = ImportStatus("import_123") 

114 >>> status.import_id 

115 'import_123' 

116 >>> status.status 

117 'pending' 

118 >>> status.total_entities 

119 0 

120 """ 

121 self.import_id = import_id 

122 self.status = "pending" 

123 self.total_entities = 0 

124 self.processed_entities = 0 

125 self.created_entities = 0 

126 self.updated_entities = 0 

127 self.skipped_entities = 0 

128 self.failed_entities = 0 

129 self.errors: List[str] = [] 

130 self.warnings: List[str] = [] 

131 self.started_at = datetime.now(timezone.utc) 

132 self.completed_at: Optional[datetime] = None 

133 

134 def to_dict(self) -> Dict[str, Any]: 

135 """Convert status to dictionary for API responses. 

136 

137 Returns: 

138 Dictionary representation of import status 

139 """ 

140 return { 

141 "import_id": self.import_id, 

142 "status": self.status, 

143 "progress": { 

144 "total": self.total_entities, 

145 "processed": self.processed_entities, 

146 "created": self.created_entities, 

147 "updated": self.updated_entities, 

148 "skipped": self.skipped_entities, 

149 "failed": self.failed_entities, 

150 }, 

151 "errors": self.errors, 

152 "warnings": self.warnings, 

153 "started_at": self.started_at.isoformat(), 

154 "completed_at": self.completed_at.isoformat() if self.completed_at else None, 

155 } 

156 

157 

158class ImportService: 

159 """Service for importing MCP Gateway configuration and data. 

160 

161 This service provides comprehensive import functionality including: 

162 - Import file validation and schema compliance 

163 - Entity creation and updates with conflict resolution 

164 - Dependency resolution and correct processing order 

165 - Secure authentication data handling with re-encryption 

166 - Dry-run capabilities for validation without changes 

167 - Progress tracking and status reporting 

168 - Cross-environment key rotation support 

169 """ 

170 

171 def __init__(self): 

172 """Initialize the import service with required dependencies. 

173 

174 Creates instances of all entity services and initializes the active imports tracker. 

175 

176 Examples: 

177 >>> service = ImportService() 

178 >>> service.active_imports 

179 {} 

180 >>> hasattr(service, 'tool_service') 

181 True 

182 >>> hasattr(service, 'gateway_service') 

183 True 

184 """ 

185 # Prefer globally-initialized singletons from mcpgateway.main to ensure 

186 # services share initialized EventService/Redis clients. Import lazily 

187 # to avoid circular import at module load time. Fall back to local 

188 # instances if singletons are not available (tests, isolated usage). 

189 # Use globally-exported singletons from service modules so they 

190 # share initialized EventService/Redis clients created at app startup. 

191 # First-Party 

192 from mcpgateway.services.gateway_service import gateway_service 

193 from mcpgateway.services.prompt_service import prompt_service 

194 from mcpgateway.services.resource_service import resource_service 

195 from mcpgateway.services.root_service import root_service 

196 from mcpgateway.services.server_service import server_service 

197 from mcpgateway.services.tool_service import tool_service 

198 

199 self.gateway_service = gateway_service 

200 self.tool_service = tool_service 

201 self.resource_service = resource_service 

202 self.prompt_service = prompt_service 

203 self.server_service = server_service 

204 self.root_service = root_service 

205 self.active_imports: Dict[str, ImportStatus] = {} 

206 

207 async def initialize(self) -> None: 

208 """Initialize the import service.""" 

209 logger.info("Import service initialized") 

210 

211 async def shutdown(self) -> None: 

212 """Shutdown the import service.""" 

213 logger.info("Import service shutdown") 

214 

215 def validate_import_data(self, import_data: Dict[str, Any]) -> None: 

216 """Validate import data against the expected schema. 

217 

218 Args: 

219 import_data: The import data to validate 

220 

221 Raises: 

222 ImportValidationError: If validation fails 

223 

224 Examples: 

225 >>> service = ImportService() 

226 >>> valid_data = { 

227 ... "version": "2025-03-26", 

228 ... "exported_at": "2025-01-01T00:00:00Z", 

229 ... "entities": {"tools": []} 

230 ... } 

231 >>> service.validate_import_data(valid_data) # Should not raise 

232 

233 >>> invalid_data = {"missing": "version"} 

234 >>> try: 

235 ... service.validate_import_data(invalid_data) 

236 ... except ImportValidationError as e: 

237 ... "Missing required field" in str(e) 

238 True 

239 """ 

240 logger.debug("Validating import data structure") 

241 

242 # Check required top-level fields 

243 required_fields = ["version", "exported_at", "entities"] 

244 for field in required_fields: 

245 if field not in import_data: 

246 raise ImportValidationError(f"Missing required field: {field}") 

247 

248 # Validate version compatibility 

249 if not import_data.get("version"): 

250 raise ImportValidationError("Version field cannot be empty") 

251 

252 # Validate entities structure 

253 entities = import_data.get("entities", {}) 

254 if not isinstance(entities, dict): 

255 raise ImportValidationError("Entities must be a dictionary") 

256 

257 # Validate each entity type 

258 valid_entity_types = ["tools", "gateways", "servers", "prompts", "resources", "roots"] 

259 for entity_type, entity_list in entities.items(): 

260 if entity_type not in valid_entity_types: 

261 raise ImportValidationError(f"Unknown entity type: {entity_type}") 

262 

263 if not isinstance(entity_list, list): 

264 raise ImportValidationError(f"Entity type '{entity_type}' must be a list") 

265 

266 # Validate individual entities 

267 for i, entity in enumerate(entity_list): 

268 if not isinstance(entity, dict): 

269 raise ImportValidationError(f"Entity {i} in '{entity_type}' must be a dictionary") 

270 

271 # Check required fields based on entity type 

272 self._validate_entity_fields(entity_type, entity, i) 

273 

274 logger.debug("Import data validation passed") 

275 

276 def _validate_entity_fields(self, entity_type: str, entity: Dict[str, Any], index: int) -> None: 

277 """Validate required fields for a specific entity type. 

278 

279 Args: 

280 entity_type: Type of entity (tools, gateways, etc.) 

281 entity: Entity data dictionary 

282 index: Index of entity in list for error messages 

283 

284 Raises: 

285 ImportValidationError: If required fields are missing 

286 """ 

287 required_fields = { 

288 "tools": ["name", "url", "integration_type"], 

289 "gateways": ["name", "url"], 

290 "servers": ["name"], 

291 "prompts": ["name", "template"], 

292 "resources": ["name", "uri"], 

293 "roots": ["uri", "name"], 

294 } 

295 

296 if entity_type in required_fields: 296 ↛ exitline 296 didn't return from function '_validate_entity_fields' because the condition on line 296 was always true

297 for field in required_fields[entity_type]: 

298 if field not in entity: 

299 raise ImportValidationError(f"Entity {index} in '{entity_type}' missing required field: {field}") 

300 

301 async def import_configuration( 

302 self, 

303 db: Session, 

304 import_data: Dict[str, Any], 

305 conflict_strategy: ConflictStrategy = ConflictStrategy.UPDATE, 

306 dry_run: bool = False, 

307 rekey_secret: Optional[str] = None, 

308 imported_by: str = "system", 

309 selected_entities: Optional[Dict[str, List[str]]] = None, 

310 ) -> ImportStatus: 

311 """Import configuration data with conflict resolution. 

312 

313 Args: 

314 db: Database session 

315 import_data: The validated import data 

316 conflict_strategy: How to handle naming conflicts 

317 dry_run: If True, validate but don't make changes 

318 rekey_secret: New encryption secret for cross-environment imports 

319 imported_by: Username of the person performing the import 

320 selected_entities: Dict of entity types to specific entity names/ids to import 

321 

322 Returns: 

323 ImportStatus: Status object tracking import progress and results 

324 

325 Raises: 

326 ImportError: If import fails 

327 """ 

328 import_id = str(uuid.uuid4()) 

329 status = ImportStatus(import_id) 

330 self.active_imports[import_id] = status 

331 

332 try: 

333 logger.info(f"Starting configuration import {import_id} by {imported_by} (dry_run={dry_run})") 

334 

335 # Validate import data 

336 self.validate_import_data(import_data) 

337 

338 # Calculate total entities to process 

339 entities = import_data.get("entities", {}) 

340 status.total_entities = self._calculate_total_entities(entities, selected_entities) 

341 

342 status.status = "running" 

343 

344 # Process entities in dependency order 

345 processing_order = ["roots", "gateways", "tools", "resources", "prompts", "servers"] 

346 

347 for entity_type in processing_order: 

348 if entity_type in entities: 

349 await self._process_entities(db, entity_type, entities[entity_type], conflict_strategy, dry_run, rekey_secret, status, selected_entities, imported_by) 

350 # Flush after each entity type to make records visible for associations 

351 if not dry_run: 

352 db.flush() 

353 

354 # Assign all imported items to user's team with public visibility (after all entities processed) 

355 if not dry_run: 

356 await self._assign_imported_items_to_team(db, imported_by) 

357 

358 # Mark as completed 

359 status.status = "completed" 

360 status.completed_at = datetime.now(timezone.utc) 

361 

362 logger.info(f"Import {import_id} completed: created={status.created_entities}, updated={status.updated_entities}, skipped={status.skipped_entities}, failed={status.failed_entities}") 

363 

364 return status 

365 

366 except Exception as e: 

367 status.status = "failed" 

368 status.completed_at = datetime.now(timezone.utc) 

369 status.errors.append(f"Import failed: {str(e)}") 

370 logger.error(f"Import {import_id} failed: {str(e)}") 

371 raise ImportError(f"Import failed: {str(e)}") 

372 

373 def _get_entity_identifier(self, entity_type: str, entity: Dict[str, Any]) -> str: 

374 """Get the unique identifier for an entity based on its type. 

375 

376 Args: 

377 entity_type: Type of entity 

378 entity: Entity data dictionary 

379 

380 Returns: 

381 Unique identifier string for the entity 

382 

383 Examples: 

384 >>> service = ImportService() 

385 >>> tool_entity = {"name": "my_tool", "url": "https://example.com"} 

386 >>> service._get_entity_identifier("tools", tool_entity) 

387 'my_tool' 

388 

389 >>> resource_entity = {"name": "my_resource", "uri": "/api/data"} 

390 >>> service._get_entity_identifier("resources", resource_entity) 

391 '/api/data' 

392 

393 >>> root_entity = {"name": "workspace", "uri": "file:///workspace"} 

394 >>> service._get_entity_identifier("roots", root_entity) 

395 'file:///workspace' 

396 

397 >>> unknown_entity = {"data": "test"} 

398 >>> service._get_entity_identifier("unknown", unknown_entity) 

399 '' 

400 """ 

401 if entity_type in ["tools", "gateways", "servers", "prompts"]: 

402 return entity.get("name", "") 

403 if entity_type == "resources": 

404 return entity.get("uri", "") 

405 if entity_type == "roots": 

406 return entity.get("uri", "") 

407 return "" 

408 

409 def _calculate_total_entities(self, entities: Dict[str, List[Dict[str, Any]]], selected_entities: Optional[Dict[str, List[str]]]) -> int: 

410 """Calculate total entities to process based on selection criteria. 

411 

412 Args: 

413 entities: Dictionary of entities from import data 

414 selected_entities: Optional entity selection filter 

415 

416 Returns: 

417 Total number of entities to process 

418 

419 Examples: 

420 No selection counts all entities: 

421 >>> svc = ImportService() 

422 >>> entities = { 

423 ... 'tools': [{"name": "t1"}, {"name": "t2"}], 

424 ... 'resources': [{"uri": "/r1"}], 

425 ... } 

426 >>> svc._calculate_total_entities(entities, selected_entities=None) 

427 3 

428 

429 Selection for a subset by name/identifier: 

430 >>> selected = {'tools': ['t2'], 'resources': ['/r1']} 

431 >>> svc._calculate_total_entities(entities, selected) 

432 2 

433 

434 Selection for only a type (empty list means all of that type): 

435 >>> selected = {'tools': []} 

436 >>> svc._calculate_total_entities(entities, selected) 

437 2 

438 """ 

439 if selected_entities: 

440 total = 0 

441 for entity_type, entity_list in entities.items(): 

442 if entity_type in selected_entities: 

443 selected_names = selected_entities[entity_type] 

444 if selected_names: 

445 # Count entities that match selection 

446 for entity in entity_list: 

447 entity_name = self._get_entity_identifier(entity_type, entity) 

448 if entity_name in selected_names: 

449 total += 1 

450 else: 

451 total += len(entity_list) 

452 return total 

453 return sum(len(entity_list) for entity_list in entities.values()) 

454 

455 async def _process_entities( 

456 self, 

457 db: Session, 

458 entity_type: str, 

459 entity_list: List[Dict[str, Any]], 

460 conflict_strategy: ConflictStrategy, 

461 dry_run: bool, 

462 rekey_secret: Optional[str], 

463 status: ImportStatus, 

464 selected_entities: Optional[Dict[str, List[str]]], 

465 imported_by: str, 

466 ) -> None: 

467 """Process a list of entities of a specific type using bulk operations. 

468 

469 This method now uses bulk registration for tools, resources, and prompts 

470 to achieve 10-50x performance improvements over individual processing. 

471 

472 Args: 

473 db: Database session 

474 entity_type: Type of entities being processed 

475 entity_list: List of entity data dictionaries 

476 conflict_strategy: How to handle naming conflicts 

477 dry_run: Whether this is a dry run 

478 rekey_secret: New encryption secret if re-keying 

479 status: Import status tracker 

480 selected_entities: Optional entity selection filter 

481 imported_by: Username of the person performing the import 

482 """ 

483 logger.debug(f"Processing {len(entity_list)} {entity_type} entities") 

484 

485 # Filter entities based on selection 

486 filtered_entities = [] 

487 for entity_data in entity_list: 

488 # Check if this entity is selected for import 

489 if selected_entities and entity_type in selected_entities: 

490 selected_names = selected_entities[entity_type] 

491 if selected_names: # If specific entities are selected 

492 entity_name = self._get_entity_identifier(entity_type, entity_data) 

493 if entity_name not in selected_names: 

494 continue # Skip this entity 

495 

496 # Handle authentication re-encryption if needed 

497 if rekey_secret and self._has_auth_data(entity_data): 

498 entity_data = self._rekey_auth_data(entity_data, rekey_secret) 

499 

500 filtered_entities.append(entity_data) 

501 

502 if not filtered_entities: 

503 logger.debug(f"No {entity_type} entities to process after filtering") 

504 return 

505 

506 # Use bulk operations for tools, resources, and prompts 

507 if entity_type == "tools": 

508 await self._process_tools_bulk(db, filtered_entities, conflict_strategy, dry_run, status, imported_by) 

509 elif entity_type == "resources": 

510 await self._process_resources_bulk(db, filtered_entities, conflict_strategy, dry_run, status, imported_by) 

511 elif entity_type == "prompts": 

512 await self._process_prompts_bulk(db, filtered_entities, conflict_strategy, dry_run, status, imported_by) 

513 else: 

514 # Fall back to individual processing for other entity types 

515 for entity_data in filtered_entities: 

516 try: 

517 await self._process_single_entity(db, entity_type, entity_data, conflict_strategy, dry_run, status, imported_by) 

518 status.processed_entities += 1 

519 except Exception as e: 

520 status.failed_entities += 1 

521 status.errors.append(f"Failed to process {entity_type} entity: {str(e)}") 

522 logger.error(f"Failed to process {entity_type} entity: {str(e)}") 

523 

524 def _has_auth_data(self, entity_data: Dict[str, Any]) -> bool: 

525 """Check if entity has authentication data that needs re-encryption. 

526 

527 Args: 

528 entity_data: Entity data dictionary 

529 

530 Returns: 

531 True if entity has auth data, False otherwise 

532 

533 Examples: 

534 >>> service = ImportService() 

535 >>> entity_with_auth = {"name": "test", "auth_value": "encrypted_data"} 

536 >>> bool(service._has_auth_data(entity_with_auth)) 

537 True 

538 

539 >>> entity_without_auth = {"name": "test"} 

540 >>> service._has_auth_data(entity_without_auth) 

541 False 

542 

543 >>> entity_empty_auth = {"name": "test", "auth_value": ""} 

544 >>> bool(service._has_auth_data(entity_empty_auth)) 

545 False 

546 

547 >>> entity_none_auth = {"name": "test", "auth_value": None} 

548 >>> bool(service._has_auth_data(entity_none_auth)) 

549 False 

550 """ 

551 return "auth_value" in entity_data and entity_data.get("auth_value") 

552 

553 def _rekey_auth_data(self, entity_data: Dict[str, Any], new_secret: str) -> Dict[str, Any]: 

554 """Re-encrypt authentication data with a new secret key. 

555 

556 Args: 

557 entity_data: Entity data dictionary 

558 new_secret: New encryption secret 

559 

560 Returns: 

561 Updated entity data with re-encrypted auth 

562 

563 Raises: 

564 ImportError: If re-encryption fails 

565 

566 Examples: 

567 Returns original entity when no auth data present: 

568 >>> svc = ImportService() 

569 >>> svc._has_auth_data({'name': 'x'}) 

570 False 

571 >>> svc._rekey_auth_data({'name': 'x'}, 'new') 

572 {'name': 'x'} 

573 

574 Rekeys when auth data is present (encode/decode patched): 

575 >>> from unittest.mock import patch 

576 >>> data = {'name': 'x', 'auth_value': 'enc_old'} 

577 >>> with patch('mcpgateway.services.import_service.decode_auth', return_value='plain'): 

578 ... with patch('mcpgateway.services.import_service.encode_auth', return_value='enc_new'): 

579 ... result = svc._rekey_auth_data(dict(data), 'new-secret') 

580 >>> result['auth_value'] 

581 'enc_new' 

582 """ 

583 if not self._has_auth_data(entity_data): 

584 return entity_data 

585 

586 try: 

587 # Decrypt with old key 

588 old_auth_value = entity_data["auth_value"] 

589 decrypted_auth = decode_auth(old_auth_value) 

590 

591 # Re-encrypt with new key (temporarily change settings) 

592 old_secret = settings.auth_encryption_secret 

593 settings.auth_encryption_secret = new_secret 

594 try: 

595 new_auth_value = encode_auth(decrypted_auth) 

596 entity_data["auth_value"] = new_auth_value 

597 finally: 

598 settings.auth_encryption_secret = old_secret 

599 

600 logger.debug("Successfully re-keyed authentication data") 

601 return entity_data 

602 

603 except Exception as e: 

604 raise ImportError(f"Failed to re-key authentication data: {str(e)}") 

605 

606 async def _process_single_entity( 

607 self, db: Session, entity_type: str, entity_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str 

608 ) -> None: 

609 """Process a single entity with conflict resolution. 

610 

611 Args: 

612 db: Database session 

613 entity_type: Type of entity 

614 entity_data: Entity data dictionary 

615 conflict_strategy: How to handle conflicts 

616 dry_run: Whether this is a dry run 

617 status: Import status tracker 

618 imported_by: Username of the person performing the import 

619 

620 Raises: 

621 ImportError: If processing fails 

622 """ 

623 try: 

624 if entity_type == "tools": 

625 await self._process_tool(db, entity_data, conflict_strategy, dry_run, status) 

626 elif entity_type == "gateways": 

627 await self._process_gateway(db, entity_data, conflict_strategy, dry_run, status) 

628 elif entity_type == "servers": 

629 await self._process_server(db, entity_data, conflict_strategy, dry_run, status, imported_by) 

630 elif entity_type == "prompts": 

631 await self._process_prompt(db, entity_data, conflict_strategy, dry_run, status) 

632 elif entity_type == "resources": 

633 await self._process_resource(db, entity_data, conflict_strategy, dry_run, status) 

634 elif entity_type == "roots": 634 ↛ exitline 634 didn't return from function '_process_single_entity' because the condition on line 634 was always true

635 await self._process_root(entity_data, conflict_strategy, dry_run, status) 

636 

637 except Exception as e: 

638 raise ImportError(f"Failed to process {entity_type}: {str(e)}") 

639 

640 async def _process_tool(self, db: Session, tool_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None: 

641 """Process a tool entity. 

642 

643 Args: 

644 db: Database session 

645 tool_data: Tool data dictionary 

646 conflict_strategy: How to handle conflicts 

647 dry_run: Whether this is a dry run 

648 status: Import status tracker 

649 

650 Raises: 

651 ImportError: If processing fails 

652 ImportConflictError: If conflict cannot be resolved 

653 """ 

654 tool_name = tool_data["name"] 

655 

656 if dry_run: 

657 status.warnings.append(f"Would import tool: {tool_name}") 

658 return 

659 

660 try: 

661 # Convert to ToolCreate schema 

662 create_data = self._convert_to_tool_create(tool_data) 

663 

664 # Try to create the tool 

665 try: 

666 await self.tool_service.register_tool(db, create_data) 

667 status.created_entities += 1 

668 logger.debug(f"Created tool: {tool_name}") 

669 

670 except ToolNameConflictError: 

671 # Handle conflict based on strategy 

672 if conflict_strategy == ConflictStrategy.SKIP: 

673 status.skipped_entities += 1 

674 status.warnings.append(f"Skipped existing tool: {tool_name}") 

675 elif conflict_strategy == ConflictStrategy.UPDATE: 

676 # For conflict resolution, we need to find existing tool ID 

677 # This is a simplified approach - in practice you'd query the database 

678 try: 

679 # Try to get tools and find by name 

680 tools, _ = await self.tool_service.list_tools(db, include_inactive=True) 

681 existing_tool = next((t for t in tools if t.original_name == tool_name), None) 

682 if existing_tool: 

683 update_data = self._convert_to_tool_update(tool_data) 

684 await self.tool_service.update_tool(db, existing_tool.id, update_data) 

685 status.updated_entities += 1 

686 logger.debug(f"Updated tool: {tool_name}") 

687 else: 

688 status.warnings.append(f"Could not find existing tool to update: {tool_name}") 

689 status.skipped_entities += 1 

690 except Exception as update_error: 

691 logger.warning(f"Failed to update tool {tool_name}: {str(update_error)}") 

692 status.warnings.append(f"Could not update tool {tool_name}: {str(update_error)}") 

693 status.skipped_entities += 1 

694 elif conflict_strategy == ConflictStrategy.RENAME: 

695 # Rename and create 

696 new_name = f"{tool_name}_imported_{int(datetime.now().timestamp())}" 

697 create_data.name = new_name 

698 await self.tool_service.register_tool(db, create_data) 

699 status.created_entities += 1 

700 status.warnings.append(f"Renamed tool {tool_name} to {new_name}") 

701 elif conflict_strategy == ConflictStrategy.FAIL: 701 ↛ exitline 701 didn't return from function '_process_tool' because the condition on line 701 was always true

702 raise ImportConflictError(f"Tool name conflict: {tool_name}") 

703 

704 except Exception as e: 

705 raise ImportError(f"Failed to process tool {tool_name}: {str(e)}") 

706 

707 async def _process_gateway(self, db: Session, gateway_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None: 

708 """Process a gateway entity. 

709 

710 Args: 

711 db: Database session 

712 gateway_data: Gateway data dictionary 

713 conflict_strategy: How to handle conflicts 

714 dry_run: Whether this is a dry run 

715 status: Import status tracker 

716 

717 Raises: 

718 ImportError: If processing fails 

719 ImportConflictError: If conflict cannot be resolved 

720 """ 

721 gateway_name = gateway_data["name"] 

722 

723 if dry_run is True: 

724 status.warnings.append(f"Would import gateway: {gateway_name}") 

725 return 

726 

727 try: 

728 # Convert to GatewayCreate schema 

729 create_data = self._convert_to_gateway_create(gateway_data) 

730 

731 try: 

732 await self.gateway_service.register_gateway(db, create_data) 

733 status.created_entities += 1 

734 logger.debug(f"Created gateway: {gateway_name}") 

735 

736 except GatewayNameConflictError: 

737 if conflict_strategy == ConflictStrategy.SKIP: 

738 status.skipped_entities += 1 

739 status.warnings.append(f"Skipped existing gateway: {gateway_name}") 

740 elif conflict_strategy == ConflictStrategy.UPDATE: 

741 try: 

742 # Find existing gateway by name 

743 gateways, _ = await self.gateway_service.list_gateways(db, include_inactive=True) 

744 existing_gateway = next((g for g in gateways if g.name == gateway_name), None) 

745 if existing_gateway: 

746 update_data = self._convert_to_gateway_update(gateway_data) 

747 await self.gateway_service.update_gateway(db, existing_gateway.id, update_data) 

748 status.updated_entities += 1 

749 logger.debug(f"Updated gateway: {gateway_name}") 

750 else: 

751 status.warnings.append(f"Could not find existing gateway to update: {gateway_name}") 

752 status.skipped_entities += 1 

753 except Exception as update_error: 

754 logger.warning(f"Failed to update gateway {gateway_name}: {str(update_error)}") 

755 status.warnings.append(f"Could not update gateway {gateway_name}: {str(update_error)}") 

756 status.skipped_entities += 1 

757 elif conflict_strategy == ConflictStrategy.RENAME: 

758 new_name = f"{gateway_name}_imported_{int(datetime.now().timestamp())}" 

759 create_data.name = new_name 

760 await self.gateway_service.register_gateway(db, create_data) 

761 status.created_entities += 1 

762 status.warnings.append(f"Renamed gateway {gateway_name} to {new_name}") 

763 elif conflict_strategy == ConflictStrategy.FAIL: 763 ↛ exitline 763 didn't return from function '_process_gateway' because the condition on line 763 was always true

764 raise ImportConflictError(f"Gateway name conflict: {gateway_name}") 

765 

766 except Exception as e: 

767 raise ImportError(f"Failed to process gateway {gateway_name}: {str(e)}") 

768 

769 async def _process_server(self, db: Session, server_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str) -> None: 

770 """Process a server entity. 

771 

772 Args: 

773 db: Database session 

774 server_data: Server data dictionary 

775 conflict_strategy: How to handle conflicts 

776 dry_run: Whether this is a dry run 

777 status: Import status tracker 

778 imported_by: Username of the person performing the import 

779 

780 Raises: 

781 ImportError: If processing fails 

782 ImportConflictError: If conflict cannot be resolved 

783 """ 

784 server_name = server_data["name"] 

785 

786 if dry_run: 

787 status.warnings.append(f"Would import server: {server_name}") 

788 return 

789 

790 try: 

791 create_data = await self._convert_to_server_create(db, server_data) 

792 

793 try: 

794 await self.server_service.register_server(db, create_data) 

795 status.created_entities += 1 

796 logger.debug(f"Created server: {server_name}") 

797 

798 except ServerNameConflictError: 

799 if conflict_strategy == ConflictStrategy.SKIP: 

800 status.skipped_entities += 1 

801 status.warnings.append(f"Skipped existing server: {server_name}") 

802 elif conflict_strategy == ConflictStrategy.UPDATE: 

803 try: 

804 # Find existing server by name 

805 servers = await self.server_service.list_servers(db, include_inactive=True) 

806 existing_server = next((s for s in servers if s.name == server_name), None) 

807 if existing_server: 

808 update_data = await self._convert_to_server_update(db, server_data) 

809 await self.server_service.update_server(db, existing_server.id, update_data, imported_by) 

810 status.updated_entities += 1 

811 logger.debug(f"Updated server: {server_name}") 

812 else: 

813 status.warnings.append(f"Could not find existing server to update: {server_name}") 

814 status.skipped_entities += 1 

815 except Exception as update_error: 

816 logger.warning(f"Failed to update server {server_name}: {str(update_error)}") 

817 status.warnings.append(f"Could not update server {server_name}: {str(update_error)}") 

818 status.skipped_entities += 1 

819 elif conflict_strategy == ConflictStrategy.RENAME: 

820 new_name = f"{server_name}_imported_{int(datetime.now().timestamp())}" 

821 create_data.name = new_name 

822 await self.server_service.register_server(db, create_data) 

823 status.created_entities += 1 

824 status.warnings.append(f"Renamed server {server_name} to {new_name}") 

825 elif conflict_strategy == ConflictStrategy.FAIL: 825 ↛ exitline 825 didn't return from function '_process_server' because the condition on line 825 was always true

826 raise ImportConflictError(f"Server name conflict: {server_name}") 

827 

828 except Exception as e: 

829 raise ImportError(f"Failed to process server {server_name}: {str(e)}") 

830 

831 async def _process_prompt(self, db: Session, prompt_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None: 

832 """Process a prompt entity. 

833 

834 Args: 

835 db: Database session 

836 prompt_data: Prompt data dictionary 

837 conflict_strategy: How to handle conflicts 

838 dry_run: Whether this is a dry run 

839 status: Import status tracker 

840 

841 Raises: 

842 ImportError: If processing fails 

843 ImportConflictError: If conflict cannot be resolved 

844 """ 

845 prompt_name = prompt_data["name"] 

846 

847 if dry_run: 

848 status.warnings.append(f"Would import prompt: {prompt_name}") 

849 return 

850 

851 try: 

852 create_data = self._convert_to_prompt_create(prompt_data) 

853 

854 try: 

855 await self.prompt_service.register_prompt(db, create_data) 

856 status.created_entities += 1 

857 logger.debug(f"Created prompt: {prompt_name}") 

858 

859 except PromptNameConflictError: 

860 if conflict_strategy == ConflictStrategy.SKIP: 

861 status.skipped_entities += 1 

862 status.warnings.append(f"Skipped existing prompt: {prompt_name}") 

863 elif conflict_strategy == ConflictStrategy.UPDATE: 

864 update_data = self._convert_to_prompt_update(prompt_data) 

865 await self.prompt_service.update_prompt(db, prompt_name, update_data) 

866 status.updated_entities += 1 

867 logger.debug(f"Updated prompt: {prompt_name}") 

868 elif conflict_strategy == ConflictStrategy.RENAME: 

869 new_name = f"{prompt_name}_imported_{int(datetime.now().timestamp())}" 

870 create_data.name = new_name 

871 await self.prompt_service.register_prompt(db, create_data) 

872 status.created_entities += 1 

873 status.warnings.append(f"Renamed prompt {prompt_name} to {new_name}") 

874 elif conflict_strategy == ConflictStrategy.FAIL: 874 ↛ exitline 874 didn't return from function '_process_prompt' because the condition on line 874 was always true

875 raise ImportConflictError(f"Prompt name conflict: {prompt_name}") 

876 

877 except Exception as e: 

878 raise ImportError(f"Failed to process prompt {prompt_name}: {str(e)}") 

879 

880 async def _process_resource(self, db: Session, resource_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None: 

881 """Process a resource entity. 

882 

883 Args: 

884 db: Database session 

885 resource_data: Resource data dictionary 

886 conflict_strategy: How to handle conflicts 

887 dry_run: Whether this is a dry run 

888 status: Import status tracker 

889 

890 Raises: 

891 ImportError: If processing fails 

892 ImportConflictError: If conflict cannot be resolved 

893 """ 

894 resource_uri = resource_data["uri"] 

895 

896 if dry_run: 

897 status.warnings.append(f"Would import resource: {resource_uri}") 

898 return 

899 

900 try: 

901 create_data = self._convert_to_resource_create(resource_data) 

902 

903 try: 

904 await self.resource_service.register_resource(db, create_data) 

905 status.created_entities += 1 

906 logger.debug(f"Created resource: {resource_uri}") 

907 

908 except ResourceURIConflictError: 

909 if conflict_strategy == ConflictStrategy.SKIP: 

910 status.skipped_entities += 1 

911 status.warnings.append(f"Skipped existing resource: {resource_uri}") 

912 elif conflict_strategy == ConflictStrategy.UPDATE: 

913 update_data = self._convert_to_resource_update(resource_data) 

914 await self.resource_service.update_resource(db, resource_uri, update_data) 

915 status.updated_entities += 1 

916 logger.debug(f"Updated resource: {resource_uri}") 

917 elif conflict_strategy == ConflictStrategy.RENAME: 

918 new_uri = f"{resource_uri}_imported_{int(datetime.now().timestamp())}" 

919 create_data.uri = new_uri 

920 await self.resource_service.register_resource(db, create_data) 

921 status.created_entities += 1 

922 status.warnings.append(f"Renamed resource {resource_uri} to {new_uri}") 

923 elif conflict_strategy == ConflictStrategy.FAIL: 923 ↛ exitline 923 didn't return from function '_process_resource' because the condition on line 923 was always true

924 raise ImportConflictError(f"Resource URI conflict: {resource_uri}") 

925 

926 except Exception as e: 

927 raise ImportError(f"Failed to process resource {resource_uri}: {str(e)}") 

928 

929 async def _process_tools_bulk(self, db: Session, tools_data: List[Dict[str, Any]], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str) -> None: 

930 """Process multiple tools using bulk operations. 

931 

932 Args: 

933 db: Database session 

934 tools_data: List of tool data dictionaries 

935 conflict_strategy: How to handle conflicts 

936 dry_run: Whether this is a dry run 

937 status: Import status tracker 

938 imported_by: Username of the person performing the import 

939 """ 

940 if dry_run: 

941 for tool_data in tools_data: 

942 status.warnings.append(f"Would import tool: {tool_data.get('name', 'unknown')}") 

943 return 

944 

945 try: 

946 # Convert all tool data to ToolCreate schemas 

947 tools_to_register = [] 

948 for tool_data in tools_data: 

949 try: 

950 create_data = self._convert_to_tool_create(tool_data) 

951 tools_to_register.append(create_data) 

952 except Exception as e: 

953 status.failed_entities += 1 

954 status.errors.append(f"Failed to convert tool {tool_data.get('name', 'unknown')}: {str(e)}") 

955 logger.warning(f"Failed to convert tool data: {str(e)}") 

956 

957 if not tools_to_register: 

958 return 

959 

960 # Use bulk registration 

961 result = await self.tool_service.register_tools_bulk( 

962 db=db, 

963 tools=tools_to_register, 

964 created_by=imported_by, 

965 created_via="import", 

966 conflict_strategy=conflict_strategy.value, 

967 ) 

968 

969 # Update status based on results 

970 status.created_entities += result["created"] 

971 status.updated_entities += result["updated"] 

972 status.skipped_entities += result["skipped"] 

973 status.failed_entities += result["failed"] 

974 status.processed_entities += result["created"] + result["updated"] + result["skipped"] 

975 

976 # Add any errors to status 

977 for error in result.get("errors", []): 

978 status.errors.append(error) 

979 

980 logger.info(f"Bulk processed {len(tools_data)} tools: {result['created']} created, {result['updated']} updated, {result['skipped']} skipped, {result['failed']} failed") 

981 

982 except Exception as e: 

983 status.failed_entities += len(tools_data) 

984 status.errors.append(f"Bulk tool processing failed: {str(e)}") 

985 logger.error(f"Failed to bulk process tools: {str(e)}") 

986 # Don't raise - allow import to continue with other entities 

987 

988 async def _process_resources_bulk(self, db: Session, resources_data: List[Dict[str, Any]], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str) -> None: 

989 """Process multiple resources using bulk operations. 

990 

991 Args: 

992 db: Database session 

993 resources_data: List of resource data dictionaries 

994 conflict_strategy: How to handle conflicts 

995 dry_run: Whether this is a dry run 

996 status: Import status tracker 

997 imported_by: Username of the person performing the import 

998 """ 

999 if dry_run: 

1000 for resource_data in resources_data: 

1001 status.warnings.append(f"Would import resource: {resource_data.get('uri', 'unknown')}") 

1002 return 

1003 

1004 try: 

1005 # Convert all resource data to ResourceCreate schemas 

1006 resources_to_register = [] 

1007 for resource_data in resources_data: 

1008 try: 

1009 create_data = self._convert_to_resource_create(resource_data) 

1010 resources_to_register.append(create_data) 

1011 except Exception as e: 

1012 status.failed_entities += 1 

1013 status.errors.append(f"Failed to convert resource {resource_data.get('uri', 'unknown')}: {str(e)}") 

1014 logger.warning(f"Failed to convert resource data: {str(e)}") 

1015 

1016 if not resources_to_register: 

1017 return 

1018 

1019 # Use bulk registration 

1020 result = await self.resource_service.register_resources_bulk( 

1021 db=db, 

1022 resources=resources_to_register, 

1023 created_by=imported_by, 

1024 created_via="import", 

1025 conflict_strategy=conflict_strategy.value, 

1026 ) 

1027 

1028 # Update status based on results 

1029 status.created_entities += result["created"] 

1030 status.updated_entities += result["updated"] 

1031 status.skipped_entities += result["skipped"] 

1032 status.failed_entities += result["failed"] 

1033 status.processed_entities += result["created"] + result["updated"] + result["skipped"] 

1034 

1035 # Add any errors to status 

1036 for error in result.get("errors", []): 

1037 status.errors.append(error) 

1038 

1039 logger.info(f"Bulk processed {len(resources_data)} resources: {result['created']} created, {result['updated']} updated, {result['skipped']} skipped, {result['failed']} failed") 

1040 

1041 except Exception as e: 

1042 status.failed_entities += len(resources_data) 

1043 status.errors.append(f"Bulk resource processing failed: {str(e)}") 

1044 logger.error(f"Failed to bulk process resources: {str(e)}") 

1045 # Don't raise - allow import to continue with other entities 

1046 

1047 async def _process_prompts_bulk(self, db: Session, prompts_data: List[Dict[str, Any]], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus, imported_by: str) -> None: 

1048 """Process multiple prompts using bulk operations. 

1049 

1050 Args: 

1051 db: Database session 

1052 prompts_data: List of prompt data dictionaries 

1053 conflict_strategy: How to handle conflicts 

1054 dry_run: Whether this is a dry run 

1055 status: Import status tracker 

1056 imported_by: Username of the person performing the import 

1057 """ 

1058 if dry_run: 

1059 for prompt_data in prompts_data: 

1060 status.warnings.append(f"Would import prompt: {prompt_data.get('name', 'unknown')}") 

1061 return 

1062 

1063 try: 

1064 # Convert all prompt data to PromptCreate schemas 

1065 prompts_to_register = [] 

1066 for prompt_data in prompts_data: 

1067 try: 

1068 create_data = self._convert_to_prompt_create(prompt_data) 

1069 prompts_to_register.append(create_data) 

1070 except Exception as e: 

1071 status.failed_entities += 1 

1072 status.errors.append(f"Failed to convert prompt {prompt_data.get('name', 'unknown')}: {str(e)}") 

1073 logger.warning(f"Failed to convert prompt data: {str(e)}") 

1074 

1075 if not prompts_to_register: 

1076 return 

1077 

1078 # Use bulk registration 

1079 result = await self.prompt_service.register_prompts_bulk( 

1080 db=db, 

1081 prompts=prompts_to_register, 

1082 created_by=imported_by, 

1083 created_via="import", 

1084 conflict_strategy=conflict_strategy.value, 

1085 ) 

1086 

1087 # Update status based on results 

1088 status.created_entities += result["created"] 

1089 status.updated_entities += result["updated"] 

1090 status.skipped_entities += result["skipped"] 

1091 status.failed_entities += result["failed"] 

1092 status.processed_entities += result["created"] + result["updated"] + result["skipped"] 

1093 

1094 # Add any errors to status 

1095 for error in result.get("errors", []): 

1096 status.errors.append(error) 

1097 

1098 logger.info(f"Bulk processed {len(prompts_data)} prompts: {result['created']} created, {result['updated']} updated, {result['skipped']} skipped, {result['failed']} failed") 

1099 

1100 except Exception as e: 

1101 status.failed_entities += len(prompts_data) 

1102 status.errors.append(f"Bulk prompt processing failed: {str(e)}") 

1103 logger.error(f"Failed to bulk process prompts: {str(e)}") 

1104 # Don't raise - allow import to continue with other entities 

1105 

1106 async def _process_root(self, root_data: Dict[str, Any], conflict_strategy: ConflictStrategy, dry_run: bool, status: ImportStatus) -> None: 

1107 """Process a root entity. 

1108 

1109 Args: 

1110 root_data: Root data dictionary 

1111 conflict_strategy: How to handle conflicts 

1112 dry_run: Whether this is a dry run 

1113 status: Import status tracker 

1114 

1115 Raises: 

1116 ImportError: If processing fails 

1117 ImportConflictError: If conflict cannot be resolved 

1118 """ 

1119 root_uri = root_data["uri"] 

1120 

1121 if dry_run: 

1122 status.warnings.append(f"Would import root: {root_uri}") 

1123 return 

1124 

1125 try: 

1126 await self.root_service.add_root(root_uri, root_data.get("name")) 

1127 status.created_entities += 1 

1128 logger.debug(f"Created root: {root_uri}") 

1129 

1130 except Exception as e: 

1131 if conflict_strategy == ConflictStrategy.SKIP: 

1132 status.skipped_entities += 1 

1133 status.warnings.append(f"Skipped existing root: {root_uri}") 

1134 elif conflict_strategy == ConflictStrategy.FAIL: 

1135 raise ImportConflictError(f"Root URI conflict: {root_uri}") 

1136 else: 

1137 raise ImportError(f"Failed to process root {root_uri}: {str(e)}") 

1138 

1139 def _convert_to_tool_create(self, tool_data: Dict[str, Any]) -> ToolCreate: 

1140 """Convert import data to ToolCreate schema. 

1141 

1142 Args: 

1143 tool_data: Tool data dictionary from import 

1144 

1145 Returns: 

1146 ToolCreate schema object 

1147 """ 

1148 # Extract auth information if present 

1149 auth_info = None 

1150 if tool_data.get("auth_type") and tool_data.get("auth_value"): 

1151 auth_info = AuthenticationValues(auth_type=tool_data["auth_type"], auth_value=tool_data["auth_value"]) 

1152 

1153 return ToolCreate( 

1154 name=tool_data["name"], 

1155 displayName=tool_data.get("displayName"), 

1156 url=tool_data["url"], 

1157 description=tool_data.get("description"), 

1158 integration_type=tool_data.get("integration_type", "REST"), 

1159 request_type=tool_data.get("request_type", "GET"), 

1160 headers=tool_data.get("headers"), 

1161 input_schema=tool_data.get("input_schema"), 

1162 output_schema=tool_data.get("output_schema"), 

1163 annotations=tool_data.get("annotations"), 

1164 jsonpath_filter=tool_data.get("jsonpath_filter"), 

1165 auth=auth_info, 

1166 tags=tool_data.get("tags", []), 

1167 ) 

1168 

1169 def _convert_to_tool_update(self, tool_data: Dict[str, Any]) -> ToolUpdate: 

1170 """Convert import data to ToolUpdate schema. 

1171 

1172 Args: 

1173 tool_data: Tool data dictionary from import 

1174 

1175 Returns: 

1176 ToolUpdate schema object 

1177 """ 

1178 auth_info = None 

1179 if tool_data.get("auth_type") and tool_data.get("auth_value"): 

1180 auth_info = AuthenticationValues(auth_type=tool_data["auth_type"], auth_value=tool_data["auth_value"]) 

1181 

1182 return ToolUpdate( 

1183 name=tool_data.get("name"), 

1184 displayName=tool_data.get("displayName"), 

1185 url=tool_data.get("url"), 

1186 description=tool_data.get("description"), 

1187 integration_type=tool_data.get("integration_type"), 

1188 request_type=tool_data.get("request_type"), 

1189 headers=tool_data.get("headers"), 

1190 input_schema=tool_data.get("input_schema"), 

1191 output_schema=tool_data.get("output_schema"), 

1192 annotations=tool_data.get("annotations"), 

1193 jsonpath_filter=tool_data.get("jsonpath_filter"), 

1194 auth=auth_info, 

1195 tags=tool_data.get("tags"), 

1196 ) 

1197 

1198 def _convert_to_gateway_create(self, gateway_data: Dict[str, Any]) -> GatewayCreate: 

1199 """Convert import data to GatewayCreate schema. 

1200 

1201 Args: 

1202 gateway_data: Gateway data dictionary from import 

1203 

1204 Returns: 

1205 GatewayCreate schema object 

1206 """ 

1207 # Handle auth data 

1208 auth_kwargs = {} 

1209 if gateway_data.get("auth_type"): 

1210 auth_kwargs["auth_type"] = gateway_data["auth_type"] 

1211 

1212 # Handle query_param auth type (new in this version) 

1213 if gateway_data["auth_type"] == "query_param" and gateway_data.get("auth_query_params"): 

1214 try: 

1215 auth_query_params = gateway_data["auth_query_params"] 

1216 if auth_query_params: 1216 ↛ 1257line 1216 didn't jump to line 1257 because the condition on line 1216 was always true

1217 # Get the first key-value pair (schema supports single param) 

1218 param_key = next(iter(auth_query_params.keys())) 

1219 encrypted_value = auth_query_params[param_key] 

1220 # Decode the encrypted value - returns dict like {param_key: value} 

1221 decrypted_dict = decode_auth(encrypted_value) 

1222 # Extract the actual value from the dict 

1223 decrypted_value = decrypted_dict.get(param_key, "") if isinstance(decrypted_dict, dict) else str(decrypted_dict) 

1224 auth_kwargs["auth_query_param_key"] = param_key 

1225 auth_kwargs["auth_query_param_value"] = decrypted_value 

1226 logger.debug(f"Importing gateway with query_param auth, key: {param_key}") 

1227 except Exception as e: 

1228 logger.warning(f"Failed to decode query param auth for gateway: {str(e)}") 

1229 # Decode auth_value to get original credentials 

1230 elif gateway_data.get("auth_value"): 1230 ↛ 1257line 1230 didn't jump to line 1257 because the condition on line 1230 was always true

1231 try: 

1232 decoded_auth = decode_auth(gateway_data["auth_value"]) 

1233 if gateway_data["auth_type"] == "basic": 

1234 # Extract username and password from Basic auth 

1235 auth_header = decoded_auth.get("Authorization", "") 

1236 if auth_header.startswith("Basic "): 1236 ↛ 1257line 1236 didn't jump to line 1257 because the condition on line 1236 was always true

1237 creds = base64.b64decode(auth_header[6:]).decode("utf-8") 

1238 username, password = creds.split(":", 1) 

1239 auth_kwargs.update({"auth_username": username, "auth_password": password}) 

1240 elif gateway_data["auth_type"] == "bearer": 

1241 # Extract token from Bearer auth 

1242 auth_header = decoded_auth.get("Authorization", "") 

1243 if auth_header.startswith("Bearer "): 1243 ↛ 1257line 1243 didn't jump to line 1257 because the condition on line 1243 was always true

1244 auth_kwargs["auth_token"] = auth_header[7:] 

1245 elif gateway_data["auth_type"] == "authheaders": 1245 ↛ 1257line 1245 didn't jump to line 1257 because the condition on line 1245 was always true

1246 # Handle custom headers 

1247 if len(decoded_auth) == 1: 

1248 key, value = next(iter(decoded_auth.items())) 

1249 auth_kwargs.update({"auth_header_key": key, "auth_header_value": value}) 

1250 else: 

1251 # Multiple headers - use the new format 

1252 headers_list = [{"key": k, "value": v} for k, v in decoded_auth.items()] 

1253 auth_kwargs["auth_headers"] = headers_list 

1254 except Exception as e: 

1255 logger.warning(f"Failed to decode auth data for gateway: {str(e)}") 

1256 

1257 return GatewayCreate( 

1258 name=gateway_data["name"], 

1259 url=gateway_data["url"], 

1260 description=gateway_data.get("description"), 

1261 transport=gateway_data.get("transport", "SSE"), 

1262 passthrough_headers=gateway_data.get("passthrough_headers"), 

1263 tags=gateway_data.get("tags", []), 

1264 **auth_kwargs, 

1265 ) 

1266 

1267 def _convert_to_gateway_update(self, gateway_data: Dict[str, Any]) -> GatewayUpdate: 

1268 """Convert import data to GatewayUpdate schema. 

1269 

1270 Args: 

1271 gateway_data: Gateway data dictionary from import 

1272 

1273 Returns: 

1274 GatewayUpdate schema object 

1275 """ 

1276 # Similar to create but all fields optional 

1277 auth_kwargs = {} 

1278 if gateway_data.get("auth_type"): 

1279 auth_kwargs["auth_type"] = gateway_data["auth_type"] 

1280 

1281 # Handle query_param auth type (new in this version) 

1282 if gateway_data["auth_type"] == "query_param" and gateway_data.get("auth_query_params"): 

1283 try: 

1284 auth_query_params = gateway_data["auth_query_params"] 

1285 if auth_query_params: 1285 ↛ 1321line 1285 didn't jump to line 1321 because the condition on line 1285 was always true

1286 # Get the first key-value pair (schema supports single param) 

1287 param_key = next(iter(auth_query_params.keys())) 

1288 encrypted_value = auth_query_params[param_key] 

1289 # Decode the encrypted value - returns dict like {param_key: value} 

1290 decrypted_dict = decode_auth(encrypted_value) 

1291 # Extract the actual value from the dict 

1292 decrypted_value = decrypted_dict.get(param_key, "") if isinstance(decrypted_dict, dict) else str(decrypted_dict) 

1293 auth_kwargs["auth_query_param_key"] = param_key 

1294 auth_kwargs["auth_query_param_value"] = decrypted_value 

1295 logger.debug(f"Importing gateway update with query_param auth, key: {param_key}") 

1296 except Exception as e: 

1297 logger.warning(f"Failed to decode query param auth for gateway update: {str(e)}") 

1298 elif gateway_data.get("auth_value"): 1298 ↛ 1321line 1298 didn't jump to line 1321 because the condition on line 1298 was always true

1299 try: 

1300 decoded_auth = decode_auth(gateway_data["auth_value"]) 

1301 if gateway_data["auth_type"] == "basic": 

1302 auth_header = decoded_auth.get("Authorization", "") 

1303 if auth_header.startswith("Basic "): 1303 ↛ 1321line 1303 didn't jump to line 1321 because the condition on line 1303 was always true

1304 creds = base64.b64decode(auth_header[6:]).decode("utf-8") 

1305 username, password = creds.split(":", 1) 

1306 auth_kwargs.update({"auth_username": username, "auth_password": password}) 

1307 elif gateway_data["auth_type"] == "bearer": 

1308 auth_header = decoded_auth.get("Authorization", "") 

1309 if auth_header.startswith("Bearer "): 1309 ↛ 1321line 1309 didn't jump to line 1321 because the condition on line 1309 was always true

1310 auth_kwargs["auth_token"] = auth_header[7:] 

1311 elif gateway_data["auth_type"] == "authheaders": 1311 ↛ 1321line 1311 didn't jump to line 1321 because the condition on line 1311 was always true

1312 if len(decoded_auth) == 1: 

1313 key, value = next(iter(decoded_auth.items())) 

1314 auth_kwargs.update({"auth_header_key": key, "auth_header_value": value}) 

1315 else: 

1316 headers_list = [{"key": k, "value": v} for k, v in decoded_auth.items()] 

1317 auth_kwargs["auth_headers"] = headers_list 

1318 except Exception as e: 

1319 logger.warning(f"Failed to decode auth data for gateway update: {str(e)}") 

1320 

1321 return GatewayUpdate( 

1322 name=gateway_data.get("name"), 

1323 url=gateway_data.get("url"), 

1324 description=gateway_data.get("description"), 

1325 transport=gateway_data.get("transport"), 

1326 passthrough_headers=gateway_data.get("passthrough_headers"), 

1327 tags=gateway_data.get("tags"), 

1328 **auth_kwargs, 

1329 ) 

1330 

1331 async def _convert_to_server_create(self, db: Session, server_data: Dict[str, Any]) -> ServerCreate: 

1332 """Convert import data to ServerCreate schema, resolving tool references. 

1333 

1334 Args: 

1335 db: Database session 

1336 server_data: Server data dictionary from import 

1337 

1338 Returns: 

1339 ServerCreate schema object with resolved tool IDs 

1340 """ 

1341 # Resolve tool references (could be names or IDs) to current tool IDs 

1342 tool_references = server_data.get("tool_ids", []) or server_data.get("associated_tools", []) 

1343 resolved_tool_ids = [] 

1344 

1345 if tool_references: 

1346 # Get all tools to resolve references 

1347 all_tools, _ = await self.tool_service.list_tools(db, include_inactive=True) 

1348 

1349 for tool_ref in tool_references: 

1350 # Try to find tool by ID first, then by name 

1351 found_tool = None 

1352 

1353 # Try exact ID match 

1354 found_tool = next((t for t in all_tools if t.id == tool_ref), None) 

1355 

1356 # If not found, try by original_name or name 

1357 if not found_tool: 

1358 found_tool = next((t for t in all_tools if t.original_name == tool_ref), None) 

1359 

1360 if not found_tool: 

1361 found_tool = next((t for t in all_tools if hasattr(t, "name") and t.name == tool_ref), None) 

1362 

1363 if found_tool: 

1364 resolved_tool_ids.append(found_tool.id) 

1365 logger.debug(f"Resolved tool reference '{tool_ref}' to ID {found_tool.id}") 

1366 else: 

1367 logger.warning(f"Could not resolve tool reference: {tool_ref}") 

1368 # Don't include unresolvable references 

1369 

1370 return ServerCreate(name=server_data["name"], description=server_data.get("description"), associated_tools=resolved_tool_ids, tags=server_data.get("tags", [])) 

1371 

1372 async def _convert_to_server_update(self, db: Session, server_data: Dict[str, Any]) -> ServerUpdate: 

1373 """Convert import data to ServerUpdate schema, resolving tool references. 

1374 

1375 Args: 

1376 db: Database session 

1377 server_data: Server data dictionary from import 

1378 

1379 Returns: 

1380 ServerUpdate schema object with resolved tool IDs 

1381 """ 

1382 # Resolve tool references same as create method 

1383 tool_references = server_data.get("tool_ids", []) or server_data.get("associated_tools", []) 

1384 resolved_tool_ids = [] 

1385 

1386 if tool_references: 

1387 all_tools, _ = await self.tool_service.list_tools(db, include_inactive=True) 

1388 

1389 for tool_ref in tool_references: 

1390 found_tool = next((t for t in all_tools if t.id == tool_ref), None) 

1391 if not found_tool: 1391 ↛ 1393line 1391 didn't jump to line 1393 because the condition on line 1391 was always true

1392 found_tool = next((t for t in all_tools if t.original_name == tool_ref), None) 

1393 if not found_tool: 

1394 found_tool = next((t for t in all_tools if hasattr(t, "name") and t.name == tool_ref), None) 

1395 

1396 if found_tool: 

1397 resolved_tool_ids.append(found_tool.id) 

1398 else: 

1399 logger.warning(f"Could not resolve tool reference for update: {tool_ref}") 

1400 

1401 return ServerUpdate(name=server_data.get("name"), description=server_data.get("description"), associated_tools=resolved_tool_ids if resolved_tool_ids else None, tags=server_data.get("tags")) 

1402 

1403 def _convert_to_prompt_create(self, prompt_data: Dict[str, Any]) -> PromptCreate: 

1404 """Convert import data to PromptCreate schema. 

1405 

1406 Args: 

1407 prompt_data: Prompt data dictionary from import 

1408 

1409 Returns: 

1410 PromptCreate schema object 

1411 """ 

1412 # Convert input_schema back to arguments format 

1413 arguments = [] 

1414 input_schema = prompt_data.get("input_schema", {}) 

1415 if isinstance(input_schema, dict): 1415 ↛ 1422line 1415 didn't jump to line 1422 because the condition on line 1415 was always true

1416 properties = input_schema.get("properties", {}) 

1417 required_fields = input_schema.get("required", []) 

1418 

1419 for prop_name, prop_data in properties.items(): 

1420 arguments.append({"name": prop_name, "description": prop_data.get("description", ""), "required": prop_name in required_fields}) 

1421 

1422 original_name = prompt_data.get("original_name") or prompt_data["name"] 

1423 return PromptCreate( 

1424 name=original_name, 

1425 custom_name=prompt_data.get("custom_name"), 

1426 display_name=prompt_data.get("display_name"), 

1427 template=prompt_data["template"], 

1428 description=prompt_data.get("description"), 

1429 arguments=arguments, 

1430 tags=prompt_data.get("tags", []), 

1431 ) 

1432 

1433 def _convert_to_prompt_update(self, prompt_data: Dict[str, Any]) -> PromptUpdate: 

1434 """Convert import data to PromptUpdate schema. 

1435 

1436 Args: 

1437 prompt_data: Prompt data dictionary from import 

1438 

1439 Returns: 

1440 PromptUpdate schema object 

1441 """ 

1442 arguments = [] 

1443 input_schema = prompt_data.get("input_schema", {}) 

1444 if isinstance(input_schema, dict): 1444 ↛ 1451line 1444 didn't jump to line 1451 because the condition on line 1444 was always true

1445 properties = input_schema.get("properties", {}) 

1446 required_fields = input_schema.get("required", []) 

1447 

1448 for prop_name, prop_data in properties.items(): 

1449 arguments.append({"name": prop_name, "description": prop_data.get("description", ""), "required": prop_name in required_fields}) 

1450 

1451 original_name = prompt_data.get("original_name") or prompt_data.get("name") 

1452 return PromptUpdate( 

1453 name=original_name, 

1454 custom_name=prompt_data.get("custom_name"), 

1455 display_name=prompt_data.get("display_name"), 

1456 template=prompt_data.get("template"), 

1457 description=prompt_data.get("description"), 

1458 arguments=arguments if arguments else None, 

1459 tags=prompt_data.get("tags"), 

1460 ) 

1461 

1462 def _convert_to_resource_create(self, resource_data: Dict[str, Any]) -> ResourceCreate: 

1463 """Convert import data to ResourceCreate schema. 

1464 

1465 Args: 

1466 resource_data: Resource data dictionary from import 

1467 

1468 Returns: 

1469 ResourceCreate schema object 

1470 """ 

1471 return ResourceCreate( 

1472 uri=resource_data["uri"], 

1473 name=resource_data["name"], 

1474 description=resource_data.get("description"), 

1475 mime_type=resource_data.get("mime_type"), 

1476 content=resource_data.get("content", ""), # Default empty content 

1477 tags=resource_data.get("tags", []), 

1478 ) 

1479 

1480 def _convert_to_resource_update(self, resource_data: Dict[str, Any]) -> ResourceUpdate: 

1481 """Convert import data to ResourceUpdate schema. 

1482 

1483 Args: 

1484 resource_data: Resource data dictionary from import 

1485 

1486 Returns: 

1487 ResourceUpdate schema object 

1488 """ 

1489 return ResourceUpdate( 

1490 name=resource_data.get("name"), description=resource_data.get("description"), mime_type=resource_data.get("mime_type"), content=resource_data.get("content"), tags=resource_data.get("tags") 

1491 ) 

1492 

1493 def get_import_status(self, import_id: str) -> Optional[ImportStatus]: 

1494 """Get the status of an import operation. 

1495 

1496 Args: 

1497 import_id: Import operation ID 

1498 

1499 Returns: 

1500 Import status object or None if not found 

1501 """ 

1502 return self.active_imports.get(import_id) 

1503 

1504 def list_import_statuses(self) -> List[ImportStatus]: 

1505 """List all import statuses. 

1506 

1507 Returns: 

1508 List of all import status objects 

1509 """ 

1510 return list(self.active_imports.values()) 

1511 

1512 def cleanup_completed_imports(self, max_age_hours: int = 24) -> int: 

1513 """Clean up completed import statuses older than max_age_hours. 

1514 

1515 Args: 

1516 max_age_hours: Maximum age in hours for keeping completed imports 

1517 

1518 Returns: 

1519 Number of import statuses removed 

1520 """ 

1521 cutoff_time = datetime.now(timezone.utc) - timedelta(hours=max_age_hours) 

1522 removed = 0 

1523 

1524 to_remove = [] 

1525 for import_id, status in self.active_imports.items(): 

1526 if status.status in ["completed", "failed"] and status.completed_at and status.completed_at < cutoff_time: 1526 ↛ 1525line 1526 didn't jump to line 1525 because the condition on line 1526 was always true

1527 to_remove.append(import_id) 

1528 

1529 for import_id in to_remove: 

1530 del self.active_imports[import_id] 

1531 removed += 1 

1532 

1533 return removed 

1534 

1535 async def preview_import(self, db: Session, import_data: Dict[str, Any]) -> Dict[str, Any]: 

1536 """Preview import file to show what would be imported with smart categorization. 

1537 

1538 Args: 

1539 db: Database session 

1540 import_data: The validated import data 

1541 

1542 Returns: 

1543 Dictionary with categorized items for selective import UI 

1544 

1545 Examples: 

1546 >>> service = ImportService() 

1547 >>> # This would return a structure for the UI to build selection interface 

1548 """ 

1549 self.validate_import_data(import_data) 

1550 

1551 entities = import_data.get("entities", {}) 

1552 preview = { 

1553 "summary": {"total_items": sum(len(items) for items in entities.values()), "by_type": {entity_type: len(items) for entity_type, items in entities.items()}}, 

1554 "items": {}, 

1555 "bundles": {}, 

1556 "conflicts": {}, 

1557 "dependencies": {}, 

1558 } 

1559 

1560 # Categorize each entity type 

1561 for entity_type, entity_list in entities.items(): 

1562 preview["items"][entity_type] = [] 

1563 

1564 for entity in entity_list: 

1565 item_info = await self._analyze_import_item(db, entity_type, entity) 

1566 preview["items"][entity_type].append(item_info) 

1567 

1568 # Find gateway bundles (gateways + their tools/resources/prompts) 

1569 if "gateways" in entities: 1569 ↛ 1573line 1569 didn't jump to line 1573 because the condition on line 1569 was always true

1570 preview["bundles"] = self._find_gateway_bundles(entities) 

1571 

1572 # Find server dependencies 

1573 if "servers" in entities: 1573 ↛ 1577line 1573 didn't jump to line 1577 because the condition on line 1573 was always true

1574 preview["dependencies"] = self._find_server_dependencies(entities) 

1575 

1576 # Detect conflicts with existing items 

1577 preview["conflicts"] = await self._detect_import_conflicts(db, entities) 

1578 

1579 return preview 

1580 

1581 async def _analyze_import_item(self, db: Session, entity_type: str, entity: Dict[str, Any]) -> Dict[str, Any]: 

1582 """Analyze a single import item for the preview. 

1583 

1584 Args: 

1585 db: Database session 

1586 entity_type: Type of entity 

1587 entity: Entity data 

1588 

1589 Returns: 

1590 Item analysis with metadata for UI selection 

1591 """ 

1592 item_name = self._get_entity_identifier(entity_type, entity) 

1593 

1594 # Basic item info 

1595 item_info = { 

1596 "id": item_name, 

1597 "name": entity.get("name", item_name), 

1598 "type": entity_type, 

1599 "is_gateway_item": bool(entity.get("gateway_name") or entity.get("gateway_id")), 

1600 "is_custom": not bool(entity.get("gateway_name") or entity.get("gateway_id")), 

1601 "description": entity.get("description", ""), 

1602 } 

1603 

1604 # Check if it conflicts with existing items 

1605 try: 

1606 if entity_type == "tools": 

1607 existing, _ = await self.tool_service.list_tools(db) 

1608 item_info["conflicts_with"] = any(t.original_name == item_name for t in existing) 

1609 elif entity_type == "gateways": 

1610 existing, _ = await self.gateway_service.list_gateways(db) 

1611 item_info["conflicts_with"] = any(g.name == item_name for g in existing) 

1612 elif entity_type == "servers": 

1613 existing = await self.server_service.list_servers(db) 

1614 item_info["conflicts_with"] = any(s.name == item_name for s in existing) 

1615 elif entity_type == "prompts": 

1616 existing, _ = await self.prompt_service.list_prompts(db) 

1617 item_info["conflicts_with"] = any(p.name == item_name for p in existing) 

1618 elif entity_type == "resources": 

1619 existing, _ = await self.resource_service.list_resources(db) 

1620 item_info["conflicts_with"] = any(r.uri == item_name for r in existing) 

1621 else: 

1622 item_info["conflicts_with"] = False 

1623 except Exception: 

1624 item_info["conflicts_with"] = False 

1625 

1626 # Add metadata for smart selection 

1627 if entity_type == "servers": 

1628 item_info["dependencies"] = {"tools": entity.get("associated_tools", []), "resources": entity.get("associated_resources", []), "prompts": entity.get("associated_prompts", [])} 

1629 

1630 return item_info 

1631 

1632 def _find_gateway_bundles(self, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: 

1633 """Find gateway bundles (gateway + associated tools/resources/prompts). 

1634 

1635 Args: 

1636 entities: All entities from import data 

1637 

1638 Returns: 

1639 Gateway bundle information for UI 

1640 """ 

1641 bundles = {} 

1642 

1643 if "gateways" not in entities: 

1644 return bundles 

1645 

1646 for gateway in entities["gateways"]: 

1647 gateway_name = gateway.get("name", "") 

1648 bundle_items = {"tools": [], "resources": [], "prompts": []} 

1649 

1650 # Find items that belong to this gateway 

1651 for entity_type in ["tools", "resources", "prompts"]: 

1652 if entity_type in entities: 1652 ↛ 1651line 1652 didn't jump to line 1651 because the condition on line 1652 was always true

1653 for item in entities[entity_type]: 

1654 item_gateway = item.get("gateway_name") or item.get("gateway_id") 

1655 if item_gateway == gateway_name: 1655 ↛ 1653line 1655 didn't jump to line 1653 because the condition on line 1655 was always true

1656 item_name = self._get_entity_identifier(entity_type, item) 

1657 bundle_items[entity_type].append({"id": item_name, "name": item.get("name", item_name), "description": item.get("description", "")}) 

1658 

1659 if any(bundle_items.values()): # Only add if gateway has items 1659 ↛ 1646line 1659 didn't jump to line 1646 because the condition on line 1659 was always true

1660 bundles[gateway_name] = { 

1661 "gateway": {"name": gateway_name, "description": gateway.get("description", "")}, 

1662 "items": bundle_items, 

1663 "total_items": sum(len(items) for items in bundle_items.values()), 

1664 } 

1665 

1666 return bundles 

1667 

1668 def _find_server_dependencies(self, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: 

1669 """Find server dependencies for smart selection. 

1670 

1671 Args: 

1672 entities: All entities from import data 

1673 

1674 Returns: 

1675 Server dependency information for UI 

1676 """ 

1677 dependencies = {} 

1678 

1679 if "servers" not in entities: 

1680 return dependencies 

1681 

1682 for server in entities["servers"]: 

1683 server_name = server.get("name", "") 

1684 deps = {"tools": server.get("associated_tools", []), "resources": server.get("associated_resources", []), "prompts": server.get("associated_prompts", [])} 

1685 

1686 if any(deps.values()): # Only add if server has dependencies 1686 ↛ 1682line 1686 didn't jump to line 1682 because the condition on line 1686 was always true

1687 dependencies[server_name] = { 

1688 "server": {"name": server_name, "description": server.get("description", "")}, 

1689 "requires": deps, 

1690 "total_dependencies": sum(len(items) for items in deps.values()), 

1691 } 

1692 

1693 return dependencies 

1694 

1695 async def _detect_import_conflicts(self, db: Session, entities: Dict[str, List[Dict[str, Any]]]) -> Dict[str, List[Dict[str, Any]]]: 

1696 """Detect conflicts between import items and existing database items. 

1697 

1698 Args: 

1699 db: Database session 

1700 entities: Import entities 

1701 

1702 Returns: 

1703 Dictionary of conflicts by entity type 

1704 """ 

1705 conflicts = {} 

1706 

1707 try: 

1708 # Check tool conflicts 

1709 if "tools" in entities: 1709 ↛ 1723line 1709 didn't jump to line 1723 because the condition on line 1709 was always true

1710 existing_tools, _ = await self.tool_service.list_tools(db) 

1711 existing_names = {t.original_name for t in existing_tools} 

1712 

1713 tool_conflicts = [] 

1714 for tool in entities["tools"]: 

1715 tool_name = tool.get("name", "") 

1716 if tool_name in existing_names: 1716 ↛ 1714line 1716 didn't jump to line 1714 because the condition on line 1716 was always true

1717 tool_conflicts.append({"name": tool_name, "type": "name_conflict", "description": tool.get("description", "")}) 

1718 

1719 if tool_conflicts: 1719 ↛ 1723line 1719 didn't jump to line 1723 because the condition on line 1719 was always true

1720 conflicts["tools"] = tool_conflicts 

1721 

1722 # Check gateway conflicts 

1723 if "gateways" in entities: 1723 ↛ 1741line 1723 didn't jump to line 1741 because the condition on line 1723 was always true

1724 existing_gateways, _ = await self.gateway_service.list_gateways(db) 

1725 existing_names = {g.name for g in existing_gateways} 

1726 

1727 gateway_conflicts = [] 

1728 for gateway in entities["gateways"]: 

1729 gateway_name = gateway.get("name", "") 

1730 if gateway_name in existing_names: 1730 ↛ 1728line 1730 didn't jump to line 1728 because the condition on line 1730 was always true

1731 gateway_conflicts.append({"name": gateway_name, "type": "name_conflict", "description": gateway.get("description", "")}) 

1732 

1733 if gateway_conflicts: 1733 ↛ 1741line 1733 didn't jump to line 1741 because the condition on line 1733 was always true

1734 conflicts["gateways"] = gateway_conflicts 

1735 

1736 # Add other entity types as needed... 

1737 

1738 except Exception as e: 

1739 logger.warning(f"Could not detect all conflicts: {e}") 

1740 

1741 return conflicts 

1742 

1743 async def _get_user_context(self, db: Session, imported_by: str) -> Optional[Dict[str, Any]]: 

1744 """Get user context for import team assignment. 

1745 

1746 Args: 

1747 db: Database session 

1748 imported_by: Email of importing user 

1749 

1750 Returns: 

1751 User context dict or None if not found 

1752 """ 

1753 try: 

1754 user = db.query(EmailUser).filter(EmailUser.email == imported_by).first() 

1755 if not user: 

1756 logger.warning(f"Could not find importing user: {imported_by}") 

1757 return None 

1758 

1759 personal_team = user.get_personal_team() 

1760 if not personal_team: 

1761 logger.warning(f"User {imported_by} has no personal team") 

1762 return None 

1763 

1764 return {"user_email": user.email, "team_id": personal_team.id, "team_name": personal_team.name} 

1765 except Exception as e: 

1766 logger.error(f"Failed to get user context: {e}") 

1767 return None 

1768 

1769 def _add_multitenancy_context(self, entity_data: Dict[str, Any], user_context: Dict[str, Any]) -> Dict[str, Any]: 

1770 """Add team and visibility context to entity data for import. 

1771 

1772 Args: 

1773 entity_data: Original entity data 

1774 user_context: User context with team information 

1775 

1776 Returns: 

1777 Entity data enhanced with multitenancy fields 

1778 """ 

1779 # Create copy to avoid modifying original 

1780 enhanced_data = dict(entity_data) 

1781 

1782 # Add team assignment (assign to importing user's personal team) 

1783 if not enhanced_data.get("team_id"): 1783 ↛ 1786line 1783 didn't jump to line 1786 because the condition on line 1783 was always true

1784 enhanced_data["team_id"] = user_context["team_id"] 

1785 

1786 if not enhanced_data.get("owner_email"): 1786 ↛ 1791line 1786 didn't jump to line 1791 because the condition on line 1786 was always true

1787 enhanced_data["owner_email"] = user_context["user_email"] 

1788 

1789 # Set visibility: use export value if present, otherwise default to 'public' 

1790 # This supports pre-0.7.0 exports that don't have visibility field 

1791 if not enhanced_data.get("visibility"): 1791 ↛ 1795line 1791 didn't jump to line 1795 because the condition on line 1791 was always true

1792 enhanced_data["visibility"] = "public" # Default to public for backward compatibility 

1793 

1794 # Add import tracking 

1795 if not enhanced_data.get("federation_source"): 1795 ↛ 1798line 1795 didn't jump to line 1798 because the condition on line 1795 was always true

1796 enhanced_data["federation_source"] = f"imported-by-{user_context['user_email']}" 

1797 

1798 logger.debug(f"Enhanced entity with multitenancy: team_id={enhanced_data['team_id']}, visibility={enhanced_data['visibility']}") 

1799 return enhanced_data 

1800 

1801 async def _assign_imported_items_to_team(self, db: Session, imported_by: str) -> None: 

1802 """Assign imported items without team assignment to the importer's personal team. 

1803 

1804 Args: 

1805 db: Database session 

1806 imported_by: Email of user who performed the import 

1807 """ 

1808 try: 

1809 # Find the importing user and their personal team 

1810 user = db.query(EmailUser).filter(EmailUser.email == imported_by).first() 

1811 if not user: 

1812 logger.warning(f"Could not find importing user {imported_by} - skipping team assignment") 

1813 return 

1814 

1815 personal_team = user.get_personal_team() 

1816 if not personal_team: 

1817 logger.warning(f"User {imported_by} has no personal team - skipping team assignment") 

1818 return 

1819 

1820 logger.info(f"Assigning orphaned imported items to {imported_by}'s team: {personal_team.name}") 

1821 

1822 # Resource types to check 

1823 resource_types = [("servers", Server), ("tools", Tool), ("resources", Resource), ("prompts", Prompt), ("gateways", Gateway), ("a2a_agents", A2AAgent)] 

1824 

1825 total_assigned = 0 

1826 

1827 for resource_name, resource_model in resource_types: 

1828 try: 

1829 # Find items without team assignment (recently imported) 

1830 unassigned = db.query(resource_model).filter((resource_model.team_id.is_(None)) | (resource_model.owner_email.is_(None))).all() 

1831 

1832 if unassigned: 

1833 logger.info(f"Assigning {len(unassigned)} orphaned {resource_name} to user team") 

1834 

1835 for item in unassigned: 

1836 item.team_id = personal_team.id 

1837 item.owner_email = user.email 

1838 # Set imported items to public for better visibility 

1839 item.visibility = "public" 

1840 if hasattr(item, "federation_source") and not item.federation_source: 1840 ↛ 1835line 1840 didn't jump to line 1835 because the condition on line 1840 was always true

1841 item.federation_source = f"imported-by-{imported_by}" 

1842 

1843 total_assigned += len(unassigned) 

1844 

1845 except Exception as e: 

1846 logger.error(f"Failed to assign {resource_name} to team: {e}") 

1847 continue 

1848 

1849 if total_assigned > 0: 

1850 db.commit() 

1851 logger.info(f"Assigned {total_assigned} imported items to {personal_team.name} with public visibility") 

1852 else: 

1853 logger.debug("No orphaned imported items found") 

1854 

1855 except Exception as e: 

1856 logger.error(f"Failed to assign imported items to team: {e}") 

1857 # Don't fail the import for team assignment issues