Coverage for mcpgateway / services / server_service.py: 100%
572 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/server_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7ContextForge Server Service
9This module implements server management for the MCP Servers Catalog.
10It handles server registration, listing, retrieval, updates, activation toggling, and deletion.
11It also publishes event notifications for server changes.
12"""
14# Standard
15import asyncio
16import binascii
17from datetime import datetime, timezone
18from typing import Any, AsyncGenerator, Dict, List, Optional, Union
20# Third-Party
21import httpx
22from pydantic import ValidationError
23from sqlalchemy import and_, delete, desc, or_, select
24from sqlalchemy.exc import IntegrityError, OperationalError
25from sqlalchemy.orm import joinedload, selectinload, Session
27# First-Party
28from mcpgateway.config import settings
29from mcpgateway.db import A2AAgent as DbA2AAgent
30from mcpgateway.db import EmailTeam as DbEmailTeam
31from mcpgateway.db import EmailTeamMember as DbEmailTeamMember
32from mcpgateway.db import get_for_update
33from mcpgateway.db import Prompt as DbPrompt
34from mcpgateway.db import Resource as DbResource
35from mcpgateway.db import Server as DbServer
36from mcpgateway.db import ServerMetric, ServerMetricsHourly
37from mcpgateway.db import Tool as DbTool
38from mcpgateway.schemas import ServerCreate, ServerMetrics, ServerRead, ServerUpdate, TopPerformer
39from mcpgateway.services.audit_trail_service import get_audit_trail_service
40from mcpgateway.services.base_service import BaseService
41from mcpgateway.services.encryption_service import protect_oauth_config_for_storage
42from mcpgateway.services.logging_service import LoggingService
43from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
44from mcpgateway.services.performance_tracker import get_performance_tracker
45from mcpgateway.services.structured_logger import get_structured_logger
46from mcpgateway.services.team_management_service import TeamManagementService
47from mcpgateway.utils.metrics_common import build_top_performers
48from mcpgateway.utils.pagination import unified_paginate
49from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
# Registry cache singleton, resolved on first use (lazy to avoid circular dependencies).
_REGISTRY_CACHE = None


def _get_registry_cache():
    """Return the registry cache singleton, importing it on first access.

    The import is deferred to call time and the imported object is memoized
    in the module-level ``_REGISTRY_CACHE`` so later calls skip the import.

    Returns:
        The ``registry_cache`` object from ``mcpgateway.cache.registry_cache``.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = registry_cache
    return _REGISTRY_CACHE
def _validate_server_team_assignment(db: Session, user_email: Optional[str], target_team_id: Optional[str]) -> None:
    """Check that a server may be assigned to ``target_team_id`` by ``user_email``.

    Args:
        db: Database session used for the team and membership lookups.
        user_email: Email of the requesting user. When falsy, the ownership
            check is skipped (system/internal update paths).
        target_team_id: Identifier of the team the server is being assigned to.

    Raises:
        ValueError: If ``target_team_id`` is missing, no team with that ID
            exists, or ``user_email`` is given but has no active membership
            with role ``"owner"`` in the team.
    """
    if not target_team_id:
        raise ValueError("Cannot set visibility to 'team' without a team_id")

    team_lookup = db.query(DbEmailTeam).filter(DbEmailTeam.id == target_team_id)
    if not team_lookup.first():
        raise ValueError(f"Team {target_team_id} not found")

    # Preserve existing behavior for system/internal updates where
    # user context may be intentionally omitted: only check ownership
    # when a user is actually supplied.
    if user_email:
        owner_filters = (
            DbEmailTeamMember.team_id == target_team_id,
            DbEmailTeamMember.user_email == user_email,
            DbEmailTeamMember.is_active,
            DbEmailTeamMember.role == "owner",
        )
        owner_row = db.query(DbEmailTeamMember).filter(*owner_filters).first()
        if not owner_row:
            raise ValueError("User membership in team not sufficient for this update.")
# Module-wide logging: create the logging service first, then derive this
# module's logger from it.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
class ServerError(Exception):
    """Base class for server-related errors."""


class ServerNotFoundError(ServerError):
    """Raised when a requested server is not found."""


class ServerLockConflictError(ServerError):
    """Raised when a server row is locked by another transaction."""


class ServerNameConflictError(ServerError):
    """Raised when a server name conflicts with an existing one."""

    def __init__(self, name: str, enabled: bool = True, server_id: Optional[str] = None, visibility: Optional[str] = "public") -> None:
        """
        Initialize a ServerNameConflictError exception.

        This exception indicates a server name conflict, with additional context about visibility,
        whether the conflicting server is active, and its ID if known. The error message starts
        with the visibility information.

        Visibility rules:
            - public: Restricts server names globally (across all teams).
            - team: Restricts server names only within the same team.

        Args:
            name: The server name that caused the conflict.
            enabled: Whether the conflicting server is currently active. Defaults to True.
            server_id: The ID of the conflicting server, if known. Only included in message for inactive servers.
            visibility: The visibility of the conflicting server (e.g., "public", "private", "team").
                ``None`` is treated as "public" so that building the error never fails
                for a conflicting row without a stored visibility.

        Examples:
            >>> error = ServerNameConflictError("My Server")
            >>> str(error)
            'Public Server already exists with name: My Server'
            >>> error = ServerNameConflictError("My Server", enabled=False, server_id=123)
            >>> str(error)
            'Public Server already exists with name: My Server (currently inactive, ID: 123)'
            >>> error.enabled
            False
            >>> error.server_id
            123
            >>> error = ServerNameConflictError("My Server", enabled=False, visibility="team")
            >>> str(error)
            'Team Server already exists with name: My Server (currently inactive, ID: None)'
            >>> error.enabled
            False
            >>> error.server_id is None
            True
            >>> str(ServerNameConflictError("My Server", visibility=None))
            'Public Server already exists with name: My Server'
        """
        self.name = name
        self.enabled = enabled
        self.server_id = server_id
        # Callers pass the stored visibility of the conflicting row, which may be
        # None; calling .capitalize() on None would raise AttributeError and hide
        # the real conflict, so fall back to the default label in that case only.
        visibility_label = "public" if visibility is None else visibility
        message = f"{visibility_label.capitalize()} Server already exists with name: {name}"
        if not enabled:
            message += f" (currently inactive, ID: {server_id})"
        super().__init__(message)
170class ServerService(BaseService):
171 """Service for managing MCP Servers in the catalog.
173 Provides methods to create, list, retrieve, update, set state, and delete server records.
174 Also supports event notifications for changes in server data.
175 """
177 _visibility_model_cls = DbServer
179 def __init__(self) -> None:
180 """Initialize a new ServerService instance.
182 Sets up the service with:
183 - An empty list for event subscribers that will receive server change notifications
184 - An HTTP client configured with timeout and SSL verification settings from config
186 The HTTP client is used for health checks and other server-related HTTP operations.
187 Event subscribers can register to receive notifications about server additions,
188 updates, activations, deactivations, and deletions.
190 Examples:
191 >>> from mcpgateway.services.server_service import ServerService
192 >>> service = ServerService()
193 >>> isinstance(service._event_subscribers, list)
194 True
195 >>> len(service._event_subscribers)
196 0
197 >>> hasattr(service, '_http_client')
198 True
199 """
200 self._event_subscribers: List[asyncio.Queue] = []
201 self._http_client = httpx.AsyncClient(
202 timeout=settings.federation_timeout,
203 verify=not settings.skip_ssl_verify,
204 limits=httpx.Limits(
205 max_connections=settings.httpx_max_connections,
206 max_keepalive_connections=settings.httpx_max_keepalive_connections,
207 keepalive_expiry=settings.httpx_keepalive_expiry,
208 ),
209 )
210 self._structured_logger = get_structured_logger("server_service")
211 self._audit_trail = get_audit_trail_service()
212 self._performance_tracker = get_performance_tracker()
214 async def initialize(self) -> None:
215 """Initialize the server service."""
216 logger.info("Initializing server service")
218 async def shutdown(self) -> None:
219 """Shutdown the server service."""
220 await self._http_client.aclose()
221 logger.info("Server service shutdown complete")
223 # get_top_server
224 async def get_top_servers(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
225 """Retrieve the top-performing servers based on execution count.
227 Queries the database to get servers with their metrics, ordered by the number of executions
228 in descending order. Combines recent raw metrics with historical hourly rollups for complete
229 historical coverage. Returns a list of TopPerformer objects containing server details and
230 performance metrics. Results are cached for performance.
232 Args:
233 db (Session): Database session for querying server metrics.
234 limit (Optional[int]): Maximum number of servers to return. Defaults to 5.
235 include_deleted (bool): Whether to include deleted servers from rollups.
237 Returns:
238 List[TopPerformer]: A list of TopPerformer objects, each containing:
239 - id: Server ID.
240 - name: Server name.
241 - execution_count: Total number of executions.
242 - avg_response_time: Average response time in seconds, or None if no metrics.
243 - success_rate: Success rate percentage, or None if no metrics.
244 - last_execution: Timestamp of the last execution, or None if no metrics.
245 """
246 # Check cache first (if enabled)
247 # First-Party
248 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
250 effective_limit = limit or 5
251 cache_key = f"top_servers:{effective_limit}:include_deleted={include_deleted}"
253 if is_cache_enabled():
254 cached = metrics_cache.get(cache_key)
255 if cached is not None:
256 return cached
258 # Use combined query that includes both raw metrics and rollup data
259 # First-Party
260 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel
262 results = get_top_performers_combined(
263 db=db,
264 metric_type="server",
265 entity_model=DbServer,
266 limit=effective_limit,
267 include_deleted=include_deleted,
268 )
269 top_performers = build_top_performers(results)
271 # Cache the result (if enabled)
272 if is_cache_enabled():
273 metrics_cache.set(cache_key, top_performers)
275 return top_performers
277 def convert_server_to_read(self, server: DbServer, include_metrics: bool = False) -> ServerRead:
278 """
279 Converts a DbServer instance into a ServerRead model, optionally including aggregated metrics.
281 Args:
282 server (DbServer): The ORM instance of the server.
283 include_metrics (bool): Whether to include metrics in the result. Defaults to False.
284 Set to False for list operations to avoid N+1 query issues.
286 Returns:
287 ServerRead: The Pydantic model representing the server, optionally including aggregated metrics.
289 Examples:
290 >>> from types import SimpleNamespace
291 >>> from datetime import datetime, timezone
292 >>> svc = ServerService()
293 >>> now = datetime.now(timezone.utc)
294 >>> # Fake metric objects
295 >>> m1 = SimpleNamespace(is_success=True, response_time=0.2, timestamp=now)
296 >>> m2 = SimpleNamespace(is_success=False, response_time=0.4, timestamp=now)
297 >>> server = SimpleNamespace(
298 ... id='s1', name='S', description=None, icon=None,
299 ... created_at=now, updated_at=now, enabled=True,
300 ... associated_tools=[], associated_resources=[], associated_prompts=[], associated_a2a_agents=[],
301 ... tags=[], metrics=[m1, m2],
302 ... metrics_summary={"total_executions": 2, "successful_executions": 1, "failed_executions": 1,
303 ... "failure_rate": 0.5, "min_response_time": 0.2, "max_response_time": 0.4,
304 ... "avg_response_time": 0.3, "last_execution_time": now},
305 ... tools=[], resources=[], prompts=[], a2a_agents=[],
306 ... team_id=None, owner_email=None, visibility=None,
307 ... created_by=None, modified_by=None
308 ... )
309 >>> result = svc.convert_server_to_read(server, include_metrics=True)
310 >>> result.metrics.total_executions
311 2
312 >>> result.metrics.successful_executions
313 1
314 """
315 # Build dict explicitly from attributes to ensure SQLAlchemy populates them
316 # (using __dict__.copy() can return empty dict with certain query patterns)
317 server_dict = {
318 "id": server.id,
319 "name": server.name,
320 "description": server.description,
321 "icon": server.icon,
322 "enabled": server.enabled,
323 "created_at": server.created_at,
324 "updated_at": server.updated_at,
325 "team_id": server.team_id,
326 "owner_email": server.owner_email,
327 "visibility": server.visibility,
328 "created_by": server.created_by,
329 "created_from_ip": getattr(server, "created_from_ip", None),
330 "created_via": getattr(server, "created_via", None),
331 "created_user_agent": getattr(server, "created_user_agent", None),
332 "modified_by": server.modified_by,
333 "modified_from_ip": getattr(server, "modified_from_ip", None),
334 "modified_via": getattr(server, "modified_via", None),
335 "modified_user_agent": getattr(server, "modified_user_agent", None),
336 "import_batch_id": getattr(server, "import_batch_id", None),
337 "federation_source": getattr(server, "federation_source", None),
338 "version": getattr(server, "version", None),
339 "tags": server.tags or [],
340 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
341 "oauth_enabled": getattr(server, "oauth_enabled", False),
342 "oauth_config": getattr(server, "oauth_config", None),
343 }
345 # Compute aggregated metrics only if requested (avoids N+1 queries in list operations)
346 if include_metrics:
347 # Use metrics_summary which combines raw + hourly rollup data (matches tool_service pattern)
348 metrics = server.metrics_summary
349 server_dict["metrics"] = {
350 "total_executions": metrics["total_executions"],
351 "successful_executions": metrics["successful_executions"],
352 "failed_executions": metrics["failed_executions"],
353 "failure_rate": metrics["failure_rate"],
354 "min_response_time": metrics["min_response_time"],
355 "max_response_time": metrics["max_response_time"],
356 "avg_response_time": metrics["avg_response_time"],
357 "last_execution_time": metrics["last_execution_time"],
358 }
359 else:
360 server_dict["metrics"] = None
361 # Add associated IDs from relationships
362 server_dict["associated_tools"] = [tool.name for tool in server.tools] if server.tools else []
363 server_dict["associated_tool_ids"] = [str(tool.id) for tool in server.tools] if server.tools else []
364 server_dict["associated_resources"] = [res.id for res in server.resources] if server.resources else []
365 server_dict["associated_prompts"] = [prompt.id for prompt in server.prompts] if server.prompts else []
366 server_dict["associated_a2a_agents"] = [agent.id for agent in server.a2a_agents] if server.a2a_agents else []
368 # Team name is loaded via server.team property from email_team relationship
369 server_dict["team"] = getattr(server, "team", None)
371 return ServerRead.model_validate(server_dict).masked()
373 def _assemble_associated_items(
374 self,
375 tools: Optional[List[str]],
376 resources: Optional[List[str]],
377 prompts: Optional[List[str]],
378 a2a_agents: Optional[List[str]] = None,
379 gateways: Optional[List[str]] = None,
380 ) -> Dict[str, Any]:
381 """
382 Assemble the associated items dictionary from the separate fields.
384 Args:
385 tools: List of tool IDs.
386 resources: List of resource IDs.
387 prompts: List of prompt IDs.
388 a2a_agents: List of A2A agent IDs.
389 gateways: List of gateway IDs.
391 Returns:
392 A dictionary with keys "tools", "resources", "prompts", "a2a_agents", and "gateways".
394 Examples:
395 >>> service = ServerService()
396 >>> # Test with all None values
397 >>> result = service._assemble_associated_items(None, None, None)
398 >>> result
399 {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []}
401 >>> # Test with empty lists
402 >>> result = service._assemble_associated_items([], [], [])
403 >>> result
404 {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []}
406 >>> # Test with actual values
407 >>> result = service._assemble_associated_items(['tool1', 'tool2'], ['res1'], ['prompt1'])
408 >>> result
409 {'tools': ['tool1', 'tool2'], 'resources': ['res1'], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []}
411 >>> # Test with mixed None and values
412 >>> result = service._assemble_associated_items(['tool1'], None, ['prompt1'])
413 >>> result
414 {'tools': ['tool1'], 'resources': [], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []}
415 """
416 return {
417 "tools": tools or [],
418 "resources": resources or [],
419 "prompts": prompts or [],
420 "a2a_agents": a2a_agents or [],
421 "gateways": gateways or [],
422 }
424 async def register_server(
425 self,
426 db: Session,
427 server_in: ServerCreate,
428 created_by: Optional[str] = None,
429 created_from_ip: Optional[str] = None,
430 created_via: Optional[str] = None,
431 created_user_agent: Optional[str] = None,
432 team_id: Optional[str] = None,
433 owner_email: Optional[str] = None,
434 visibility: Optional[str] = "public",
435 ) -> ServerRead:
436 """
437 Register a new server in the catalog and validate that all associated items exist.
439 This function performs the following steps:
440 1. Checks if a server with the same name already exists.
441 2. Creates a new server record.
442 3. For each ID provided in associated_tools, associated_resources, and associated_prompts,
443 verifies that the corresponding item exists. If an item does not exist, an error is raised.
444 4. Associates the verified items to the new server.
445 5. Commits the transaction, refreshes the ORM instance, and forces the loading of relationship data.
446 6. Constructs a response dictionary that includes lists of associated item IDs.
447 7. Notifies subscribers of the addition and returns the validated response.
449 Args:
450 db (Session): The SQLAlchemy database session.
451 server_in (ServerCreate): The server creation schema containing server details and lists of
452 associated tool, resource, and prompt IDs (as strings).
453 created_by (Optional[str]): Email of the user creating the server, used for ownership tracking.
454 created_from_ip (Optional[str]): IP address from which the creation request originated.
455 created_via (Optional[str]): Source of creation (api, ui, import).
456 created_user_agent (Optional[str]): User agent string from the creation request.
457 team_id (Optional[str]): Team ID to assign the server to.
458 owner_email (Optional[str]): Email of the user who owns this server.
459 visibility (str): Server visibility level (private, team, public).
461 Returns:
462 ServerRead: The newly created server, with associated item IDs.
464 Raises:
465 IntegrityError: If a database integrity error occurs.
466 ServerNameConflictError: If a server name conflict occurs (public or team visibility).
467 ServerError: If any associated tool, resource, or prompt does not exist, or if any other registration error occurs.
469 Examples:
470 >>> from mcpgateway.services.server_service import ServerService
471 >>> from unittest.mock import MagicMock, AsyncMock, patch
472 >>> from mcpgateway.schemas import ServerRead
473 >>> service = ServerService()
474 >>> db = MagicMock()
475 >>> server_in = MagicMock()
476 >>> server_in.id = None # No custom UUID for this test
477 >>> db.execute.return_value.scalar_one_or_none.return_value = None
478 >>> db.add = MagicMock()
479 >>> db.commit = MagicMock()
480 >>> db.refresh = MagicMock()
481 >>> service._notify_server_added = AsyncMock()
482 >>> service.convert_server_to_read = MagicMock(return_value='server_read')
483 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes
484 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes
485 >>> ServerRead.model_validate = MagicMock(return_value='server_read')
486 >>> import asyncio
487 >>> asyncio.run(service.register_server(db, server_in))
488 'server_read'
489 """
490 try:
491 logger.info(f"Registering server: {server_in.name}")
492 oauth_config = await protect_oauth_config_for_storage(getattr(server_in, "oauth_config", None))
493 # # Create the new server record.
494 db_server = DbServer(
495 name=server_in.name,
496 description=server_in.description,
497 icon=server_in.icon,
498 enabled=True,
499 tags=server_in.tags or [],
500 # Team scoping fields - use schema values if provided, otherwise fallback to parameters
501 team_id=getattr(server_in, "team_id", None) or team_id,
502 owner_email=getattr(server_in, "owner_email", None) or owner_email or created_by,
503 # IMPORTANT: Prefer function parameter over schema default
504 # The API has visibility as a separate Body param that should override schema default
505 visibility=visibility or getattr(server_in, "visibility", None) or "public",
506 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
507 oauth_enabled=getattr(server_in, "oauth_enabled", False) or False,
508 oauth_config=oauth_config,
509 # Metadata fields
510 created_by=created_by,
511 created_from_ip=created_from_ip,
512 created_via=created_via,
513 created_user_agent=created_user_agent,
514 version=1,
515 )
516 # Check for existing server with the same name (with row locking to prevent race conditions)
517 # The unique constraint is on (team_id, owner_email, name), so we check based on that
518 owner_email_to_check = getattr(server_in, "owner_email", None) or owner_email or created_by
519 team_id_to_check = getattr(server_in, "team_id", None) or team_id
521 # Build conditions based on the actual unique constraint: (team_id, owner_email, name)
522 conditions = [
523 DbServer.name == server_in.name,
524 DbServer.team_id == team_id_to_check if team_id_to_check else DbServer.team_id.is_(None),
525 DbServer.owner_email == owner_email_to_check if owner_email_to_check else DbServer.owner_email.is_(None),
526 ]
527 if server_in.id:
528 conditions.append(DbServer.id != server_in.id)
530 existing_server = get_for_update(db, DbServer, where=and_(*conditions))
531 if existing_server:
532 raise ServerNameConflictError(server_in.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)
533 # Set custom UUID if provided
534 if server_in.id:
535 logger.info(f"Setting custom UUID for server: {server_in.id}")
536 db_server.id = server_in.id
537 logger.info(f"Adding server to DB session: {db_server.name}")
538 db.add(db_server)
540 # Associate tools, verifying each exists using bulk query when multiple items
541 if server_in.associated_tools:
542 tool_ids = [tool_id.strip() for tool_id in server_in.associated_tools if tool_id.strip()]
543 if len(tool_ids) > 1:
544 # Use bulk query for multiple items
545 tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all()
546 found_tool_ids = {tool.id for tool in tools}
547 missing_tool_ids = set(tool_ids) - found_tool_ids
548 if missing_tool_ids:
549 raise ServerError(f"Tools with ids {missing_tool_ids} do not exist.")
550 db_server.tools.extend(tools)
551 elif tool_ids:
552 # Use single query for single item (maintains test compatibility)
553 tool_obj = db.get(DbTool, tool_ids[0])
554 if not tool_obj:
555 raise ServerError(f"Tool with id {tool_ids[0]} does not exist.")
556 db_server.tools.append(tool_obj)
558 # Associate resources, verifying each exists using bulk query when multiple items
559 if server_in.associated_resources:
560 resource_ids = [resource_id.strip() for resource_id in server_in.associated_resources if resource_id.strip()]
561 if len(resource_ids) > 1:
562 # Use bulk query for multiple items
563 resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all()
564 found_resource_ids = {resource.id for resource in resources}
565 missing_resource_ids = set(resource_ids) - found_resource_ids
566 if missing_resource_ids:
567 raise ServerError(f"Resources with ids {missing_resource_ids} do not exist.")
568 db_server.resources.extend(resources)
569 elif resource_ids:
570 # Use single query for single item (maintains test compatibility)
571 resource_obj = db.get(DbResource, resource_ids[0])
572 if not resource_obj:
573 raise ServerError(f"Resource with id {resource_ids[0]} does not exist.")
574 db_server.resources.append(resource_obj)
576 # Associate prompts, verifying each exists using bulk query when multiple items
577 if server_in.associated_prompts:
578 prompt_ids = [prompt_id.strip() for prompt_id in server_in.associated_prompts if prompt_id.strip()]
579 if len(prompt_ids) > 1:
580 # Use bulk query for multiple items
581 prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all()
582 found_prompt_ids = {prompt.id for prompt in prompts}
583 missing_prompt_ids = set(prompt_ids) - found_prompt_ids
584 if missing_prompt_ids:
585 raise ServerError(f"Prompts with ids {missing_prompt_ids} do not exist.")
586 db_server.prompts.extend(prompts)
587 elif prompt_ids:
588 # Use single query for single item (maintains test compatibility)
589 prompt_obj = db.get(DbPrompt, prompt_ids[0])
590 if not prompt_obj:
591 raise ServerError(f"Prompt with id {prompt_ids[0]} does not exist.")
592 db_server.prompts.append(prompt_obj)
594 # Associate A2A agents, verifying each exists using bulk query when multiple items
595 if server_in.associated_a2a_agents:
596 agent_ids = [agent_id.strip() for agent_id in server_in.associated_a2a_agents if agent_id.strip()]
597 if len(agent_ids) > 1:
598 # Use bulk query for multiple items
599 agents = db.execute(select(DbA2AAgent).where(DbA2AAgent.id.in_(agent_ids))).scalars().all()
600 found_agent_ids = {agent.id for agent in agents}
601 missing_agent_ids = set(agent_ids) - found_agent_ids
602 if missing_agent_ids:
603 raise ServerError(f"A2A Agents with ids {missing_agent_ids} do not exist.")
604 db_server.a2a_agents.extend(agents)
606 # Note: Auto-tool creation for A2A agents should be handled
607 # by a separate service or background task to avoid circular imports
608 for agent in agents:
609 logger.info(f"A2A agent {agent.name} associated with server {db_server.name}")
610 elif agent_ids:
611 # Use single query for single item (maintains test compatibility)
612 agent_obj = db.get(DbA2AAgent, agent_ids[0])
613 if not agent_obj:
614 raise ServerError(f"A2A Agent with id {agent_ids[0]} does not exist.")
615 db_server.a2a_agents.append(agent_obj)
616 logger.info(f"A2A agent {agent_obj.name} associated with server {db_server.name}")
618 # Commit the new record and refresh.
619 db.commit()
620 db.refresh(db_server)
621 # Force load the relationship attributes.
622 _ = db_server.tools, db_server.resources, db_server.prompts, db_server.a2a_agents
624 # Assemble response data with associated item IDs.
625 server_data = {
626 "id": db_server.id,
627 "name": db_server.name,
628 "description": db_server.description,
629 "icon": db_server.icon,
630 "created_at": db_server.created_at,
631 "updated_at": db_server.updated_at,
632 "enabled": db_server.enabled,
633 "associated_tools": [str(tool.id) for tool in db_server.tools],
634 "associated_resources": [str(resource.id) for resource in db_server.resources],
635 "associated_prompts": [str(prompt.id) for prompt in db_server.prompts],
636 }
637 logger.debug(f"Server Data: {server_data}")
638 await self._notify_server_added(db_server)
639 logger.info(f"Registered server: {server_in.name}")
641 # Structured logging: Audit trail for server creation
642 self._audit_trail.log_action(
643 user_id=created_by or "system",
644 action="create_server",
645 resource_type="server",
646 resource_id=db_server.id,
647 details={
648 "server_name": db_server.name,
649 "visibility": visibility,
650 "team_id": team_id,
651 "associated_tools_count": len(db_server.tools),
652 "associated_resources_count": len(db_server.resources),
653 "associated_prompts_count": len(db_server.prompts),
654 "associated_a2a_agents_count": len(db_server.a2a_agents),
655 },
656 metadata={
657 "created_from_ip": created_from_ip,
658 "created_via": created_via,
659 "created_user_agent": created_user_agent,
660 },
661 )
663 # Structured logging: Log successful server creation
664 self._structured_logger.log(
665 level="INFO",
666 message="Server created successfully",
667 event_type="server_created",
668 component="server_service",
669 server_id=db_server.id,
670 server_name=db_server.name,
671 visibility=visibility,
672 created_by=created_by,
673 user_email=created_by,
674 )
676 # Team name is loaded via db_server.team property from email_team relationship
677 return self.convert_server_to_read(db_server)
678 except IntegrityError as ie:
679 db.rollback()
680 logger.error(f"IntegrityErrors in group: {ie}")
682 # Structured logging: Log database integrity error
683 self._structured_logger.log(
684 level="ERROR",
685 message="Server creation failed due to database integrity error",
686 event_type="server_creation_failed",
687 component="server_service",
688 server_name=server_in.name,
689 error_type="IntegrityError",
690 error_message=str(ie),
691 created_by=created_by,
692 user_email=created_by,
693 )
694 raise ie
695 except ServerNameConflictError as se:
696 db.rollback()
698 # Structured logging: Log name conflict error
699 self._structured_logger.log(
700 level="WARNING",
701 message="Server creation failed due to name conflict",
702 event_type="server_name_conflict",
703 component="server_service",
704 server_name=server_in.name,
705 visibility=visibility,
706 created_by=created_by,
707 user_email=created_by,
708 )
709 raise se
710 except Exception as ex:
711 db.rollback()
713 # Structured logging: Log generic server creation failure
714 self._structured_logger.log(
715 level="ERROR",
716 message="Server creation failed",
717 event_type="server_creation_failed",
718 component="server_service",
719 server_name=server_in.name,
720 error_type=type(ex).__name__,
721 error_message=str(ex),
722 created_by=created_by,
723 user_email=created_by,
724 )
725 raise ServerError(f"Failed to register server: {str(ex)}")
async def list_servers(
    self,
    db: Session,
    include_inactive: bool = False,
    include_metrics: bool = False,
    tags: Optional[List[str]] = None,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    user_email: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Union[tuple[List[ServerRead], Optional[str]], Dict[str, Any]]:
    """List all registered servers with cursor or page-based pagination and optional team filtering.

    The first page of a public-only listing (``token_teams == []``, no user, no
    cursor, no page) is served from / stored in the registry cache. The cache key
    is built from ``include_inactive``, ``tags`` and ``visibility`` only, so the
    cache is bypassed whenever ``limit``, ``team_id`` or ``include_metrics`` is
    set to a non-default value; otherwise a cached page could be returned for a
    request it does not match.

    Args:
        db: Database session.
        include_inactive: Whether to include inactive servers.
        include_metrics: Whether to include aggregated metrics in the results.
        tags: Filter servers by tags. If provided, only servers with at least one matching tag will be returned.
        cursor: Cursor for pagination (encoded last created_at and id).
        limit: Maximum number of servers to return. None for default, 0 for unlimited.
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
        user_email: Email of user for team-based access control. None for no access control.
        team_id: Optional team ID to filter by specific team (requires user_email).
        visibility: Optional visibility filter (private, team, public) (requires user_email).
        token_teams: Optional list of team IDs from the token (None=unrestricted, []=public-only).

    Returns:
        If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
        If cursor is provided or neither: tuple of (list of ServerRead objects, next_cursor).

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server_read = MagicMock()
        >>> service.convert_server_to_read = MagicMock(return_value=server_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> servers, cursor = asyncio.run(service.list_servers(db))
        >>> isinstance(servers, list) and cursor is None
        True
    """
    # Check cache for first page only
    # SECURITY: Only cache public-only results (token_teams=[])
    # - token_teams=None (admin bypass): Don't cache - admin sees all, should be fresh
    # - token_teams=[] (public-only): Cache - same result for all public-only users
    # - token_teams=[...] (team-scoped): Don't cache - results vary by team
    # - user_email set: Don't cache - results vary by user ownership
    cache = _get_registry_cache()
    is_public_only = token_teams is not None and len(token_teams) == 0
    use_cache = (
        cursor is None
        and page is None
        and user_email is None
        and is_public_only
        # These parameters change the result but are not part of the cache key
        # built below, so requests that set them must not share the cached page.
        and limit is None
        and team_id is None
        and not include_metrics
    )
    filters_hash = None
    if use_cache:
        filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, visibility=visibility)
        cached = await cache.get("servers", filters_hash)
        if cached is not None:
            # Reconstruct ServerRead objects from cached dicts
            cached_servers = [ServerRead.model_validate(s).masked() for s in cached["servers"]]
            return (cached_servers, cached.get("next_cursor"))

    # Build base query with ordering and eager load relationships to avoid N+1
    query = (
        select(DbServer)
        .options(
            selectinload(DbServer.tools),
            selectinload(DbServer.resources),
            selectinload(DbServer.prompts),
            selectinload(DbServer.a2a_agents),
            joinedload(DbServer.email_team),
        )
        .order_by(desc(DbServer.created_at), desc(DbServer.id))
    )

    # Eager load metrics relationships to prevent N+1 queries when include_metrics=true
    if include_metrics:
        query = query.options(selectinload(DbServer.metrics), selectinload(DbServer.metrics_hourly))

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbServer.enabled)

    query = await self._apply_access_control(query, db, user_email, token_teams, team_id)

    if visibility:
        query = query.where(DbServer.visibility == visibility)

    # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
    if tags:
        query = query.where(json_contains_tag_expr(db, DbServer.tags, tags, match_any=True))

    # Use unified pagination helper - handles both page and cursor pagination
    pag_result = await unified_paginate(
        db=db,
        query=query,
        page=page,
        per_page=per_page,
        cursor=cursor,
        limit=limit,
        base_url="/admin/servers",  # Used for page-based links
        query_params={"include_inactive": include_inactive} if include_inactive else {},
    )

    next_cursor = None
    # Extract servers based on pagination type
    if page is not None:
        # Page-based: pag_result is a dict
        servers_db = pag_result["data"]
    else:
        # Cursor-based: pag_result is a tuple
        servers_db, next_cursor = pag_result

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Convert to ServerRead (common for both pagination types)
    # Team names are loaded via joinedload(DbServer.email_team)
    result = []
    for s in servers_db:
        try:
            result.append(self.convert_server_to_read(s, include_metrics=include_metrics))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining servers instead of failing completely

    # Return appropriate format based on pagination type
    if page is not None:
        # Page-based format
        return {
            "data": result,
            "pagination": pag_result["pagination"],
            "links": pag_result["links"],
        }

    # Cursor-based format

    # Cache first page results - only for public-only queries (no user/team filtering)
    # SECURITY: Only cache public-only results (token_teams=[]), never admin bypass or team-scoped
    # Reusing use_cache keeps the write path in lock-step with the read path and
    # guarantees filters_hash was computed above.
    if use_cache:
        try:
            cache_data = {"servers": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
            await cache.set("servers", cache_data, filters_hash)
        except AttributeError:
            pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

    return (result, next_cursor)
async def list_servers_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[ServerRead]:
    """
    DEPRECATED: Use list_servers() with user_email parameter instead.

    This method is maintained for backward compatibility but is no longer used.
    New code should call list_servers() with user_email, team_id, and visibility parameters.

    List servers user has access to with team filtering. Results are ordered
    newest first (created_at, then id, both descending) so that skip/limit
    pagination is stable between calls.

    Args:
        db: Database session
        user_email: Email of the user requesting servers
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive servers
        skip: Number of servers to skip for pagination
        limit: Maximum number of servers to return

    Returns:
        List[ServerRead]: Servers the user has access to. Servers that fail
        conversion are logged and omitted. An empty list is returned when
        team_id is given but the user is not a member of that team.
    """
    # Resolve the teams the user belongs to; they drive the access conditions below
    team_service = TeamManagementService(db)
    user_teams = await team_service.get_user_teams(user_email)
    team_ids = [team.id for team in user_teams]

    # Eager load relationships to avoid N+1 queries.
    # An explicit ORDER BY is required: OFFSET/LIMIT without it lets the database
    # return rows in any order, so consecutive pages could overlap or skip rows.
    # The ordering mirrors list_servers().
    query = (
        select(DbServer)
        .options(
            selectinload(DbServer.tools),
            selectinload(DbServer.resources),
            selectinload(DbServer.prompts),
            selectinload(DbServer.a2a_agents),
            joinedload(DbServer.email_team),
        )
        .order_by(desc(DbServer.created_at), desc(DbServer.id))
    )

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbServer.enabled)

    if team_id:
        if team_id not in team_ids:
            return []  # No access to team

        # Within the requested team: anything shared with the team or public,
        # plus anything in that team owned by the user regardless of visibility
        access_conditions = [
            and_(DbServer.team_id == team_id, DbServer.visibility.in_(["team", "public"])),
            and_(DbServer.team_id == team_id, DbServer.owner_email == user_email),
        ]
    else:
        # 1. User's personal resources (owner_email matches)
        access_conditions = [DbServer.owner_email == user_email]

        # 2. Team resources where user is member
        if team_ids:
            access_conditions.append(and_(DbServer.team_id.in_(team_ids), DbServer.visibility.in_(["team", "public"])))

        # 3. Public resources (if visibility allows)
        access_conditions.append(DbServer.visibility == "public")

    query = query.where(or_(*access_conditions))

    # Apply visibility filter if specified
    if visibility:
        query = query.where(DbServer.visibility == visibility)

    # Apply pagination following existing patterns
    query = query.offset(skip).limit(limit)

    servers = db.execute(query).scalars().all()

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Skip metrics to avoid N+1 queries in list operations
    # Team names are loaded via joinedload(DbServer.email_team)
    result = []
    for s in servers:
        try:
            result.append(self.convert_server_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining servers instead of failing completely
    return result
async def get_server(self, db: Session, server_id: str) -> ServerRead:
    """Fetch a single server by its ID and return it as a ServerRead.

    The server is loaded together with its tools, resources, prompts, A2A agents
    and team, then converted with ``convert_server_to_read``. A structured log
    entry and an audit-trail entry are written for every successful lookup.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.

    Returns:
        The corresponding ServerRead object.

    Raises:
        ServerNotFoundError: If no server with the given ID exists.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = server
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.get_server(db, 'server_id'))
        'server_read'
    """
    # Single query with eager-loaded relationships so the conversion below
    # does not trigger per-relationship lazy loads
    eager_loads = (
        selectinload(DbServer.tools),
        selectinload(DbServer.resources),
        selectinload(DbServer.prompts),
        selectinload(DbServer.a2a_agents),
        joinedload(DbServer.email_team),
    )
    lookup = select(DbServer).options(*eager_loads).where(DbServer.id == server_id)
    server = db.execute(lookup).scalar_one_or_none()
    if not server:
        raise ServerNotFoundError(f"Server not found: {server_id}")

    # Snapshot used only for the debug log line
    server_data = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "created_at": server.created_at,
        "updated_at": server.updated_at,
        "enabled": server.enabled,
        "associated_tools": [tool.name for tool in server.tools],
        "associated_resources": [res.id for res in server.resources],
        "associated_prompts": [prompt.id for prompt in server.prompts],
    }
    logger.debug(f"Server Data: {server_data}")

    # Team name is loaded via server.team property from email_team relationship
    server_read = self.convert_server_to_read(server)

    owning_team = getattr(server, "team_id", None)
    relationship_counts = {
        "enabled": server.enabled,
        "tool_count": len(getattr(server, "tools", []) or []),
        "resource_count": len(getattr(server, "resources", []) or []),
        "prompt_count": len(getattr(server, "prompts", []) or []),
    }

    self._structured_logger.log(
        level="INFO",
        message="Server retrieved successfully",
        event_type="server_viewed",
        component="server_service",
        server_id=server.id,
        server_name=server.name,
        team_id=owning_team,
        resource_type="server",
        resource_id=server.id,
        custom_fields=relationship_counts,
    )

    self._audit_trail.log_action(
        action="view_server",
        resource_type="server",
        resource_id=server.id,
        resource_name=server.name,
        user_id="system",
        team_id=owning_team,
        context={"enabled": server.enabled},
        db=db,
    )

    return server_read
async def update_server(
    self,
    db: Session,
    server_id: str,
    server_update: ServerUpdate,
    user_email: str,
    modified_by: Optional[str] = None,
    modified_from_ip: Optional[str] = None,
    modified_via: Optional[str] = None,
    modified_user_agent: Optional[str] = None,
) -> ServerRead:
    """Update an existing server.

    The server row is fetched with ``get_for_update``, the requested changes are
    applied, the transaction is committed, caches are invalidated, subscribers
    are notified, and audit / structured log entries are written.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        server_update: Server update schema with new data.
        user_email: email of the user performing the update (for permission checks).
        modified_by: Username who modified this server.
        modified_from_ip: IP address from which modification was made.
        modified_via: Source of modification (api, ui, etc.).
        modified_user_agent: User agent of the client making the modification.

    Returns:
        The updated ServerRead object.

    Raises:
        IntegrityError: If a database integrity error occurs (re-raised after rollback).
        ServerNameConflictError: If a new name conflicts with an existing public or
            same-team server (re-raised after rollback).
        ServerError: For every other failure. The generic handler wraps any other
            exception raised inside this method - including the "server not found"
            error, the ownership PermissionError, an already-used replacement ID and
            team-assignment validation errors - so callers receive ServerError with
            the original exception attached as ``__cause__``.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> server.id = 'server_id'
        >>> server.name = 'test_server'
        >>> server.owner_email = 'user_email'  # Set owner to match user performing update
        >>> server.team_id = None
        >>> server.visibility = 'public'
        >>> db.get.return_value = server
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> server_update = MagicMock()
        >>> server_update.id = None  # No UUID change
        >>> server_update.name = None  # No name change
        >>> server_update.description = None
        >>> server_update.icon = None
        >>> server_update.visibility = None
        >>> server_update.team_id = None
        >>> import asyncio
        >>> with patch('mcpgateway.services.server_service.get_for_update', return_value=server):
        ...     asyncio.run(service.update_server(db, 'server_id', server_update, 'user_email'))
        'server_read'
    """
    try:
        server = get_for_update(
            db,
            DbServer,
            server_id,
            options=[
                selectinload(DbServer.tools),
                selectinload(DbServer.resources),
                selectinload(DbServer.prompts),
                selectinload(DbServer.a2a_agents),
                selectinload(DbServer.email_team),
            ],
        )
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can update this server")

        # Check for name conflict if name is being changed and visibility is public
        # NOTE(review): this pre-check only runs when the name changes. An update that
        # keeps the name but switches visibility or team is not checked here - confirm
        # whether a database constraint covers that case.
        if server_update.name and server_update.name != server.name:
            visibility = server_update.visibility or server.visibility
            team_id = server_update.team_id or server.team_id
            if visibility.lower() == "public":
                # Check for existing public server with the same name
                existing_server = get_for_update(db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "public", DbServer.id != server.id))
                if existing_server:
                    raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)
            elif visibility.lower() == "team" and team_id:
                # Check for existing team server with the same name
                existing_server = get_for_update(
                    db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "team", DbServer.team_id == team_id, DbServer.id != server.id)
                )
                if existing_server:
                    raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)

        # Update simple fields
        if server_update.id is not None and server_update.id != server.id:
            # Check if the new UUID is already in use
            existing = db.get(DbServer, server_update.id)
            if existing:
                raise ServerError(f"Server with ID {server_update.id} already exists")
            server.id = server_update.id
        if server_update.name is not None:
            server.name = server_update.name
        if server_update.description is not None:
            server.description = server_update.description
        if server_update.icon is not None:
            server.icon = server_update.icon

        if server_update.visibility is not None:
            new_visibility = server_update.visibility

            # Validate visibility transitions
            if new_visibility == "team":
                target_team_id = server_update.team_id if server_update.team_id is not None else server.team_id
                _validate_server_team_assignment(db, user_email, target_team_id)
            # "public" currently needs no extra validation; a platform-level
            # permission check could be added here.

            server.visibility = new_visibility

        if server_update.team_id is not None:
            if server_update.team_id != server.team_id:
                _validate_server_team_assignment(db, user_email, server_update.team_id)
            server.team_id = server_update.team_id

        # Update associated tools, resources and prompts (in that order) if provided.
        # None means "leave untouched"; an empty list clears the association; otherwise
        # the association is replaced by the rows found for the given non-empty IDs,
        # fetched with one bulk query per relationship.
        association_updates = (
            ("tools", DbTool, server_update.associated_tools),
            ("resources", DbResource, server_update.associated_resources),
            ("prompts", DbPrompt, server_update.associated_prompts),
        )
        for relationship, model, requested_ids in association_updates:
            if requested_ids is None:
                continue
            setattr(server, relationship, [])
            wanted_ids = [item_id for item_id in requested_ids if item_id]
            if wanted_ids:
                found = db.execute(select(model).where(model.id.in_(wanted_ids))).scalars().all()
                setattr(server, relationship, list(found))

        # Update tags if provided
        if server_update.tags is not None:
            server.tags = server_update.tags

        # Update OAuth 2.0 configuration if provided
        # Track if OAuth is being explicitly disabled to prevent config re-assignment
        oauth_being_disabled = server_update.oauth_enabled is not None and not server_update.oauth_enabled

        if server_update.oauth_enabled is not None:
            server.oauth_enabled = server_update.oauth_enabled
            # If OAuth is being disabled, clear the config
            if oauth_being_disabled:
                server.oauth_config = None

        # Only update oauth_config if OAuth is not being explicitly disabled
        # This prevents the case where oauth_enabled=False and oauth_config are both provided
        if not oauth_being_disabled:
            # An explicitly supplied oauth_config (even None) is honoured; otherwise
            # only a non-None value triggers an update.
            config_explicitly_set = hasattr(server_update, "model_fields_set") and "oauth_config" in server_update.model_fields_set
            if config_explicitly_set or server_update.oauth_config is not None:
                server.oauth_config = await protect_oauth_config_for_storage(server_update.oauth_config, existing_oauth_config=server.oauth_config)

        # Update metadata fields
        server.updated_at = datetime.now(timezone.utc)
        if modified_by:
            server.modified_by = modified_by
        if modified_from_ip:
            server.modified_from_ip = modified_from_ip
        if modified_via:
            server.modified_via = modified_via
        if modified_user_agent:
            server.modified_user_agent = modified_user_agent
        if hasattr(server, "version") and server.version is not None:
            server.version = server.version + 1
        else:
            server.version = 1

        db.commit()
        db.refresh(server)
        # Force loading relationships
        _ = server.tools, server.resources, server.prompts

        # Invalidate cache after successful update
        cache = _get_registry_cache()
        await cache.invalidate_servers()
        # Also invalidate tags cache since server tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        await self._notify_server_updated(server)
        logger.info(f"Updated server: {server.name}")

        # Structured logging: Audit trail for server update
        changes = []
        if server_update.name:
            changes.append(f"name: {server_update.name}")
        if server_update.visibility:
            changes.append(f"visibility: {server_update.visibility}")
        if server_update.team_id:
            changes.append(f"team_id: {server_update.team_id}")

        self._audit_trail.log_action(
            user_id=user_email or "system",
            action="update_server",
            resource_type="server",
            resource_id=server.id,
            details={
                "server_name": server.name,
                "changes": ", ".join(changes) if changes else "metadata only",
                "version": server.version,
            },
            metadata={
                "modified_from_ip": modified_from_ip,
                "modified_via": modified_via,
                "modified_user_agent": modified_user_agent,
            },
        )

        # Structured logging: Log successful server update
        self._structured_logger.log(
            level="INFO",
            message="Server updated successfully",
            event_type="server_updated",
            component="server_service",
            server_id=server.id,
            server_name=server.name,
            modified_by=user_email,
            user_email=user_email,
        )

        # Build a dictionary with associated IDs (used only for the debug log)
        # Team name is loaded via server.team property from email_team relationship
        server_data = {
            "id": server.id,
            "name": server.name,
            "description": server.description,
            "icon": server.icon,
            "team": server.team,
            "created_at": server.created_at,
            "updated_at": server.updated_at,
            "enabled": server.enabled,
            "associated_tools": [tool.id for tool in server.tools],
            "associated_resources": [res.id for res in server.resources],
            "associated_prompts": [prompt.id for prompt in server.prompts],
        }
        logger.debug(f"Server Data: {server_data}")
        return self.convert_server_to_read(server)
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        self._structured_logger.log(
            level="ERROR",
            message="Server update failed due to database integrity error",
            event_type="server_update_failed",
            component="server_service",
            server_id=server_id,
            error_type="IntegrityError",
            error_message=str(ie),
            modified_by=user_email,
            user_email=user_email,
        )
        raise
    except ServerNameConflictError as snce:
        db.rollback()
        logger.error(f"Server name conflict: {snce}")

        # Structured logging: Log name conflict error
        self._structured_logger.log(
            level="WARNING",
            message="Server update failed due to name conflict",
            event_type="server_name_conflict",
            component="server_service",
            server_id=server_id,
            modified_by=user_email,
            user_email=user_email,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server update failure
        self._structured_logger.log(
            level="ERROR",
            message="Server update failed",
            event_type="server_update_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            modified_by=user_email,
            user_email=user_email,
        )
        # Chain explicitly so the wrapped exception stays available as __cause__
        raise ServerError(f"Failed to update server: {str(e)}") from e
async def set_server_state(self, db: Session, server_id: str, activate: bool, user_email: Optional[str] = None) -> ServerRead:
    """Set the activation status of a server.

    The server row is fetched with ``get_for_update(..., nowait=True)`` so a
    concurrent modification fails fast instead of blocking. When the requested
    state differs from the current one the change is committed, the server cache
    is invalidated, subscribers are notified and audit / structured log entries
    are written. When the state already matches, nothing is written.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        activate: True to activate, False to deactivate.
        user_email: Optional[str] The email of the user to check if the user has permission to modify.

    Returns:
        The updated ServerRead object.

    Raises:
        ServerNotFoundError: If the server is not found.
        ServerLockConflictError: If the server row is locked by another transaction.
        ServerError: For other errors (the original exception is attached as ``__cause__``).
        PermissionError: If user doesn't own the server.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.get.return_value = server
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_server_activated = AsyncMock()
        >>> service._notify_server_deactivated = AsyncMock()
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.set_server_state(db, 'server_id', True))
        'server_read'
    """
    try:
        # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
        try:
            server = get_for_update(
                db,
                DbServer,
                server_id,
                nowait=True,
                options=[
                    selectinload(DbServer.tools),
                    selectinload(DbServer.resources),
                    selectinload(DbServer.prompts),
                    selectinload(DbServer.a2a_agents),
                    selectinload(DbServer.email_team),
                ],
            )
        except OperationalError as lock_err:
            # Row is locked by another transaction - fail fast with 409
            db.rollback()
            raise ServerLockConflictError(f"Server {server_id} is currently being modified by another request") from lock_err
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can activate the Server" if activate else "Only the owner can deactivate the Server")

        if server.enabled != activate:
            server.enabled = activate
            server.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(server)

            # Invalidate cache after status change
            cache = _get_registry_cache()
            await cache.invalidate_servers()

            if activate:
                await self._notify_server_activated(server)
            else:
                await self._notify_server_deactivated(server)
            logger.info(f"Server {server.name} {'activated' if activate else 'deactivated'}")

            # Structured logging: Audit trail for server state change
            self._audit_trail.log_action(
                user_id=user_email or "system",
                action="activate_server" if activate else "deactivate_server",
                resource_type="server",
                resource_id=server.id,
                details={
                    "server_name": server.name,
                    "new_status": "active" if activate else "inactive",
                },
            )

            # Structured logging: Log server status change
            self._structured_logger.log(
                level="INFO",
                message=f"Server {'activated' if activate else 'deactivated'}",
                event_type="server_status_changed",
                component="server_service",
                server_id=server.id,
                server_name=server.name,
                new_status="active" if activate else "inactive",
                changed_by=user_email,
                user_email=user_email,
            )

        # Team name is loaded via server.team property from email_team relationship
        server_data = {
            "id": server.id,
            "name": server.name,
            "description": server.description,
            "icon": server.icon,
            "team": server.team,
            "created_at": server.created_at,
            "updated_at": server.updated_at,
            "enabled": server.enabled,
            "associated_tools": [tool.id for tool in server.tools],
            "associated_resources": [res.id for res in server.resources],
            "associated_prompts": [prompt.id for prompt in server.prompts],
        }
        logger.info(f"Server Data: {server_data}")
        return self.convert_server_to_read(server)
    except PermissionError:
        # The row was locked by get_for_update above; roll back so the lock is
        # released immediately instead of lingering until the session is closed.
        db.rollback()

        # Structured logging: Log permission error
        self._structured_logger.log(
            level="WARNING",
            message="Server state change failed due to insufficient permissions",
            event_type="server_state_change_permission_denied",
            component="server_service",
            server_id=server_id,
            user_email=user_email,
        )
        raise
    except ServerLockConflictError:
        # Re-raise lock conflicts without wrapping - allows 409 response
        raise
    except ServerNotFoundError:
        # Re-raise not found without wrapping - allows 404 response
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server state change failure
        self._structured_logger.log(
            level="ERROR",
            message="Server state change failed",
            event_type="server_state_change_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            user_email=user_email,
        )
        # Chain explicitly so the wrapped exception stays available as __cause__
        raise ServerError(f"Failed to set server state: {str(e)}") from e
async def delete_server(self, db: Session, server_id: str, user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
    """Permanently delete a server.

    Looks the server up, optionally verifies ownership, optionally purges its
    metrics, deletes the row and commits. After the commit it invalidates the
    registry, tags and metrics caches, notifies event subscribers and writes
    audit-trail and structured-log entries.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        user_email: Email of user performing deletion (for ownership check).
            When falsy the ownership check is skipped and the audit entry is
            attributed to "system".
        purge_metrics: If True, delete raw + rollup metrics for this server.

    Raises:
        PermissionError: If user doesn't own the server.
        ServerError: For every other failure. This includes the missing-server
            case: the ServerNotFoundError raised inside the method is caught by
            the generic handler and re-raised wrapped in a ServerError, with the
            original exception attached as ``__cause__``.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.get.return_value = server
        >>> db.delete = MagicMock()
        >>> db.commit = MagicMock()
        >>> service._notify_server_deleted = AsyncMock()
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> import asyncio
        >>> asyncio.run(service.delete_server(db, 'server_id', 'user@example.com'))
    """
    try:
        server = db.get(DbServer, server_id)
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can delete this server")

        # Capture the identifying fields now; they are still needed for the
        # notification and log entries emitted after the row has been deleted.
        server_info = {"id": server.id, "name": server.name}
        if purge_metrics:
            with pause_rollup_during_purge(reason=f"purge_server:{server_id}"):
                delete_metrics_in_batches(db, ServerMetric, ServerMetric.server_id, server_id)
                delete_metrics_in_batches(db, ServerMetricsHourly, ServerMetricsHourly.server_id, server_id)
        db.delete(server)
        db.commit()

        # Invalidate cache after successful deletion
        cache = _get_registry_cache()
        await cache.invalidate_servers()
        # Also invalidate tags cache since server tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()
        # First-Party
        from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

        metrics_cache.invalidate_prefix("top_servers:")
        metrics_cache.invalidate("servers")

        await self._notify_server_deleted(server_info)
        logger.info(f"Deleted server: {server_info['name']}")

        # Structured logging: Audit trail for server deletion
        self._audit_trail.log_action(
            user_id=user_email or "system",
            action="delete_server",
            resource_type="server",
            resource_id=server_info["id"],
            details={
                "server_name": server_info["name"],
            },
        )

        # Structured logging: Log successful server deletion
        self._structured_logger.log(
            level="INFO",
            message="Server deleted successfully",
            event_type="server_deleted",
            component="server_service",
            server_id=server_info["id"],
            server_name=server_info["name"],
            deleted_by=user_email,
            user_email=user_email,
            purge_metrics=purge_metrics,
        )
    except PermissionError:
        db.rollback()

        # Structured logging: Log permission error
        self._structured_logger.log(
            level="WARNING",
            message="Server deletion failed due to insufficient permissions",
            event_type="server_deletion_permission_denied",
            component="server_service",
            server_id=server_id,
            user_email=user_email,
        )
        # Bare raise propagates the original exception object unchanged.
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server deletion failure
        self._structured_logger.log(
            level="ERROR",
            message="Server deletion failed",
            event_type="server_deletion_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            user_email=user_email,
        )
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise ServerError(f"Failed to delete server: {str(e)}") from e
1659 async def _publish_event(self, event: Dict[str, Any]) -> None:
1660 """
1661 Publish an event to all subscribed queues.
1663 Args:
1664 event: Event to publish
1665 """
1666 for queue in self._event_subscribers:
1667 await queue.put(event)
async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
    """Register a private queue and stream server events from it.

    The queue is added to the subscriber list for as long as the generator is
    alive and removed again when the generator is closed or fails.

    Yields:
        Server event messages.
    """
    inbox: asyncio.Queue = asyncio.Queue()
    self._event_subscribers.append(inbox)
    try:
        while True:
            yield await inbox.get()
    finally:
        # Always unregister, even when the consumer stops iterating early.
        self._event_subscribers.remove(inbox)
async def _notify_server_added(self, server: DbServer) -> None:
    """
    Publish a "server_added" event describing the given server.

    Args:
        server: Server to add
    """
    # Collect the ids of the associated items; a falsy collection yields [].
    tool_ids = [tool.id for tool in server.tools] if server.tools else []
    resource_ids = [res.id for res in server.resources] if server.resources else []
    prompt_ids = [prompt.id for prompt in server.prompts] if server.prompts else []

    payload = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "associated_tools": tool_ids,
        "associated_resources": resource_ids,
        "associated_prompts": prompt_ids,
        "enabled": server.enabled,
    }
    await self._publish_event(
        {
            "type": "server_added",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _notify_server_updated(self, server: DbServer) -> None:
    """
    Publish a "server_updated" event describing the given server.

    Args:
        server: Server to update
    """
    # Collect the ids of the associated items; a falsy collection yields [].
    tool_ids = [tool.id for tool in server.tools] if server.tools else []
    resource_ids = [res.id for res in server.resources] if server.resources else []
    prompt_ids = [prompt.id for prompt in server.prompts] if server.prompts else []

    payload = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "associated_tools": tool_ids,
        "associated_resources": resource_ids,
        "associated_prompts": prompt_ids,
        "enabled": server.enabled,
    }
    await self._publish_event(
        {
            "type": "server_updated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _notify_server_activated(self, server: DbServer) -> None:
    """
    Publish a "server_activated" event for the given server.

    The event always reports ``enabled`` as True, independent of the value
    currently stored on the server object.

    Args:
        server: Server to activate
    """
    payload = {"id": server.id, "name": server.name, "enabled": True}
    await self._publish_event(
        {
            "type": "server_activated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _notify_server_deactivated(self, server: DbServer) -> None:
    """
    Publish a "server_deactivated" event for the given server.

    The event always reports ``enabled`` as False, independent of the value
    currently stored on the server object.

    Args:
        server: Server to deactivate
    """
    payload = {"id": server.id, "name": server.name, "enabled": False}
    await self._publish_event(
        {
            "type": "server_deactivated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
1772 async def _notify_server_deleted(self, server_info: Dict[str, Any]) -> None:
1773 """
1774 Notify subscribers that a server has been deleted.
1776 Args:
1777 server_info: Dictionary on server to be deleted
1778 """
1779 event = {
1780 "type": "server_deleted",
1781 "data": server_info,
1782 "timestamp": datetime.now(timezone.utc).isoformat(),
1783 }
1784 await self._publish_event(event)
1786 # --- Metrics ---
async def aggregate_metrics(self, db: Session) -> ServerMetrics:
    """
    Aggregate metrics for all server invocations across all servers.

    When the metrics cache is enabled and holds an entry under "servers", that
    entry is returned directly. Otherwise the aggregation is delegated to
    ``aggregate_metrics_combined`` (raw + hourly rollup query) and, if caching
    is enabled, the result is stored back as a plain dict.

    Args:
        db: Database session

    Returns:
        ServerMetrics: Aggregated metrics from raw + hourly rollup tables.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    # Fast path: serve a previously cached aggregate when caching is on.
    if is_cache_enabled():
        cached_payload = metrics_cache.get("servers")
        if cached_payload is not None:
            return ServerMetrics(**cached_payload)

    # Slow path: combined raw + rollup query for full historical coverage.
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "server")

    # The schema fields carry the same names as the attributes on the query result.
    copied_fields = (
        "total_executions",
        "successful_executions",
        "failed_executions",
        "failure_rate",
        "min_response_time",
        "max_response_time",
        "avg_response_time",
        "last_execution_time",
    )
    metrics = ServerMetrics(**{field_name: getattr(combined, field_name) for field_name in copied_fields})

    # Cache the result as dict for serialization compatibility (if enabled)
    if is_cache_enabled():
        metrics_cache.set("servers", metrics.model_dump())

    return metrics
async def reset_metrics(self, db: Session) -> None:
    """
    Delete every raw and hourly-rollup server metric row, then drop cached aggregates.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    # Raw table first, then the rollup table; both are committed together.
    for metrics_model in (ServerMetric, ServerMetricsHourly):
        db.execute(delete(metrics_model))
    db.commit()

    # Invalidate metrics cache
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("servers")
    metrics_cache.invalidate_prefix("top_servers:")
def get_oauth_protected_resource_metadata(self, db: Session, server_id: str, resource_base_url: str) -> Dict[str, Any]:
    """
    Get RFC 9728 OAuth 2.0 Protected Resource Metadata for a server.

    This method retrieves the OAuth configuration for a server and formats it
    according to RFC 9728 Protected Resource Metadata specification, enabling
    MCP clients to discover OAuth authorization servers for browser-based SSO.

    Args:
        db: Database session.
        server_id: The ID of the server.
        resource_base_url: The base URL for the resource (e.g., "https://gateway.example.com/servers/abc123/mcp").
            It is returned unchanged as the ``resource`` value.

    Returns:
        Dict containing RFC 9728 Protected Resource Metadata:
            - resource: The protected resource identifier (``resource_base_url`` as given)
            - authorization_servers: JSON array of authorization server issuer URIs (RFC 9728 Section 2).
              A single string configured under either ``authorization_servers`` or
              ``authorization_server`` is wrapped in a one-element list.
            - bearer_methods_supported: Supported bearer token methods (always ["header"])
            - scopes_supported: Optional list of supported scopes

    Raises:
        ServerNotFoundError: If server doesn't exist, is disabled, or is non-public.
        ServerError: If OAuth is not enabled or not properly configured.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> # Method exists and is callable
        >>> callable(service.get_oauth_protected_resource_metadata)
        True
    """
    server = db.get(DbServer, server_id)

    # Return not found for non-existent, disabled, or non-public servers
    # (avoids leaking information about private/team servers)
    if not server:
        raise ServerNotFoundError(f"Server not found: {server_id}")

    if not server.enabled:
        raise ServerNotFoundError(f"Server not found: {server_id}")

    if getattr(server, "visibility", "public") != "public":
        raise ServerNotFoundError(f"Server not found: {server_id}")

    # Check OAuth configuration
    if not getattr(server, "oauth_enabled", False):
        raise ServerError(f"OAuth not enabled for server: {server_id}")

    oauth_config = getattr(server, "oauth_config", None)
    if not oauth_config:
        raise ServerError(f"OAuth not configured for server: {server_id}")

    # Extract authorization server(s): the plural key wins, the singular key is the fallback.
    authorization_servers = oauth_config.get("authorization_servers", [])
    if not authorization_servers:
        authorization_servers = oauth_config.get("authorization_server")

    # The metadata field must be an array, so wrap a bare string from either
    # key instead of emitting it as a scalar.
    if isinstance(authorization_servers, str):
        authorization_servers = [authorization_servers]

    if not authorization_servers:
        raise ServerError(f"OAuth authorization_server not configured for server: {server_id}")

    # Build RFC 9728 Protected Resource Metadata response
    response_data: Dict[str, Any] = {
        "resource": resource_base_url,
        "authorization_servers": authorization_servers,
        "bearer_methods_supported": ["header"],
    }

    # Add optional scopes if configured (never include secrets from oauth_config)
    scopes = oauth_config.get("scopes_supported") or oauth_config.get("scopes")
    if scopes:
        response_data["scopes_supported"] = scopes

    logger.debug(f"Returning OAuth protected resource metadata for server {server_id}")
    return response_data
# Lazy singleton - created on first access, not at module import time.
# This avoids instantiation when only exception classes are imported.
_server_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Resolve the lazily created ``server_service`` module attribute.

    Args:
        name: The attribute name being accessed.

    Returns:
        The server_service singleton instance if name is "server_service".

    Raises:
        AttributeError: If the attribute name is not "server_service".
    """
    global _server_service_instance  # pylint: disable=global-statement
    # Reject anything other than the one lazily provided attribute.
    if name != "server_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    # Build the service on first access and reuse it afterwards.
    if _server_service_instance is None:
        _server_service_instance = ServerService()
    return _server_service_instance