Coverage for mcpgateway / services / catalog_service.py: 99%
265 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""MCP Server Catalog Service.

This service manages the catalog of available MCP servers that can be
registered with one click from the admin UI.
"""
8# Standard
9import asyncio
10from datetime import datetime, timezone
11import logging
12from pathlib import Path
13import time
14from typing import Any, Dict, Optional
16# Third-Party
17from sqlalchemy import select
18from sqlalchemy.orm import Session
19import yaml
21# First-Party
22from mcpgateway.config import settings
23from mcpgateway.schemas import (
24 CatalogBulkRegisterRequest,
25 CatalogBulkRegisterResponse,
26 CatalogListRequest,
27 CatalogListResponse,
28 CatalogServer,
29 CatalogServerRegisterRequest,
30 CatalogServerRegisterResponse,
31 CatalogServerStatusResponse,
32)
33from mcpgateway.services.gateway_service import GatewayService
34from mcpgateway.utils.create_slug import slugify
35from mcpgateway.validation.tags import validate_tags_field
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CatalogService:
    """Service for managing MCP server catalog.

    The catalog is read from the YAML file named by
    ``settings.mcpgateway_catalog_file`` and kept in memory for
    ``settings.mcpgateway_catalog_cache_ttl`` seconds. Catalog entries can be
    listed with filters, probed over HTTP, and registered as gateways.
    """

    def __init__(self):
        """Initialize the catalog service."""
        # Parsed catalog plus the wall-clock time it was loaded; both are
        # written together by load_catalog().
        self._catalog_cache: Optional[Dict[str, Any]] = None
        self._cache_timestamp: float = 0
        self._gateway_service = GatewayService()

    @staticmethod
    def _empty_catalog() -> Dict[str, Any]:
        """Build the structure returned when no catalog can be loaded.

        Returns:
            A new dictionary on every call, so callers can never mutate a
            shared fallback object.
        """
        return {"catalog_servers": [], "categories": [], "auth_types": []}

    @staticmethod
    def _find_catalog_server(servers, catalog_id: str) -> Optional[Dict[str, Any]]:
        """Return the first catalog entry whose ``id`` equals ``catalog_id``.

        Args:
            servers: Iterable of catalog entry dictionaries
            catalog_id: Catalog server ID to look up

        Returns:
            The matching entry, or None when there is no match.
        """
        return next((entry for entry in servers if entry.get("id") == catalog_id), None)

    @staticmethod
    def _detect_transport(url: str) -> str:
        """Guess the transport for a catalog entry that does not declare one.

        Args:
            url: Server URL from the catalog

        Returns:
            "WEBSOCKET", "SSE" or "STREAMABLEHTTP"; "SSE" is the fallback.
        """
        lowered = url.lower()
        # WebSocket schemes take priority over any path-based heuristic.
        if lowered.startswith(("ws://", "wss://")):
            return "WEBSOCKET"
        # SSE endpoints or paths containing /sse/
        if lowered.endswith("/sse") or "/sse/" in lowered:
            return "SSE"
        # Generic MCP endpoints typically use HTTP
        if "/mcp" in lowered or lowered.endswith("/"):
            return "STREAMABLEHTTP"
        # Default to SSE for most catalog servers
        return "SSE"

    @staticmethod
    def _describe_registration_error(error_str: str) -> str:
        """Map a raw exception message to a user-friendly message.

        The checks run in a fixed order and the first match wins, so a message
        containing both "connect" and "timed out" is reported as unreachable.

        Args:
            error_str: ``str()`` of the exception raised during registration

        Returns:
            A user-facing message; "Registration failed" when nothing matches.
        """
        lowered = error_str.lower()
        # "connect" also covers the literal "Connection refused".
        if "connect" in lowered:
            return "Server is offline or unreachable"
        if "SSL" in error_str or "certificate" in lowered:
            return "SSL certificate verification failed - check server security settings"
        if "timeout" in lowered or "timed out" in lowered:
            return "Server took too long to respond - it may be slow or unavailable"
        if "401" in error_str or "Unauthorized" in error_str:
            return "Authentication failed - check API key or OAuth credentials"
        if "403" in error_str or "Forbidden" in error_str:
            return "Access forbidden - check permissions and API key"
        if "404" in error_str or "Not Found" in error_str:
            return "Server endpoint not found - check URL is correct"
        if "500" in error_str or "Internal Server Error" in error_str:
            return "Remote server error - the MCP server is experiencing issues"
        if "IPv6" in error_str:
            return "IPv6 URLs are not supported - please use IPv4 or domain names"
        return "Registration failed"

    async def load_catalog(self, force_reload: bool = False) -> Dict[str, Any]:
        """Load catalog from YAML file.

        A relative catalog path is tried as given (relative to the current
        directory) and then relative to the directory three levels above this
        module. Any failure is logged and yields an empty catalog; the
        previously cached catalog is left untouched in that case.

        Args:
            force_reload: Force reload even if cache is valid

        Returns:
            Catalog data dictionary
        """
        # Check cache validity
        cache_age = time.time() - self._cache_timestamp
        if not force_reload and self._catalog_cache and cache_age < settings.mcpgateway_catalog_cache_ttl:
            return self._catalog_cache

        try:
            catalog_path = Path(settings.mcpgateway_catalog_file)

            # Try the current directory first, then the project root.
            if not catalog_path.is_absolute() and not catalog_path.exists():
                catalog_path = Path(__file__).parent.parent.parent / settings.mcpgateway_catalog_file

            if not catalog_path.exists():
                logger.warning(f"Catalog file not found: {catalog_path}")
                return self._empty_catalog()

            # Read off the event loop thread; parsing happens afterwards.
            content = await asyncio.to_thread(catalog_path.read_text, encoding="utf-8")
            catalog_data = yaml.safe_load(content)

            # safe_load returns None for an empty document and may return a
            # list or scalar; never cache anything that is not a mapping.
            if not isinstance(catalog_data, dict):
                logger.warning(f"Catalog file {catalog_path} is empty or not a mapping; treating catalog as empty")
                return self._empty_catalog()

            # Update cache
            self._catalog_cache = catalog_data
            self._cache_timestamp = time.time()

            logger.info(f"Loaded {len(catalog_data.get('catalog_servers') or [])} servers from catalog")
            return catalog_data

        except Exception as e:
            logger.error(f"Failed to load catalog: {e}")
            return self._empty_catalog()

    def _get_registry_cache(self):
        """Get registry cache instance lazily.

        Returns:
            RegistryCache instance or None if unavailable.
        """
        try:
            # First-Party
            from mcpgateway.cache.registry_cache import get_registry_cache  # pylint: disable=import-outside-toplevel

            return get_registry_cache()
        except ImportError:
            return None

    @staticmethod
    def _load_registration_state(db):
        """Collect the URLs of registered gateways from the database.

        All gateways (enabled and disabled) are read. A gateway counts as
        "OAuth pending" only when it is disabled, has ``auth_type == "oauth"``
        and has an empty ``oauth_config``; this distinguishes unconfigured
        OAuth servers from ones that were manually disabled.

        Args:
            db: Database session

        Returns:
            Tuple ``(registered_urls, oauth_disabled_urls)`` of sets. Both are
            empty when the query fails; the failure is logged as a warning.
        """
        registered_urls = set()
        oauth_disabled_urls = set()
        try:
            # First-Party
            from mcpgateway.db import Gateway as DbGateway  # pylint: disable=import-outside-toplevel

            stmt = select(DbGateway.url, DbGateway.enabled, DbGateway.auth_type, DbGateway.oauth_config)
            for url, enabled, auth_type, oauth_config in db.execute(stmt):
                registered_urls.add(url)
                if not enabled and auth_type == "oauth" and not oauth_config:
                    oauth_disabled_urls.add(url)
        except Exception as e:
            logger.warning(f"Failed to check registered servers: {e}")
            # Continue without marking registered servers
            return set(), set()
        return registered_urls, oauth_disabled_urls

    async def get_catalog_servers(self, request: CatalogListRequest, db) -> CatalogListResponse:
        """Get filtered list of catalog servers.

        Responses are cached in the registry cache (when available) under a
        hash of every filter and pagination field of the request.

        Args:
            request: Filter criteria
            db: Database session

        Returns:
            Filtered catalog servers response
        """
        # Check cache first
        cache = self._get_registry_cache()
        filters_hash = None
        if cache:
            filters_hash = cache.hash_filters(
                category=request.category,
                auth_type=request.auth_type,
                provider=request.provider,
                search=request.search,
                tags=sorted(request.tags) if request.tags else None,
                show_registered_only=request.show_registered_only,
                show_available_only=request.show_available_only,
                offset=request.offset,
                limit=request.limit,
            )
            cached = await cache.get("catalog", filters_hash)
            if cached is not None:
                return CatalogListResponse.model_validate(cached)

        catalog_data = await self.load_catalog()
        # "or []" also covers a YAML file that declares "catalog_servers:" with no value.
        servers = catalog_data.get("catalog_servers") or []

        # Only hit the database when there is something to annotate.
        if servers:
            registered_urls, oauth_disabled_urls = self._load_registration_state(db)
        else:
            registered_urls, oauth_disabled_urls = set(), set()

        # Convert to CatalogServer objects and mark registered ones
        catalog_servers = []
        for server_data in servers:
            server = CatalogServer(**server_data)
            server.is_registered = server.url in registered_urls
            # Mark servers that are registered but disabled due to OAuth config needed
            server.requires_oauth_config = server.url in oauth_disabled_urls
            # Registered servers are assumed available; individual health
            # checks can be done via the /status endpoint.
            server.is_available = server.is_registered or server_data.get("is_available", True)
            catalog_servers.append(server)

        # Apply filters
        filtered = catalog_servers

        if request.category:
            filtered = [s for s in filtered if s.category == request.category]

        if request.auth_type:
            filtered = [s for s in filtered if s.auth_type == request.auth_type]

        if request.provider:
            filtered = [s for s in filtered if s.provider == request.provider]

        if request.search:
            search_lower = request.search.lower()
            # A missing description is treated as empty text rather than crashing the search.
            filtered = [s for s in filtered if search_lower in s.name.lower() or search_lower in (s.description or "").lower()]

        if request.tags:
            # A server matches when it carries at least one of the requested tags.
            filtered = [s for s in filtered if any(tag in s.tags for tag in request.tags)]

        if request.show_registered_only:
            filtered = [s for s in filtered if s.is_registered]

        if request.show_available_only:
            filtered = [s for s in filtered if s.is_available]

        # Pagination
        total = len(filtered)
        start = request.offset
        paginated = filtered[start : start + request.limit]

        # Filter options are computed over the whole catalog, not the filtered view.
        all_categories = sorted({s.category for s in catalog_servers})
        all_auth_types = sorted({s.auth_type for s in catalog_servers})
        all_providers = sorted({s.provider for s in catalog_servers})
        all_tags = sorted({tag for s in catalog_servers for tag in s.tags})

        response = CatalogListResponse(servers=paginated, total=total, categories=all_categories, auth_types=all_auth_types, providers=all_providers, all_tags=all_tags)

        # Store in cache (best effort)
        if cache:
            try:
                cache_data = response.model_dump(mode="json")
                await cache.set("catalog", cache_data, filters_hash)
            except Exception as e:
                logger.debug(f"Failed to cache catalog response: {e}")

        return response

    async def _register_unconfigured_oauth_server(self, gateway_data: Dict[str, Any], db: Session) -> CatalogServerRegisterResponse:
        """Store an OAuth server as a disabled gateway without contacting it.

        No connection test or tool discovery is performed; the gateway row is
        created directly with ``enabled=False`` and ``auth_type="oauth"``.

        Args:
            gateway_data: Gateway fields (name, url, description, transport, tags)
            db: Database session

        Returns:
            Successful registration response with ``oauth_required=True``
        """
        # First-Party
        from mcpgateway.db import Gateway as DbGateway  # pylint: disable=import-outside-toplevel
        from mcpgateway.schemas import GatewayCreate, GatewayRead  # pylint: disable=import-outside-toplevel

        # Constructed only for its validation side effect; the instance is not used.
        GatewayCreate(**gateway_data)
        slug_name = slugify(gateway_data["name"])

        db_gateway = DbGateway(
            name=gateway_data["name"],
            slug=slug_name,
            url=gateway_data["url"],
            description=gateway_data["description"],
            tags=gateway_data.get("tags", []),
            transport=gateway_data["transport"],
            capabilities={},
            auth_type="oauth",  # Mark as OAuth so it can be identified after page refresh
            enabled=False,  # Disabled until OAuth is configured
            created_via="catalog",
            visibility="public",
            version=1,
        )

        db.add(db_gateway)
        db.commit()
        db.refresh(db_gateway)

        # Build a dict for GatewayRead validation with converted tags so the
        # database object itself is not mutated. Tags may be stored either as
        # a list of strings (legacy) or as a list of dicts.
        if not db_gateway.tags:
            tags_for_read = []
        elif isinstance(db_gateway.tags[0], str):
            # Legacy format: convert to dict format
            tags_for_read = validate_tags_field(db_gateway.tags)
        else:
            # Already in dict format, pass through
            tags_for_read = db_gateway.tags

        gateway_dict = {
            "id": db_gateway.id,
            "name": db_gateway.name,
            "slug": db_gateway.slug,
            "url": db_gateway.url,
            "description": db_gateway.description,
            "tags": tags_for_read,
            "transport": db_gateway.transport,
            "capabilities": db_gateway.capabilities,
            "created_at": db_gateway.created_at,
            "updated_at": db_gateway.updated_at,
            "enabled": db_gateway.enabled,
            "reachable": db_gateway.reachable,
            "last_seen": db_gateway.last_seen,
            "auth_type": db_gateway.auth_type,
            "visibility": db_gateway.visibility,
            "version": db_gateway.version,
            "team_id": db_gateway.team_id,
            "owner_email": db_gateway.owner_email,
        }

        gateway_read = GatewayRead.model_validate(gateway_dict)

        # Invalidate catalog cache since registration status changed
        cache = self._get_registry_cache()
        if cache:
            await cache.invalidate_catalog()

        return CatalogServerRegisterResponse(
            success=True,
            server_id=str(gateway_read.id),
            message=f"Successfully registered {gateway_read.name} - OAuth configuration required before activation",
            error=None,
            oauth_required=True,
        )

    async def register_catalog_server(self, catalog_id: str, request: Optional[CatalogServerRegisterRequest], db: Session) -> CatalogServerRegisterResponse:
        """Register a catalog server as a gateway.

        Failures never propagate: every exception is logged and converted into
        a response with ``success=False`` and a user-friendly message.

        Args:
            catalog_id: Catalog server ID
            request: Registration request with optional overrides
            db: Database session

        Returns:
            Registration response
        """
        try:
            # Load catalog to find the server
            catalog_data = await self.load_catalog()
            server_data = self._find_catalog_server(catalog_data.get("catalog_servers") or [], catalog_id)

            if not server_data:
                return CatalogServerRegisterResponse(success=False, server_id="", message="Server not found in catalog", error="Invalid catalog server ID")

            # Check if already registered (matched by URL). A failed lookup is
            # logged and treated as "not registered".
            try:
                # First-Party
                from mcpgateway.db import Gateway as DbGateway  # pylint: disable=import-outside-toplevel

                stmt = select(DbGateway).where(DbGateway.url == server_data["url"])
                existing = db.execute(stmt).scalar_one_or_none()
            except Exception as e:
                logger.warning(f"Error checking existing registration: {e}")
                existing = None

            if existing:
                return CatalogServerRegisterResponse(success=False, server_id=str(existing.id), message="Server already registered", error="This server is already registered in the system")

            # Prepare gateway creation request using proper schema
            # First-Party
            from mcpgateway.schemas import GatewayCreate  # pylint: disable=import-outside-toplevel

            # Use explicit transport if provided, otherwise auto-detect from URL
            transport = server_data.get("transport") or self._detect_transport(server_data["url"])

            # Check for IPv6 URLs early to provide a clear error message
            url = server_data["url"]
            if "[" in url or "]" in url:
                return CatalogServerRegisterResponse(
                    success=False, server_id="", message="Registration failed", error="IPv6 URLs are not currently supported for security reasons. Please use IPv4 or domain names."
                )

            # Prepare the gateway creation data
            gateway_data = {
                "name": request.name if request and request.name else server_data["name"],
                "url": server_data["url"],
                "description": server_data["description"],
                "transport": transport,
                "tags": server_data.get("tags", []),
            }

            # Set authentication based on server requirements
            auth_type = server_data.get("auth_type", "Open")

            if request and request.api_key and auth_type != "Open":
                if auth_type in ("API Key", "API", "OAuth2.1", "OAuth", "OAuth2.1 & API Key"):
                    # API-key servers, and OAuth/mixed servers given a key,
                    # send the key as a bearer token.
                    gateway_data["auth_type"] = "bearer"
                    gateway_data["auth_token"] = request.api_key
                else:
                    # For any other auth types, use custom headers (as list of dicts)
                    gateway_data["auth_type"] = "authheaders"
                    gateway_data["auth_headers"] = [{"key": "X-API-Key", "value": request.api_key}]
            elif auth_type in ("OAuth2.1", "OAuth"):
                # OAuth server without credentials: register it without a
                # connection test; the user completes the OAuth flow later.
                logger.info(f"Registering OAuth server {server_data['name']} without credentials - OAuth flow required later")
                return await self._register_unconfigured_oauth_server(gateway_data, db)

            gateway_create = GatewayCreate(**gateway_data)

            # Use the proper gateway registration method which will discover tools
            gateway_read = await self._gateway_service.register_gateway(
                db=db,
                gateway=gateway_create,
                created_via="catalog",
                visibility="public",  # Catalog servers should be public
                initialize_timeout=settings.httpx_admin_read_timeout,
            )

            logger.info(f"Registered catalog server: {gateway_read.name} ({catalog_id})")

            # Query for tools discovered from this gateway
            # First-Party
            from mcpgateway.db import Tool as DbTool  # pylint: disable=import-outside-toplevel

            tool_count = 0
            if gateway_read.id:
                stmt = select(DbTool).where(DbTool.gateway_id == gateway_read.id)
                tool_count = len(db.execute(stmt).scalars().all())

            message = f"Successfully registered {gateway_read.name}"
            if tool_count > 0:
                message += f" with {tool_count} tools discovered"

            # Invalidate catalog cache since registration status changed
            cache = self._get_registry_cache()
            if cache:
                await cache.invalidate_catalog()

            return CatalogServerRegisterResponse(success=True, server_id=str(gateway_read.id), message=message, error=None)

        except Exception as e:
            logger.error(f"Failed to register catalog server {catalog_id}: {e}")

            # Map common exceptions to user-friendly messages
            error_str = str(e)
            user_message = self._describe_registration_error(error_str)

            # No rollback here on purpose - the caller (FastAPI) handles it.
            return CatalogServerRegisterResponse(success=False, server_id="", message=user_message, error=error_str)

    async def check_server_availability(self, catalog_id: str) -> CatalogServerStatusResponse:
        """Check if a catalog server is available.

        A single GET request with a 5 second timeout is sent to the server
        URL; any status code below 500 counts as available.

        Args:
            catalog_id: Catalog server ID

        Returns:
            Server status response
        """
        try:
            # Load catalog to find the server
            catalog_data = await self.load_catalog()
            server_data = self._find_catalog_server(catalog_data.get("catalog_servers") or [], catalog_id)

            if not server_data:
                return CatalogServerStatusResponse(server_id=catalog_id, is_available=False, is_registered=False, error="Server not found in catalog")

            # No database session is passed to this method, so registration
            # status cannot be looked up here and is always reported as False.
            is_registered = False

            # Perform health check
            is_available = False
            error = None
            # perf_counter is monotonic, so the measured duration is immune to
            # wall-clock adjustments during the request.
            started = time.perf_counter()

            try:
                # First-Party
                from mcpgateway.services.http_client_service import get_http_client  # pylint: disable=import-outside-toplevel

                client = await get_http_client()
                # Try a simple GET request with short timeout
                response = await client.get(server_data["url"], timeout=5.0, follow_redirects=True)
                is_available = response.status_code < 500
            except Exception as e:
                error = str(e)
                is_available = False

            # The elapsed time is reported for failed probes as well.
            response_time_ms = (time.perf_counter() - started) * 1000

            return CatalogServerStatusResponse(
                server_id=catalog_id, is_available=is_available, is_registered=is_registered, last_checked=datetime.now(timezone.utc), response_time_ms=response_time_ms, error=error
            )

        except Exception as e:
            logger.error(f"Failed to check server status for {catalog_id}: {e}")
            return CatalogServerStatusResponse(server_id=catalog_id, is_available=False, is_registered=False, error=str(e))

    async def bulk_register_servers(self, request: CatalogBulkRegisterRequest, db: Session) -> CatalogBulkRegisterResponse:
        """Register multiple catalog servers.

        Servers are registered one at a time in request order. When
        ``request.skip_errors`` is false, processing stops at the first failure.

        Args:
            request: Bulk registration request
            db: Database session

        Returns:
            Bulk registration response
        """
        successful = []
        failed = []

        for server_id in request.server_ids:
            try:
                response = await self.register_catalog_server(catalog_id=server_id, request=None, db=db)
                failure = None if response.success else (response.error or "Registration failed")
            except Exception as e:
                failure = str(e)

            if failure is None:
                successful.append(server_id)
                continue

            failed.append({"server_id": server_id, "error": failure})
            if not request.skip_errors:
                break

        # NOTE(review): total_attempted is the number of requested IDs, even when
        # the loop stopped early on the first failure - confirm this is intended.
        return CatalogBulkRegisterResponse(successful=successful, failed=failed, total_attempted=len(request.server_ids), total_successful=len(successful))
# Global instance
catalog_service = CatalogService()