Coverage for mcpgateway / services / catalog_service.py: 99%

265 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""MCP Server Catalog Service. 

3 

4This service manages the catalog of available MCP servers that can be 

5easily registered with one-click from the admin UI. 

6""" 

7 

8# Standard 

9import asyncio 

10from datetime import datetime, timezone 

11import logging 

12from pathlib import Path 

13import time 

14from typing import Any, Dict, Optional 

15 

16# Third-Party 

17from sqlalchemy import select 

18from sqlalchemy.orm import Session 

19import yaml 

20 

21# First-Party 

22from mcpgateway.config import settings 

23from mcpgateway.schemas import ( 

24 CatalogBulkRegisterRequest, 

25 CatalogBulkRegisterResponse, 

26 CatalogListRequest, 

27 CatalogListResponse, 

28 CatalogServer, 

29 CatalogServerRegisterRequest, 

30 CatalogServerRegisterResponse, 

31 CatalogServerStatusResponse, 

32) 

33from mcpgateway.services.gateway_service import GatewayService 

34from mcpgateway.utils.create_slug import slugify 

35from mcpgateway.validation.tags import validate_tags_field 

36 

# Module-level logger named after this module; every message emitted by the
# catalog service below goes through it.
logger = logging.getLogger(__name__)

38 

39 

class CatalogService:
    """Service for managing the MCP server catalog.

    The catalog is a YAML document listing MCP servers that can be registered
    as gateways. This class loads and caches that document, lists its entries
    with filtering and pagination, registers entries as gateways (one at a
    time or in bulk) and performs a simple reachability probe.
    """

    # Catalog ``auth_type`` values for which a supplied API key is forwarded
    # as a bearer token.
    _BEARER_AUTH_TYPES = ("API Key", "API", "OAuth2.1", "OAuth", "OAuth2.1 & API Key")
    # Catalog ``auth_type`` values that are registered as disabled placeholders
    # when no credentials were supplied.
    _OAUTH_AUTH_TYPES = ("OAuth2.1", "OAuth")

    def __init__(self):
        """Initialize the catalog service with an empty catalog cache."""
        self._catalog_cache: Optional[Dict[str, Any]] = None
        self._cache_timestamp: float = 0
        self._gateway_service = GatewayService()

    @staticmethod
    def _empty_catalog() -> Dict[str, Any]:
        """Build the placeholder catalog returned when nothing could be loaded.

        Returns:
            A new dictionary with empty ``catalog_servers``, ``categories``
            and ``auth_types`` lists.
        """
        return {"catalog_servers": [], "categories": [], "auth_types": []}

    @staticmethod
    def _find_catalog_server(servers, catalog_id: str) -> Optional[Dict[str, Any]]:
        """Return the first catalog entry whose ``id`` equals ``catalog_id``.

        Args:
            servers: Iterable of catalog entry dictionaries
            catalog_id: Catalog server ID to look for

        Returns:
            The matching entry, or None when no entry matches.
        """
        return next((entry for entry in servers if entry.get("id") == catalog_id), None)

    @staticmethod
    def _detect_transport(url: str) -> str:
        """Guess the gateway transport from the shape of a server URL.

        The checks run in priority order: WebSocket scheme, SSE path, generic
        MCP/HTTP path, and finally SSE as the fallback.

        Args:
            url: Server URL from the catalog entry

        Returns:
            One of "WEBSOCKET", "SSE" or "STREAMABLEHTTP".
        """
        lowered = url.lower()
        if lowered.startswith(("ws://", "wss://")):
            return "WEBSOCKET"
        if lowered.endswith("/sse") or "/sse/" in lowered:
            return "SSE"
        if "/mcp" in lowered or lowered.endswith("/"):
            return "STREAMABLEHTTP"
        return "SSE"

    @classmethod
    def _apply_auth(cls, gateway_data: Dict[str, Any], auth_type: str, api_key: Optional[str]) -> bool:
        """Add authentication fields to ``gateway_data`` based on the catalog auth type.

        Args:
            gateway_data: Gateway creation payload; updated in place
            auth_type: ``auth_type`` value from the catalog entry
            api_key: API key supplied with the registration request, if any

        Returns:
            True when the entry is an OAuth server registered without
            credentials, meaning the connection test must be skipped.
        """
        if api_key and auth_type != "Open":
            if auth_type in cls._BEARER_AUTH_TYPES:
                # API-key servers, and OAuth/mixed servers given a key, all
                # receive the key as a bearer token.
                gateway_data["auth_type"] = "bearer"
                gateway_data["auth_token"] = api_key
            else:
                # Any other auth type: send the key as a custom header.
                gateway_data["auth_type"] = "authheaders"
                gateway_data["auth_headers"] = [{"key": "X-API-Key", "value": api_key}]
            return False
        return auth_type in cls._OAUTH_AUTH_TYPES

    @staticmethod
    def _friendly_error_message(error_str: str) -> str:
        """Map an exception message to a user-facing registration error.

        The substring checks are evaluated in order and the first match wins.

        Args:
            error_str: ``str()`` of the exception raised during registration

        Returns:
            A short user-facing message; "Registration failed" when nothing matches.
        """
        lowered = error_str.lower()
        if "Connection refused" in error_str or "connect" in lowered:
            return "Server is offline or unreachable"
        if "SSL" in error_str or "certificate" in lowered:
            return "SSL certificate verification failed - check server security settings"
        if "timeout" in lowered or "timed out" in lowered:
            return "Server took too long to respond - it may be slow or unavailable"
        if "401" in error_str or "Unauthorized" in error_str:
            return "Authentication failed - check API key or OAuth credentials"
        if "403" in error_str or "Forbidden" in error_str:
            return "Access forbidden - check permissions and API key"
        if "404" in error_str or "Not Found" in error_str:
            return "Server endpoint not found - check URL is correct"
        if "500" in error_str or "Internal Server Error" in error_str:
            return "Remote server error - the MCP server is experiencing issues"
        if "IPv6" in error_str:
            return "IPv6 URLs are not supported - please use IPv4 or domain names"
        return "Registration failed"

    async def load_catalog(self, force_reload: bool = False) -> Dict[str, Any]:
        """Load the catalog from the configured YAML file.

        A relative path is tried as given first and then relative to the
        directory three levels above this module. The parsed document is
        cached only after it has been confirmed to be a mapping, so an empty
        or malformed file can neither replace nor poison a previously cached
        catalog.

        Args:
            force_reload: Force reload even if cache is valid

        Returns:
            Catalog data dictionary; an empty catalog when the file is
            missing, is not a mapping, or cannot be read or parsed.
        """
        cache_age = time.time() - self._cache_timestamp
        if not force_reload and self._catalog_cache and cache_age < settings.mcpgateway_catalog_cache_ttl:
            return self._catalog_cache

        try:
            catalog_path = Path(settings.mcpgateway_catalog_file)

            # Relative path not found as given: fall back to the project root.
            if not catalog_path.is_absolute() and not catalog_path.exists():
                catalog_path = Path(__file__).parent.parent.parent / settings.mcpgateway_catalog_file

            if not catalog_path.exists():
                logger.warning(f"Catalog file not found: {catalog_path}")
                return self._empty_catalog()

            content = await asyncio.to_thread(catalog_path.read_text, encoding="utf-8")
            catalog_data = yaml.safe_load(content)

            # safe_load yields None for an empty document and may yield a list
            # or scalar; only a mapping is a usable catalog.
            if not isinstance(catalog_data, dict):
                logger.error(f"Failed to load catalog: expected a mapping in {catalog_path}, got {type(catalog_data).__name__}")
                return self._empty_catalog()

            self._catalog_cache = catalog_data
            self._cache_timestamp = time.time()

            logger.info(f"Loaded {len(catalog_data.get('catalog_servers') or [])} servers from catalog")
            return catalog_data

        except Exception as e:
            logger.error(f"Failed to load catalog: {e}")
            return self._empty_catalog()

    def _get_registry_cache(self):
        """Get registry cache instance lazily.

        Returns:
            RegistryCache instance or None if unavailable.
        """
        try:
            # First-Party
            from mcpgateway.cache.registry_cache import get_registry_cache  # pylint: disable=import-outside-toplevel

            return get_registry_cache()
        except ImportError:
            return None

    def _load_registration_state(self, db) -> tuple:
        """Collect the URLs of registered gateways and of those awaiting OAuth setup.

        All gateways (enabled and disabled) are read. A gateway counts as
        awaiting OAuth setup only when it is disabled, has auth type "oauth"
        and has no OAuth configuration, which separates unconfigured OAuth
        servers from ones that were disabled by hand.

        Args:
            db: Database session

        Returns:
            ``(registered_urls, oauth_disabled_urls)`` as two sets; both are
            empty when the lookup fails.
        """
        registered_urls = set()
        oauth_disabled_urls = set()
        try:
            # First-Party
            from mcpgateway.db import Gateway as DbGateway  # pylint: disable=import-outside-toplevel

            stmt = select(DbGateway.url, DbGateway.enabled, DbGateway.auth_type, DbGateway.oauth_config)
            for url, enabled, auth_type, oauth_config in db.execute(stmt):
                registered_urls.add(url)
                if not enabled and auth_type == "oauth" and not oauth_config:
                    oauth_disabled_urls.add(url)
        except Exception as e:
            logger.warning(f"Failed to check registered servers: {e}")
            # Continue without marking registered servers
            return set(), set()
        return registered_urls, oauth_disabled_urls

    async def get_catalog_servers(self, request: CatalogListRequest, db) -> CatalogListResponse:
        """Get filtered list of catalog servers.

        A cached response for the same filter set is returned when available;
        otherwise the catalog is loaded, each entry is annotated with its
        registration state, filters and pagination are applied and the result
        is stored in the cache.

        Args:
            request: Filter criteria
            db: Database session

        Returns:
            Filtered catalog servers response
        """
        cache = self._get_registry_cache()
        filters_hash = None
        if cache:
            filters_hash = cache.hash_filters(
                category=request.category,
                auth_type=request.auth_type,
                provider=request.provider,
                search=request.search,
                tags=sorted(request.tags) if request.tags else None,
                show_registered_only=request.show_registered_only,
                show_available_only=request.show_available_only,
                offset=request.offset,
                limit=request.limit,
            )
            cached = await cache.get("catalog", filters_hash)
            if cached is not None:
                return CatalogListResponse.model_validate(cached)

        catalog_data = await self.load_catalog()
        servers = catalog_data.get("catalog_servers") or []

        # Both sets are bound up front so the loop below never depends on
        # which branch ran.
        registered_urls = set()
        oauth_disabled_urls = set()
        if servers:
            registered_urls, oauth_disabled_urls = self._load_registration_state(db)

        catalog_servers = []
        for server_data in servers:
            server = CatalogServer(**server_data)
            server.is_registered = server.url in registered_urls
            server.requires_oauth_config = server.url in oauth_disabled_urls
            # Registered servers are assumed available; real health checks go
            # through check_server_availability.
            server.is_available = server.is_registered or server_data.get("is_available", True)
            catalog_servers.append(server)

        filtered = catalog_servers

        if request.category:
            filtered = [s for s in filtered if s.category == request.category]

        if request.auth_type:
            filtered = [s for s in filtered if s.auth_type == request.auth_type]

        if request.provider:
            filtered = [s for s in filtered if s.provider == request.provider]

        if request.search:
            search_lower = request.search.lower()
            filtered = [s for s in filtered if search_lower in s.name.lower() or search_lower in s.description.lower()]

        if request.tags:
            filtered = [s for s in filtered if any(tag in s.tags for tag in request.tags)]

        if request.show_registered_only:
            filtered = [s for s in filtered if s.is_registered]

        if request.show_available_only:
            filtered = [s for s in filtered if s.is_available]

        # ``total`` counts matches before the page is cut out.
        total = len(filtered)
        start = request.offset
        paginated = filtered[start : start + request.limit]

        # Filter choices are derived from the whole catalog, not the filtered view.
        all_categories = sorted({s.category for s in catalog_servers})
        all_auth_types = sorted({s.auth_type for s in catalog_servers})
        all_providers = sorted({s.provider for s in catalog_servers})
        all_tags = sorted({tag for s in catalog_servers for tag in s.tags})

        response = CatalogListResponse(servers=paginated, total=total, categories=all_categories, auth_types=all_auth_types, providers=all_providers, all_tags=all_tags)

        if cache:
            try:
                cache_data = response.model_dump(mode="json")
                await cache.set("catalog", cache_data, filters_hash)
            except Exception as e:
                logger.debug(f"Failed to cache catalog response: {e}")

        return response

    async def _register_oauth_placeholder(self, gateway_data: Dict[str, Any], db: Session) -> CatalogServerRegisterResponse:
        """Store an OAuth server as a disabled gateway without contacting it.

        The payload is validated with ``GatewayCreate``, then a minimal
        gateway row is inserted with ``auth_type="oauth"`` and
        ``enabled=False`` so it can be recognised as awaiting OAuth setup.

        Args:
            gateway_data: Gateway creation payload (name, url, description, transport, tags)
            db: Database session

        Returns:
            Successful registration response flagged with ``oauth_required=True``.
        """
        # First-Party
        from mcpgateway.db import Gateway as DbGateway  # pylint: disable=import-outside-toplevel
        from mcpgateway.schemas import GatewayCreate, GatewayRead  # pylint: disable=import-outside-toplevel

        # Constructed for validation only; the row below is built by hand.
        GatewayCreate(**gateway_data)

        db_gateway = DbGateway(
            name=gateway_data["name"],
            slug=slugify(gateway_data["name"]),
            url=gateway_data["url"],
            description=gateway_data["description"],
            tags=gateway_data.get("tags", []),
            transport=gateway_data["transport"],
            capabilities={},
            auth_type="oauth",  # Mark as OAuth so it can be identified after page refresh
            enabled=False,  # Disabled until OAuth is configured
            created_via="catalog",
            visibility="public",
            version=1,
        )

        db.add(db_gateway)
        db.commit()
        db.refresh(db_gateway)

        # Tags may be stored as plain strings (legacy) or as dictionaries;
        # convert the legacy form without mutating the database object.
        if not db_gateway.tags:
            tags_for_read = []
        elif isinstance(db_gateway.tags[0], str):
            tags_for_read = validate_tags_field(db_gateway.tags)
        else:
            tags_for_read = db_gateway.tags

        gateway_read = GatewayRead.model_validate(
            {
                "id": db_gateway.id,
                "name": db_gateway.name,
                "slug": db_gateway.slug,
                "url": db_gateway.url,
                "description": db_gateway.description,
                "tags": tags_for_read,
                "transport": db_gateway.transport,
                "capabilities": db_gateway.capabilities,
                "created_at": db_gateway.created_at,
                "updated_at": db_gateway.updated_at,
                "enabled": db_gateway.enabled,
                "reachable": db_gateway.reachable,
                "last_seen": db_gateway.last_seen,
                "auth_type": db_gateway.auth_type,
                "visibility": db_gateway.visibility,
                "version": db_gateway.version,
                "team_id": db_gateway.team_id,
                "owner_email": db_gateway.owner_email,
            }
        )

        # Invalidate catalog cache since registration status changed
        cache = self._get_registry_cache()
        if cache:
            await cache.invalidate_catalog()

        return CatalogServerRegisterResponse(
            success=True,
            server_id=str(gateway_read.id),
            message=f"Successfully registered {gateway_read.name} - OAuth configuration required before activation",
            error=None,
            oauth_required=True,
        )

    async def register_catalog_server(self, catalog_id: str, request: Optional[CatalogServerRegisterRequest], db: Session) -> CatalogServerRegisterResponse:
        """Register a catalog server as a gateway.

        Failures are never raised to the caller: any exception is logged and
        converted into an unsuccessful response carrying a user-facing message.

        Args:
            catalog_id: Catalog server ID
            request: Registration request with optional overrides
            db: Database session

        Returns:
            Registration response
        """
        try:
            catalog_data = await self.load_catalog()
            server_data = self._find_catalog_server(catalog_data.get("catalog_servers") or [], catalog_id)

            if not server_data:
                return CatalogServerRegisterResponse(success=False, server_id="", message="Server not found in catalog", error="Invalid catalog server ID")

            # A failed duplicate check is logged and treated as "not registered".
            try:
                # First-Party
                from mcpgateway.db import Gateway as DbGateway  # pylint: disable=import-outside-toplevel

                stmt = select(DbGateway).where(DbGateway.url == server_data["url"])
                existing = db.execute(stmt).scalar_one_or_none()
            except Exception as e:
                logger.warning(f"Error checking existing registration: {e}")
                existing = None

            if existing:
                return CatalogServerRegisterResponse(success=False, server_id=str(existing.id), message="Server already registered", error="This server is already registered in the system")

            # First-Party
            from mcpgateway.schemas import GatewayCreate  # pylint: disable=import-outside-toplevel

            # Use explicit transport if provided, otherwise auto-detect from URL
            transport = server_data.get("transport") or self._detect_transport(server_data["url"])

            # Reject bracketed (IPv6) URLs with a clear message before any
            # gateway object is built.
            url = server_data["url"]
            if "[" in url or "]" in url:
                return CatalogServerRegisterResponse(
                    success=False, server_id="", message="Registration failed", error="IPv6 URLs are not currently supported for security reasons. Please use IPv4 or domain names."
                )

            gateway_data = {
                "name": request.name if request and request.name else server_data["name"],
                "url": server_data["url"],
                "description": server_data["description"],
                "transport": transport,
                "tags": server_data.get("tags", []),
            }

            auth_type = server_data.get("auth_type", "Open")
            api_key = request.api_key if request else None
            skip_initialization = self._apply_auth(gateway_data, auth_type, api_key)

            if skip_initialization:
                # OAuth server without credentials: register without the
                # connection test; the OAuth flow is completed later.
                logger.info(f"Registering OAuth server {server_data['name']} without credentials - OAuth flow required later")
                return await self._register_oauth_placeholder(gateway_data, db)

            gateway_create = GatewayCreate(**gateway_data)

            # Use the proper gateway registration method which will discover tools
            gateway_read = await self._gateway_service.register_gateway(
                db=db,
                gateway=gateway_create,
                created_via="catalog",
                visibility="public",  # Catalog servers should be public
                initialize_timeout=settings.httpx_admin_read_timeout,
            )

            logger.info(f"Registered catalog server: {gateway_read.name} ({catalog_id})")

            # First-Party
            from mcpgateway.db import Tool as DbTool  # pylint: disable=import-outside-toplevel

            # Count the tools discovered from this gateway for the message.
            tool_count = 0
            if gateway_read.id:
                stmt = select(DbTool).where(DbTool.gateway_id == gateway_read.id)
                tool_count = len(db.execute(stmt).scalars().all())

            message = f"Successfully registered {gateway_read.name}"
            if tool_count > 0:
                message += f" with {tool_count} tools discovered"

            # Invalidate catalog cache since registration status changed
            cache = self._get_registry_cache()
            if cache:
                await cache.invalidate_catalog()

            return CatalogServerRegisterResponse(success=True, server_id=str(gateway_read.id), message=message, error=None)

        except Exception as e:
            logger.error(f"Failed to register catalog server {catalog_id}: {e}")

            error_str = str(e)
            # No rollback here on purpose: the session is left for the caller
            # (the request handler) to clean up.
            return CatalogServerRegisterResponse(success=False, server_id="", message=self._friendly_error_message(error_str), error=error_str)

    async def check_server_availability(self, catalog_id: str) -> CatalogServerStatusResponse:
        """Check if a catalog server is available.

        Sends one GET request (5 second timeout, redirects followed) to the
        catalog entry's URL. Any status code below 500 counts as available.

        Args:
            catalog_id: Catalog server ID

        Returns:
            Server status response
        """
        try:
            catalog_data = await self.load_catalog()
            server_data = self._find_catalog_server(catalog_data.get("catalog_servers") or [], catalog_id)

            if not server_data:
                return CatalogServerStatusResponse(server_id=catalog_id, is_available=False, is_registered=False, error="Server not found in catalog")

            # Registration state is not looked up here (no database session is
            # passed in), so it is always reported as False.
            is_registered = False

            is_available = False
            error = None
            # perf_counter is unaffected by wall-clock adjustments, so the
            # measured latency cannot come out negative or skewed.
            started = time.perf_counter()

            try:
                # First-Party
                from mcpgateway.services.http_client_service import get_http_client  # pylint: disable=import-outside-toplevel

                client = await get_http_client()
                response = await client.get(server_data["url"], timeout=5.0, follow_redirects=True)
                is_available = response.status_code < 500
            except Exception as e:
                error = str(e)
                is_available = False

            response_time_ms = (time.perf_counter() - started) * 1000

            return CatalogServerStatusResponse(
                server_id=catalog_id, is_available=is_available, is_registered=is_registered, last_checked=datetime.now(timezone.utc), response_time_ms=response_time_ms, error=error
            )

        except Exception as e:
            logger.error(f"Failed to check server status for {catalog_id}: {e}")
            return CatalogServerStatusResponse(server_id=catalog_id, is_available=False, is_registered=False, error=str(e))

    async def bulk_register_servers(self, request: CatalogBulkRegisterRequest, db: Session) -> CatalogBulkRegisterResponse:
        """Register multiple catalog servers.

        Servers are registered one after another without request overrides.
        When ``request.skip_errors`` is false, processing stops at the first
        failure and the remaining IDs are not attempted.

        Args:
            request: Bulk registration request
            db: Database session

        Returns:
            Bulk registration response; ``total_attempted`` counts only the
            servers that were actually processed.
        """
        successful = []
        failed = []

        for server_id in request.server_ids:
            try:
                response = await self.register_catalog_server(catalog_id=server_id, request=None, db=db)
            except Exception as e:
                failure = {"server_id": server_id, "error": str(e)}
            else:
                if response.success:
                    successful.append(server_id)
                    continue
                failure = {"server_id": server_id, "error": response.error or "Registration failed"}

            failed.append(failure)
            if not request.skip_errors:
                break

        # Every processed ID lands in exactly one of the two lists, so their
        # combined length is the number attempted even after an early stop.
        return CatalogBulkRegisterResponse(successful=successful, failed=failed, total_attempted=len(successful) + len(failed), total_successful=len(successful))

557 

558 

# Global instance: a single CatalogService created at import time. Code that
# imports this name shares the one object, including the in-memory catalog
# cache stored on it.
catalog_service = CatalogService()