Coverage for mcpgateway / services / server_service.py: 100%
591 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/server_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7ContextForge Server Service
9This module implements server management for the MCP Servers Catalog.
10It handles server registration, listing, retrieval, updates, activation toggling, and deletion.
11It also publishes event notifications for server changes.
12"""
14# Standard
15import asyncio
16import binascii
17from datetime import datetime, timezone
18from typing import Any, AsyncGenerator, Dict, List, Optional, Union
20# Third-Party
21import httpx
22from pydantic import ValidationError
23from sqlalchemy import and_, delete, desc, or_, select
24from sqlalchemy.exc import IntegrityError, OperationalError
25from sqlalchemy.orm import joinedload, selectinload, Session
27# First-Party
28from mcpgateway.config import settings
29from mcpgateway.db import A2AAgent as DbA2AAgent
30from mcpgateway.db import EmailTeam as DbEmailTeam
31from mcpgateway.db import EmailTeamMember as DbEmailTeamMember
32from mcpgateway.db import get_for_update
33from mcpgateway.db import Prompt as DbPrompt
34from mcpgateway.db import Resource as DbResource
35from mcpgateway.db import Server as DbServer
36from mcpgateway.db import ServerMetric, ServerMetricsHourly
37from mcpgateway.db import Tool as DbTool
38from mcpgateway.schemas import ServerCreate, ServerMetrics, ServerRead, ServerUpdate, TopPerformer
39from mcpgateway.services.audit_trail_service import get_audit_trail_service
40from mcpgateway.services.base_service import BaseService
41from mcpgateway.services.encryption_service import protect_oauth_config_for_storage
42from mcpgateway.services.logging_service import LoggingService
43from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
44from mcpgateway.services.performance_tracker import get_performance_tracker
45from mcpgateway.services.structured_logger import get_structured_logger
46from mcpgateway.services.team_management_service import TeamManagementService
47from mcpgateway.utils.metrics_common import build_top_performers
48from mcpgateway.utils.pagination import unified_paginate
49from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
# Registry cache handle; resolved lazily to avoid circular dependencies.
_REGISTRY_CACHE = None


def _get_registry_cache():
    """Return the shared registry cache, importing it on first use.

    The import happens inside the function (not at module import time) so
    that loading this module does not trigger a circular import. Once
    resolved, the instance is kept in the module-level ``_REGISTRY_CACHE``
    and handed back on every later call.

    Returns:
        RegistryCache instance.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache as shared_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = shared_cache
    return _REGISTRY_CACHE
def _validate_server_team_assignment(db: Session, user_email: Optional[str], target_team_id: Optional[str]) -> None:
    """Check that a server may be assigned to the given team by the given user.

    The function returns silently when the assignment is acceptable and raises
    otherwise.

    Args:
        db: Database session used for the team and membership lookups.
        user_email: Requesting user email. When falsy, the ownership check is
            skipped (system/internal update paths omit the user context).
        target_team_id: Team identifier to validate.

    Raises:
        ValueError: If no team ID is given, the team does not exist, or the
            caller has no active "owner" membership in the team.
    """
    if not target_team_id:
        raise ValueError("Cannot set visibility to 'team' without a team_id")

    found_team = db.query(DbEmailTeam).filter(DbEmailTeam.id == target_team_id).first()
    if not found_team:
        raise ValueError(f"Team {target_team_id} not found")

    # System/internal updates may intentionally omit the user context; only
    # enforce ownership when a caller identity is available.
    if user_email:
        member_query = db.query(DbEmailTeamMember)
        owner_membership = member_query.filter(
            DbEmailTeamMember.team_id == target_team_id,
            DbEmailTeamMember.user_email == user_email,
            DbEmailTeamMember.is_active,
            DbEmailTeamMember.role == "owner",
        ).first()
        if not owner_membership:
            raise ValueError("User membership in team not sufficient for this update.")
# The logging service is set up ahead of the definitions below so that the
# module-level `logger` exists by the time they use it.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
class ServerError(Exception):
    """Base class for server-related errors."""


class ServerNotFoundError(ServerError):
    """Raised when a requested server is not found."""


class ServerLockConflictError(ServerError):
    """Raised when a server row is locked by another transaction."""


class ServerNameConflictError(ServerError):
    """Raised when a server name conflicts with an existing one."""

    def __init__(self, name: str, enabled: bool = True, server_id: Optional[str] = None, visibility: Optional[str] = "public") -> None:
        """
        Initialize a ServerNameConflictError exception.

        This exception indicates a server name conflict, with additional context about visibility,
        whether the conflicting server is active, and its ID if known. The error message starts
        with the visibility information.

        Visibility rules:
            - public: Restricts server names globally (across all teams).
            - team: Restricts server names only within the same team.

        Args:
            name: The server name that caused the conflict.
            enabled: Whether the conflicting server is currently active. Defaults to True.
            server_id: The ID of the conflicting server, if known. Only included in message for inactive servers.
            visibility: The visibility of the conflicting server (e.g., "public", "private", "team").
                A missing (None or empty) value is treated as "public".

        Examples:
            >>> error = ServerNameConflictError("My Server")
            >>> str(error)
            'Public Server already exists with name: My Server'
            >>> error = ServerNameConflictError("My Server", enabled=False, server_id=123)
            >>> str(error)
            'Public Server already exists with name: My Server (currently inactive, ID: 123)'
            >>> error.enabled
            False
            >>> error.server_id
            123
            >>> error = ServerNameConflictError("My Server", enabled=False, visibility="team")
            >>> str(error)
            'Team Server already exists with name: My Server (currently inactive, ID: None)'
            >>> error.enabled
            False
            >>> error.server_id is None
            True
            >>> str(ServerNameConflictError("My Server", visibility=None))
            'Public Server already exists with name: My Server'
        """
        self.name = name
        self.enabled = enabled
        self.server_id = server_id
        # A conflicting record may carry no visibility value; fall back to the
        # default label so that building the message cannot itself fail and
        # hide the real name conflict behind an AttributeError.
        self.visibility = visibility or "public"
        message = f"{self.visibility.capitalize()} Server already exists with name: {name}"
        if not enabled:
            message += f" (currently inactive, ID: {server_id})"
        super().__init__(message)
170class ServerService(BaseService):
171 """Service for managing MCP Servers in the catalog.
173 Provides methods to create, list, retrieve, update, set state, and delete server records.
174 Also supports event notifications for changes in server data.
175 """
177 _visibility_model_cls = DbServer
def __init__(self) -> None:
    """Create a ServerService with its subscribers list, HTTP client, and helpers.

    The instance starts with:
        - An empty list of event subscriber queues
        - An ``httpx.AsyncClient`` whose timeout, SSL verification, and connection
          limits come from the application settings
        - Handles to the structured logger, audit trail service, and performance tracker

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> isinstance(service._event_subscribers, list)
        True
        >>> len(service._event_subscribers)
        0
        >>> hasattr(service, '_http_client')
        True
    """
    self._event_subscribers: List[asyncio.Queue] = []

    # Connection-pool sizing is taken from configuration.
    pool_limits = httpx.Limits(
        max_connections=settings.httpx_max_connections,
        max_keepalive_connections=settings.httpx_max_keepalive_connections,
        keepalive_expiry=settings.httpx_keepalive_expiry,
    )
    self._http_client = httpx.AsyncClient(
        timeout=settings.federation_timeout,
        verify=not settings.skip_ssl_verify,
        limits=pool_limits,
    )

    self._structured_logger = get_structured_logger("server_service")
    self._audit_trail = get_audit_trail_service()
    self._performance_tracker = get_performance_tracker()
async def initialize(self) -> None:
    """Initialize the server service.

    Only writes an informational log entry; no other setup happens here.
    """
    logger.info("Initializing server service")
async def shutdown(self) -> None:
    """Shutdown the server service.

    Closes the service's HTTP client, then logs that shutdown finished.
    """
    http_client = self._http_client
    await http_client.aclose()
    logger.info("Server service shutdown complete")
223 # get_top_server
async def get_top_servers(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
    """Return the top-performing servers.

    Results come from ``get_top_performers_combined`` (called with
    ``metric_type="server"``) and are converted with ``build_top_performers``.
    When the metrics cache is enabled, a cached result for the same limit and
    ``include_deleted`` flag is returned without querying, and fresh results
    are stored back into the cache.

    Args:
        db (Session): Database session for querying server metrics.
        limit (Optional[int]): Maximum number of servers to return. A falsy
            value (None or 0) is replaced by 5.
        include_deleted (bool): Whether to include deleted servers from rollups.

    Returns:
        List[TopPerformer]: The top performers built from the query results.
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    top_n = limit if limit else 5
    cache_key = f"top_servers:{top_n}:include_deleted={include_deleted}"

    # Serve from the cache when it is enabled and holds an entry for this key.
    cached_result = metrics_cache.get(cache_key) if is_cache_enabled() else None
    if cached_result is not None:
        return cached_result

    # The query helper is imported only on a cache miss.
    # First-Party
    from mcpgateway.services.metrics_query_service import get_top_performers_combined  # pylint: disable=import-outside-toplevel

    rows = get_top_performers_combined(
        db=db,
        metric_type="server",
        entity_model=DbServer,
        limit=top_n,
        include_deleted=include_deleted,
    )
    performers = build_top_performers(rows)

    if is_cache_enabled():
        metrics_cache.set(cache_key, performers)

    return performers
def convert_server_to_read(self, server: DbServer, include_metrics: bool = False) -> ServerRead:
    """
    Build the masked ServerRead model for a server record.

    Args:
        server (DbServer): The ORM instance of the server.
        include_metrics (bool): When True, aggregate ``server.metrics`` into a summary
            (counts, failure rate, min/max/avg response time, last execution time).
            When False, the ``metrics`` field is set to None. Defaults to False;
            leave it False for list operations to avoid N+1 query issues.

    Returns:
        ServerRead: The validated and masked Pydantic model for the server.

    Examples:
        >>> from types import SimpleNamespace
        >>> from datetime import datetime, timezone
        >>> svc = ServerService()
        >>> now = datetime.now(timezone.utc)
        >>> # Fake metric objects
        >>> m1 = SimpleNamespace(is_success=True, response_time=0.2, timestamp=now)
        >>> m2 = SimpleNamespace(is_success=False, response_time=0.4, timestamp=now)
        >>> server = SimpleNamespace(
        ...     id='s1', name='S', description=None, icon=None,
        ...     created_at=now, updated_at=now, enabled=True,
        ...     associated_tools=[], associated_resources=[], associated_prompts=[], associated_a2a_agents=[],
        ...     tags=[], metrics=[m1, m2],
        ...     tools=[], resources=[], prompts=[], a2a_agents=[],
        ...     team_id=None, owner_email=None, visibility=None,
        ...     created_by=None, modified_by=None
        ... )
        >>> result = svc.convert_server_to_read(server, include_metrics=True)
        >>> result.metrics.total_executions
        2
        >>> result.metrics.successful_executions
        1
    """
    # Each field is read through attribute access so SQLAlchemy populates it
    # (copying __dict__ can come back empty with certain query patterns).
    server_dict = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "enabled": server.enabled,
        "created_at": server.created_at,
        "updated_at": server.updated_at,
        "team_id": server.team_id,
        "owner_email": server.owner_email,
        "visibility": server.visibility,
        "created_by": server.created_by,
        "created_from_ip": getattr(server, "created_from_ip", None),
        "created_via": getattr(server, "created_via", None),
        "created_user_agent": getattr(server, "created_user_agent", None),
        "modified_by": server.modified_by,
        "modified_from_ip": getattr(server, "modified_from_ip", None),
        "modified_via": getattr(server, "modified_via", None),
        "modified_user_agent": getattr(server, "modified_user_agent", None),
        "import_batch_id": getattr(server, "import_batch_id", None),
        "federation_source": getattr(server, "federation_source", None),
        "version": getattr(server, "version", None),
        "tags": server.tags or [],
        # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
        "oauth_enabled": getattr(server, "oauth_enabled", False),
        "oauth_config": getattr(server, "oauth_config", None),
    }

    if not include_metrics:
        # Skipping aggregation keeps list operations free of per-server metric loads.
        server_dict["metrics"] = None
    else:
        recorded_metrics = getattr(server, "metrics", None) or ()

        succeeded = 0
        errored = 0
        fastest = None
        slowest = None
        response_time_total = 0.0
        latest_timestamp = None

        for metric in recorded_metrics:
            if metric.is_success:
                succeeded += 1
            else:
                errored += 1

            elapsed = metric.response_time
            if fastest is None or elapsed < fastest:
                fastest = elapsed
            if slowest is None or elapsed > slowest:
                slowest = elapsed
            response_time_total += elapsed

            seen_at = metric.timestamp
            if latest_timestamp is None or seen_at > latest_timestamp:
                latest_timestamp = seen_at

        executions = succeeded + errored
        if executions > 0:
            failure_rate = errored / executions
            average_response_time = response_time_total / executions
        else:
            failure_rate = 0.0
            average_response_time = None

        server_dict["metrics"] = {
            "total_executions": executions,
            "successful_executions": succeeded,
            "failed_executions": errored,
            "failure_rate": failure_rate,
            "min_response_time": fastest,
            "max_response_time": slowest,
            "avg_response_time": average_response_time,
            "last_execution_time": latest_timestamp,
        }

    # Associated names/IDs come from the relationship collections; a falsy
    # collection yields an empty list.
    linked_tools = server.tools or []
    linked_resources = server.resources or []
    linked_prompts = server.prompts or []
    linked_agents = server.a2a_agents or []

    server_dict["associated_tools"] = [tool.name for tool in linked_tools]
    server_dict["associated_tool_ids"] = [str(tool.id) for tool in linked_tools]
    server_dict["associated_resources"] = [resource.id for resource in linked_resources]
    server_dict["associated_prompts"] = [prompt.id for prompt in linked_prompts]
    server_dict["associated_a2a_agents"] = [agent.id for agent in linked_agents]

    # Team name is loaded via server.team property from email_team relationship
    server_dict["team"] = getattr(server, "team", None)

    validated = ServerRead.model_validate(server_dict)
    return validated.masked()
def _assemble_associated_items(
    self,
    tools: Optional[List[str]],
    resources: Optional[List[str]],
    prompts: Optional[List[str]],
    a2a_agents: Optional[List[str]] = None,
    gateways: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Group the separate association lists into a single dictionary.

    Any argument that is None (or otherwise falsy) is represented by an empty list.

    Args:
        tools: List of tool IDs.
        resources: List of resource IDs.
        prompts: List of prompt IDs.
        a2a_agents: List of A2A agent IDs.
        gateways: List of gateway IDs.

    Returns:
        A dictionary with keys "tools", "resources", "prompts", "a2a_agents", and "gateways".

    Examples:
        >>> service = ServerService()
        >>> # Test with all None values
        >>> result = service._assemble_associated_items(None, None, None)
        >>> result
        {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []}

        >>> # Test with empty lists
        >>> result = service._assemble_associated_items([], [], [])
        >>> result
        {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []}

        >>> # Test with actual values
        >>> result = service._assemble_associated_items(['tool1', 'tool2'], ['res1'], ['prompt1'])
        >>> result
        {'tools': ['tool1', 'tool2'], 'resources': ['res1'], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []}

        >>> # Test with mixed None and values
        >>> result = service._assemble_associated_items(['tool1'], None, ['prompt1'])
        >>> result
        {'tools': ['tool1'], 'resources': [], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []}
    """
    labelled_groups = (
        ("tools", tools),
        ("resources", resources),
        ("prompts", prompts),
        ("a2a_agents", a2a_agents),
        ("gateways", gateways),
    )
    return {label: (item_ids or []) for label, item_ids in labelled_groups}
def _link_associated_items(
    self,
    db: Session,
    db_server: DbServer,
    relation: str,
    model: Any,
    raw_ids: Optional[List[str]],
    singular: str,
    plural: str,
) -> List[Any]:
    """Verify that the referenced items exist and attach them to a server relationship.

    Args:
        db: The SQLAlchemy database session.
        db_server: The server record being built.
        relation: Name of the relationship collection on ``db_server`` to fill
            (for example ``"tools"``).
        model: ORM model class of the items to look up.
        raw_ids: IDs as received on the request; entries are stripped and blank
            entries are ignored. May be None or empty.
        singular: Label used in the error message for one missing item.
        plural: Label used in the error message for several missing items.

    Returns:
        The ORM rows that were attached (empty when there was nothing to attach).

    Raises:
        ServerError: If any requested ID does not match an existing row.
    """
    if not raw_ids:
        return []

    wanted_ids = [item_id.strip() for item_id in raw_ids if item_id.strip()]
    if not wanted_ids:
        return []

    if len(wanted_ids) == 1:
        # Use single lookup for a single item (maintains test compatibility)
        only_id = wanted_ids[0]
        row = db.get(model, only_id)
        if not row:
            raise ServerError(f"{singular} with id {only_id} does not exist.")
        getattr(db_server, relation).append(row)
        return [row]

    # Use one bulk query for multiple items
    rows = db.execute(select(model).where(model.id.in_(wanted_ids))).scalars().all()
    missing_ids = set(wanted_ids) - {row.id for row in rows}
    if missing_ids:
        raise ServerError(f"{plural} with ids {missing_ids} do not exist.")
    getattr(db_server, relation).extend(rows)
    return list(rows)


async def register_server(
    self,
    db: Session,
    server_in: ServerCreate,
    created_by: Optional[str] = None,
    created_from_ip: Optional[str] = None,
    created_via: Optional[str] = None,
    created_user_agent: Optional[str] = None,
    team_id: Optional[str] = None,
    owner_email: Optional[str] = None,
    visibility: Optional[str] = "public",
) -> ServerRead:
    """
    Register a new server in the catalog and validate that all associated items exist.

    This function performs the following steps:
    1. Resolves the effective team, owner, and visibility for the new server.
    2. Creates a new server record.
    3. Checks if a server with the same (team_id, owner_email, name) already exists.
    4. For each ID provided in associated_tools, associated_resources, associated_prompts,
       and associated_a2a_agents, verifies that the corresponding item exists. If an item
       does not exist, an error is raised.
    5. Associates the verified items to the new server.
    6. Commits the transaction, refreshes the ORM instance, and forces the loading of relationship data.
    7. Notifies subscribers of the addition, writes audit/structured log entries, and returns
       the converted server.

    Args:
        db (Session): The SQLAlchemy database session.
        server_in (ServerCreate): The server creation schema containing server details and lists of
            associated tool, resource, prompt, and A2A agent IDs (as strings).
        created_by (Optional[str]): Email of the user creating the server, used for ownership tracking.
        created_from_ip (Optional[str]): IP address from which the creation request originated.
        created_via (Optional[str]): Source of creation (api, ui, import).
        created_user_agent (Optional[str]): User agent string from the creation request.
        team_id (Optional[str]): Team ID to assign the server to. Used only when the schema
            does not carry a team_id.
        owner_email (Optional[str]): Email of the user who owns this server. Used only when the
            schema does not carry an owner_email; falls back to created_by.
        visibility (str): Server visibility level (private, team, public). Takes precedence
            over the schema value; defaults to "public" when neither is set.

    Returns:
        ServerRead: The newly created server, with associated item IDs.

    Raises:
        IntegrityError: If a database integrity error occurs.
        ServerNameConflictError: If a server name conflict occurs (public or team visibility).
        ServerError: If any associated tool, resource, prompt, or A2A agent does not exist,
            or if any other registration error occurs.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server_in = MagicMock()
        >>> server_in.id = None  # No custom UUID for this test
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> db.add = MagicMock()
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_server_added = AsyncMock()
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.register_server(db, server_in))
        'server_read'
    """
    try:
        logger.info(f"Registering server: {server_in.name}")
        oauth_config = await protect_oauth_config_for_storage(getattr(server_in, "oauth_config", None))

        # Resolve scoping once so the stored record, the uniqueness check, and
        # the audit/structured logs all agree on the same values.
        # Team/owner: schema values win, then the function parameters.
        effective_team_id = getattr(server_in, "team_id", None) or team_id
        effective_owner_email = getattr(server_in, "owner_email", None) or owner_email or created_by
        # IMPORTANT: Prefer function parameter over schema default
        # The API has visibility as a separate Body param that should override schema default
        effective_visibility = visibility or getattr(server_in, "visibility", None) or "public"

        # Create the new server record.
        db_server = DbServer(
            name=server_in.name,
            description=server_in.description,
            icon=server_in.icon,
            enabled=True,
            tags=server_in.tags or [],
            team_id=effective_team_id,
            owner_email=effective_owner_email,
            visibility=effective_visibility,
            # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
            oauth_enabled=getattr(server_in, "oauth_enabled", False) or False,
            oauth_config=oauth_config,
            # Metadata fields
            created_by=created_by,
            created_from_ip=created_from_ip,
            created_via=created_via,
            created_user_agent=created_user_agent,
            version=1,
        )

        # Check for existing server with the same name (with row locking to prevent race conditions)
        # The unique constraint is on (team_id, owner_email, name), so we check based on that
        conditions = [
            DbServer.name == server_in.name,
            DbServer.team_id == effective_team_id if effective_team_id else DbServer.team_id.is_(None),
            DbServer.owner_email == effective_owner_email if effective_owner_email else DbServer.owner_email.is_(None),
        ]
        if server_in.id:
            conditions.append(DbServer.id != server_in.id)

        existing_server = get_for_update(db, DbServer, where=and_(*conditions))
        if existing_server:
            raise ServerNameConflictError(server_in.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)

        # Set custom UUID if provided
        if server_in.id:
            logger.info(f"Setting custom UUID for server: {server_in.id}")
            db_server.id = server_in.id
        logger.info(f"Adding server to DB session: {db_server.name}")
        db.add(db_server)

        # Associate tools, resources, and prompts, verifying that each one exists.
        self._link_associated_items(db, db_server, "tools", DbTool, server_in.associated_tools, "Tool", "Tools")
        self._link_associated_items(db, db_server, "resources", DbResource, server_in.associated_resources, "Resource", "Resources")
        self._link_associated_items(db, db_server, "prompts", DbPrompt, server_in.associated_prompts, "Prompt", "Prompts")

        # Associate A2A agents the same way and log each association.
        # Note: Auto-tool creation for A2A agents should be handled
        # by a separate service or background task to avoid circular imports
        linked_agents = self._link_associated_items(db, db_server, "a2a_agents", DbA2AAgent, server_in.associated_a2a_agents, "A2A Agent", "A2A Agents")
        for agent in linked_agents:
            logger.info(f"A2A agent {agent.name} associated with server {db_server.name}")

        # Commit the new record and refresh.
        db.commit()
        db.refresh(db_server)
        # Force load the relationship attributes.
        _ = db_server.tools, db_server.resources, db_server.prompts, db_server.a2a_agents

        # Assemble response data with associated item IDs (used for debug logging).
        server_data = {
            "id": db_server.id,
            "name": db_server.name,
            "description": db_server.description,
            "icon": db_server.icon,
            "created_at": db_server.created_at,
            "updated_at": db_server.updated_at,
            "enabled": db_server.enabled,
            "associated_tools": [str(tool.id) for tool in db_server.tools],
            "associated_resources": [str(resource.id) for resource in db_server.resources],
            "associated_prompts": [str(prompt.id) for prompt in db_server.prompts],
        }
        logger.debug(f"Server Data: {server_data}")
        await self._notify_server_added(db_server)
        logger.info(f"Registered server: {server_in.name}")

        # Structured logging: Audit trail for server creation.
        # Record the resolved visibility/team rather than the raw parameters,
        # which can differ from what was stored.
        self._audit_trail.log_action(
            user_id=created_by or "system",
            action="create_server",
            resource_type="server",
            resource_id=db_server.id,
            details={
                "server_name": db_server.name,
                "visibility": effective_visibility,
                "team_id": effective_team_id,
                "associated_tools_count": len(db_server.tools),
                "associated_resources_count": len(db_server.resources),
                "associated_prompts_count": len(db_server.prompts),
                "associated_a2a_agents_count": len(db_server.a2a_agents),
            },
            metadata={
                "created_from_ip": created_from_ip,
                "created_via": created_via,
                "created_user_agent": created_user_agent,
            },
        )

        # Structured logging: Log successful server creation
        self._structured_logger.log(
            level="INFO",
            message="Server created successfully",
            event_type="server_created",
            component="server_service",
            server_id=db_server.id,
            server_name=db_server.name,
            visibility=effective_visibility,
            created_by=created_by,
            user_email=created_by,
        )

        # Team name is loaded via db_server.team property from email_team relationship
        return self.convert_server_to_read(db_server)
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        self._structured_logger.log(
            level="ERROR",
            message="Server creation failed due to database integrity error",
            event_type="server_creation_failed",
            component="server_service",
            server_name=server_in.name,
            error_type="IntegrityError",
            error_message=str(ie),
            created_by=created_by,
            user_email=created_by,
        )
        raise
    except ServerNameConflictError:
        db.rollback()

        # Structured logging: Log name conflict error
        self._structured_logger.log(
            level="WARNING",
            message="Server creation failed due to name conflict",
            event_type="server_name_conflict",
            component="server_service",
            server_name=server_in.name,
            visibility=visibility or getattr(server_in, "visibility", None) or "public",
            created_by=created_by,
            user_email=created_by,
        )
        raise
    except Exception as ex:
        db.rollback()

        # Structured logging: Log generic server creation failure
        self._structured_logger.log(
            level="ERROR",
            message="Server creation failed",
            event_type="server_creation_failed",
            component="server_service",
            server_name=server_in.name,
            error_type=type(ex).__name__,
            error_message=str(ex),
            created_by=created_by,
            user_email=created_by,
        )
        # Chain the original exception so its traceback stays attached.
        raise ServerError(f"Failed to register server: {str(ex)}") from ex
async def list_servers(
    self,
    db: Session,
    include_inactive: bool = False,
    tags: Optional[List[str]] = None,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    user_email: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Union[tuple[List[ServerRead], Optional[str]], Dict[str, Any]]:
    """List all registered servers with cursor or page-based pagination and optional team filtering.

    The registry cache is consulted (and populated) only for the first page of a
    public-only, cursor-style listing that carries no ``team_id``, ``visibility``
    or ``limit`` argument. The cache key is derived from ``include_inactive`` and
    ``tags`` alone, so any call narrowed by another argument must bypass the cache;
    otherwise its result would be stored under, or served from, the entry that
    belongs to the unfiltered listing.

    Args:
        db: Database session.
        include_inactive: Whether to include inactive servers.
        tags: Filter servers by tags. If provided, only servers with at least one matching tag will be returned.
        cursor: Cursor for pagination (encoded last created_at and id).
        limit: Maximum number of servers to return. None for default, 0 for unlimited.
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
        user_email: Email of user for team-based access control. None for no access control.
        team_id: Optional team ID to filter by specific team (requires user_email).
        visibility: Optional visibility filter (private, team, public) (requires user_email).
        token_teams: Optional list of team IDs from the token (None=unrestricted, []=public-only).

    Returns:
        If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
        If cursor is provided or neither: tuple of (list of ServerRead objects, next_cursor).

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server_read = MagicMock()
        >>> service.convert_server_to_read = MagicMock(return_value=server_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> servers, cursor = asyncio.run(service.list_servers(db))
        >>> isinstance(servers, list) and cursor is None
        True
    """
    # Check cache for first page only
    # SECURITY: Only cache public-only results (token_teams=[])
    # - token_teams=None (admin bypass): Don't cache - admin sees all, should be fresh
    # - token_teams=[] (public-only): Cache - same result for all public-only users
    # - token_teams=[...] (team-scoped): Don't cache - results vary by team
    # - user_email set: Don't cache - results vary by user ownership
    # - team_id / visibility / limit set: Don't cache - they change the result set
    #   but are not part of the cache key
    cache = _get_registry_cache()
    is_public_only = token_teams is not None and len(token_teams) == 0
    use_cache = (
        cursor is None
        and page is None
        and user_email is None
        and is_public_only
        and team_id is None
        and visibility is None
        and limit is None
    )
    filters_hash = None
    if use_cache:
        filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None)
        cached = await cache.get("servers", filters_hash)
        if cached is not None:
            # Reconstruct ServerRead objects from cached dicts
            cached_servers = [ServerRead.model_validate(s).masked() for s in cached["servers"]]
            return (cached_servers, cached.get("next_cursor"))

    # Build base query with ordering and eager load relationships to avoid N+1
    query = (
        select(DbServer)
        .options(
            selectinload(DbServer.tools),
            selectinload(DbServer.resources),
            selectinload(DbServer.prompts),
            selectinload(DbServer.a2a_agents),
            joinedload(DbServer.email_team),
        )
        .order_by(desc(DbServer.created_at), desc(DbServer.id))
    )

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbServer.enabled)

    query = await self._apply_access_control(query, db, user_email, token_teams, team_id)

    if visibility:
        query = query.where(DbServer.visibility == visibility)

    # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
    if tags:
        query = query.where(json_contains_tag_expr(db, DbServer.tags, tags, match_any=True))

    # Use unified pagination helper - handles both page and cursor pagination
    pag_result = await unified_paginate(
        db=db,
        query=query,
        page=page,
        per_page=per_page,
        cursor=cursor,
        limit=limit,
        base_url="/admin/servers",  # Used for page-based links
        query_params={"include_inactive": include_inactive} if include_inactive else {},
    )

    next_cursor = None
    # Extract servers based on pagination type
    if page is not None:
        # Page-based: pag_result is a dict
        servers_db = pag_result["data"]
    else:
        # Cursor-based: pag_result is a tuple
        servers_db, next_cursor = pag_result

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Convert to ServerRead (common for both pagination types)
    # Team names are loaded via joinedload(DbServer.email_team)
    result = []
    for s in servers_db:
        try:
            result.append(self.convert_server_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining servers instead of failing completely

    # Return appropriate format based on pagination type
    if page is not None:
        # Page-based format
        return {
            "data": result,
            "pagination": pag_result["pagination"],
            "links": pag_result["links"],
        }

    # Cursor-based format

    # Cache first page results under exactly the condition used for the lookup above,
    # so filters_hash is always bound here.
    # SECURITY: Only cache public-only results (token_teams=[]), never admin bypass or team-scoped
    if use_cache:
        try:
            cache_data = {"servers": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
            await cache.set("servers", cache_data, filters_hash)
        except AttributeError:
            pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

    return (result, next_cursor)
async def list_servers_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[ServerRead]:
    """
    DEPRECATED: Use list_servers() with user_email parameter instead.

    This method is maintained for backward compatibility but is no longer used.
    New code should call list_servers() with user_email, team_id, and visibility parameters.

    List servers user has access to with team filtering. Rows are ordered by
    ``created_at`` then ``id`` (both descending, the same ordering list_servers()
    applies) so that ``skip``/``limit`` windows are stable between calls.

    Args:
        db: Database session
        user_email: Email of the user requesting servers
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive servers
        skip: Number of servers to skip for pagination
        limit: Maximum number of servers to return

    Returns:
        List[ServerRead]: Servers the user has access to
    """
    # Resolve the teams the user belongs to; they drive every access condition below
    team_service = TeamManagementService(db)
    user_teams = await team_service.get_user_teams(user_email)
    team_ids = [team.id for team in user_teams]

    # A specific team was requested but the user is not a member: nothing to show
    if team_id and team_id not in team_ids:
        return []  # No access to team

    # Eager load relationships to avoid N+1 queries
    query = select(DbServer).options(
        selectinload(DbServer.tools),
        selectinload(DbServer.resources),
        selectinload(DbServer.prompts),
        selectinload(DbServer.a2a_agents),
        joinedload(DbServer.email_team),
    )

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbServer.enabled)

    if team_id:
        # Filter by specific team: team/public-visible servers of that team,
        # plus anything in that team owned by the user
        access_conditions = [
            and_(DbServer.team_id == team_id, DbServer.visibility.in_(["team", "public"])),
            and_(DbServer.team_id == team_id, DbServer.owner_email == user_email),
        ]
    else:
        # 1. User's personal resources (owner_email matches)
        access_conditions = [DbServer.owner_email == user_email]

        # 2. Team resources where user is member
        if team_ids:
            access_conditions.append(and_(DbServer.team_id.in_(team_ids), DbServer.visibility.in_(["team", "public"])))

        # 3. Public resources (if visibility allows)
        access_conditions.append(DbServer.visibility == "public")

    query = query.where(or_(*access_conditions))

    # Apply visibility filter if specified
    if visibility:
        query = query.where(DbServer.visibility == visibility)

    # OFFSET/LIMIT without ORDER BY does not guarantee a stable row order, so
    # order explicitly before paginating.
    query = query.order_by(desc(DbServer.created_at), desc(DbServer.id)).offset(skip).limit(limit)

    servers = db.execute(query).scalars().all()

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Skip metrics to avoid N+1 queries in list operations
    # Team names are loaded via joinedload(DbServer.email_team)
    result = []
    for s in servers:
        try:
            result.append(self.convert_server_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining servers instead of failing completely
    return result
async def get_server(self, db: Session, server_id: str) -> ServerRead:
    """Fetch a single server by ID and return it as a ServerRead.

    The server is loaded together with its tools, resources, prompts, A2A agents
    and team, converted to the read schema, and the access is recorded through
    both the structured logger and the audit trail.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.

    Returns:
        The corresponding ServerRead object.

    Raises:
        ServerNotFoundError: If no server with the given ID exists.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = server
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.get_server(db, 'server_id'))
        'server_read'
    """
    # Eager-load every relationship the conversion and the log counters touch
    eager_loads = (
        selectinload(DbServer.tools),
        selectinload(DbServer.resources),
        selectinload(DbServer.prompts),
        selectinload(DbServer.a2a_agents),
        joinedload(DbServer.email_team),
    )
    stmt = select(DbServer).options(*eager_loads).where(DbServer.id == server_id)
    server = db.execute(stmt).scalar_one_or_none()
    if not server:
        raise ServerNotFoundError(f"Server not found: {server_id}")

    # Snapshot used only for the debug log line below
    server_data = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "created_at": server.created_at,
        "updated_at": server.updated_at,
        "enabled": server.enabled,
        "associated_tools": [tool.name for tool in server.tools],
        "associated_resources": [res.id for res in server.resources],
        "associated_prompts": [prompt.id for prompt in server.prompts],
    }
    logger.debug(f"Server Data: {server_data}")

    # Team name is loaded via server.team property from email_team relationship
    server_read = self.convert_server_to_read(server)

    def _count(relationship: str) -> int:
        """Return the size of a relationship collection, treating a missing/None one as empty."""
        return len(getattr(server, relationship, []) or [])

    self._structured_logger.log(
        level="INFO",
        message="Server retrieved successfully",
        event_type="server_viewed",
        component="server_service",
        server_id=server.id,
        server_name=server.name,
        team_id=getattr(server, "team_id", None),
        resource_type="server",
        resource_id=server.id,
        custom_fields={
            "enabled": server.enabled,
            "tool_count": _count("tools"),
            "resource_count": _count("resources"),
            "prompt_count": _count("prompts"),
        },
    )

    self._audit_trail.log_action(
        action="view_server",
        resource_type="server",
        resource_id=server.id,
        resource_name=server.name,
        user_id="system",
        team_id=getattr(server, "team_id", None),
        context={"enabled": server.enabled},
        db=db,
    )

    return server_read
async def update_server(
    self,
    db: Session,
    server_id: str,
    server_update: ServerUpdate,
    user_email: str,
    modified_by: Optional[str] = None,
    modified_from_ip: Optional[str] = None,
    modified_via: Optional[str] = None,
    modified_user_agent: Optional[str] = None,
) -> ServerRead:
    """Update an existing server.

    The row is loaded with ``get_for_update``, ownership is checked when
    ``user_email`` is truthy, the requested fields and associations are applied,
    the change is committed, the registry and tag caches are invalidated, and the
    update is published, audited and logged.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        server_update: Server update schema with new data.
        user_email: email of the user performing the update (for permission checks).
        modified_by: Username who modified this server.
        modified_from_ip: IP address from which modification was made.
        modified_via: Source of modification (api, ui, etc.).
        modified_user_agent: User agent of the client making the modification.

    Returns:
        The updated ServerRead object.

    Raises:
        ServerNameConflictError: If a new name conflicts with an existing server
            in the same public or team scope. Re-raised unchanged.
        IntegrityError: If a database integrity error occurs. Re-raised unchanged.
        ServerError: For every other failure. The catch-all handler wraps the
            original exception (available as ``__cause__``), so the three cases
            below reach the caller as ServerError carrying the original message.
        ServerNotFoundError: Raised internally if the server is not found
            (surfaces wrapped in ServerError, see above).
        PermissionError: Raised internally if user doesn't own the server
            (surfaces wrapped in ServerError, see above).
        ValueError: If visibility or team constraints are violated - presumably
            raised by the team-assignment validation helper; also surfaces
            wrapped in ServerError.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> server.id = 'server_id'
        >>> server.name = 'test_server'
        >>> server.owner_email = 'user_email'  # Set owner to match user performing update
        >>> server.team_id = None
        >>> server.visibility = 'public'
        >>> db.get.return_value = server
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> server_update = MagicMock()
        >>> server_update.id = None  # No UUID change
        >>> server_update.name = None  # No name change
        >>> server_update.description = None
        >>> server_update.icon = None
        >>> server_update.visibility = None
        >>> server_update.team_id = None
        >>> import asyncio
        >>> with patch('mcpgateway.services.server_service.get_for_update', return_value=server):
        ...     asyncio.run(service.update_server(db, 'server_id', server_update, 'user_email'))
        'server_read'
    """
    try:
        server = get_for_update(
            db,
            DbServer,
            server_id,
            options=[
                selectinload(DbServer.tools),
                selectinload(DbServer.resources),
                selectinload(DbServer.prompts),
                selectinload(DbServer.a2a_agents),
                selectinload(DbServer.email_team),
            ],
        )
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can update this server")

        # Check for name conflict if name is being changed and visibility is public or team
        if server_update.name and server_update.name != server.name:
            # None-safe: a server without any visibility has no scope to conflict in
            visibility = (server_update.visibility or server.visibility or "").lower()
            team_id = server_update.team_id or server.team_id
            if visibility == "public":
                # Check for existing public server with the same name
                existing_server = get_for_update(db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "public", DbServer.id != server.id))
                if existing_server:
                    raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)
            elif visibility == "team" and team_id:
                # Check for existing team server with the same name
                existing_server = get_for_update(
                    db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "team", DbServer.team_id == team_id, DbServer.id != server.id)
                )
                if existing_server:
                    raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)

        # Update simple fields
        if server_update.id is not None and server_update.id != server.id:
            # Check if the new UUID is already in use
            existing = db.get(DbServer, server_update.id)
            if existing:
                raise ServerError(f"Server with ID {server_update.id} already exists")
            server.id = server_update.id
        if server_update.name is not None:
            server.name = server_update.name
        if server_update.description is not None:
            server.description = server_update.description
        if server_update.icon is not None:
            server.icon = server_update.icon

        if server_update.visibility is not None:
            new_visibility = server_update.visibility

            # Validate visibility transitions
            if new_visibility == "team":
                target_team_id = server_update.team_id if server_update.team_id is not None else server.team_id
                _validate_server_team_assignment(db, user_email, target_team_id)

            elif new_visibility == "public":
                # Optional: Check if user has permission to make resources public
                # This could be a platform-level permission
                pass

            server.visibility = new_visibility

        if server_update.team_id is not None:
            if server_update.team_id != server.team_id:
                _validate_server_team_assignment(db, user_email, server_update.team_id)
            server.team_id = server_update.team_id

        # Update associated tools if provided using bulk query
        if server_update.associated_tools is not None:
            server.tools = []
            if server_update.associated_tools:
                tool_ids = [tool_id for tool_id in server_update.associated_tools if tool_id]
                if tool_ids:
                    tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all()
                    server.tools = list(tools)

        # Update associated resources if provided using bulk query
        if server_update.associated_resources is not None:
            server.resources = []
            if server_update.associated_resources:
                resource_ids = [resource_id for resource_id in server_update.associated_resources if resource_id]
                if resource_ids:
                    resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all()
                    server.resources = list(resources)

        # Update associated prompts if provided using bulk query
        if server_update.associated_prompts is not None:
            server.prompts = []
            if server_update.associated_prompts:
                prompt_ids = [prompt_id for prompt_id in server_update.associated_prompts if prompt_id]
                if prompt_ids:
                    prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all()
                    server.prompts = list(prompts)

        # Update tags if provided
        if server_update.tags is not None:
            server.tags = server_update.tags

        # Update OAuth 2.0 configuration if provided
        # Track if OAuth is being explicitly disabled to prevent config re-assignment
        oauth_being_disabled = server_update.oauth_enabled is not None and not server_update.oauth_enabled

        if server_update.oauth_enabled is not None:
            server.oauth_enabled = server_update.oauth_enabled
            # If OAuth is being disabled, clear the config
            if oauth_being_disabled:
                server.oauth_config = None

        # Only update oauth_config if OAuth is not being explicitly disabled
        # This prevents the case where oauth_enabled=False and oauth_config are both provided
        if not oauth_being_disabled:
            # The config counts as supplied when the field was explicitly set on the
            # update model (even to None) or when it simply carries a value.
            oauth_config_supplied = (hasattr(server_update, "model_fields_set") and "oauth_config" in server_update.model_fields_set) or server_update.oauth_config is not None
            if oauth_config_supplied:
                server.oauth_config = await protect_oauth_config_for_storage(server_update.oauth_config, existing_oauth_config=server.oauth_config)

        # Update metadata fields
        server.updated_at = datetime.now(timezone.utc)
        if modified_by:
            server.modified_by = modified_by
        if modified_from_ip:
            server.modified_from_ip = modified_from_ip
        if modified_via:
            server.modified_via = modified_via
        if modified_user_agent:
            server.modified_user_agent = modified_user_agent
        if hasattr(server, "version") and server.version is not None:
            server.version = server.version + 1
        else:
            server.version = 1

        db.commit()
        db.refresh(server)
        # Force loading relationships
        _ = server.tools, server.resources, server.prompts

        # Invalidate cache after successful update
        cache = _get_registry_cache()
        await cache.invalidate_servers()
        # Also invalidate tags cache since server tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        await self._notify_server_updated(server)
        logger.info(f"Updated server: {server.name}")

        # Structured logging: Audit trail for server update
        changes = []
        if server_update.name:
            changes.append(f"name: {server_update.name}")
        if server_update.visibility:
            changes.append(f"visibility: {server_update.visibility}")
        if server_update.team_id:
            changes.append(f"team_id: {server_update.team_id}")

        self._audit_trail.log_action(
            user_id=user_email or "system",
            action="update_server",
            resource_type="server",
            resource_id=server.id,
            details={
                "server_name": server.name,
                "changes": ", ".join(changes) if changes else "metadata only",
                "version": server.version,
            },
            metadata={
                "modified_from_ip": modified_from_ip,
                "modified_via": modified_via,
                "modified_user_agent": modified_user_agent,
            },
        )

        # Structured logging: Log successful server update
        self._structured_logger.log(
            level="INFO",
            message="Server updated successfully",
            event_type="server_updated",
            component="server_service",
            server_id=server.id,
            server_name=server.name,
            modified_by=user_email,
            user_email=user_email,
        )

        # Build a dictionary with associated IDs (used only for the debug log line)
        # Team name is loaded via server.team property from email_team relationship
        server_data = {
            "id": server.id,
            "name": server.name,
            "description": server.description,
            "icon": server.icon,
            "team": server.team,
            "created_at": server.created_at,
            "updated_at": server.updated_at,
            "enabled": server.enabled,
            "associated_tools": [tool.id for tool in server.tools],
            "associated_resources": [res.id for res in server.resources],
            "associated_prompts": [prompt.id for prompt in server.prompts],
        }
        logger.debug(f"Server Data: {server_data}")
        return self.convert_server_to_read(server)
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        self._structured_logger.log(
            level="ERROR",
            message="Server update failed due to database integrity error",
            event_type="server_update_failed",
            component="server_service",
            server_id=server_id,
            error_type="IntegrityError",
            error_message=str(ie),
            modified_by=user_email,
            user_email=user_email,
        )
        raise
    except ServerNameConflictError as snce:
        db.rollback()
        logger.error(f"Server name conflict: {snce}")

        # Structured logging: Log name conflict error
        self._structured_logger.log(
            level="WARNING",
            message="Server update failed due to name conflict",
            event_type="server_name_conflict",
            component="server_service",
            server_id=server_id,
            modified_by=user_email,
            user_email=user_email,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server update failure
        self._structured_logger.log(
            level="ERROR",
            message="Server update failed",
            event_type="server_update_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            modified_by=user_email,
            user_email=user_email,
        )
        # Chain explicitly so the original failure stays attached as __cause__
        raise ServerError(f"Failed to update server: {str(e)}") from e
async def set_server_state(self, db: Session, server_id: str, activate: bool, user_email: Optional[str] = None) -> ServerRead:
    """Set the activation status of a server.

    The row is fetched with ``get_for_update(..., nowait=True)`` so a concurrent
    modification fails fast instead of blocking. When the requested state differs
    from the current one the change is committed, the registry cache is
    invalidated, and the change is published, audited and logged. When the state
    already matches, the server is returned unchanged.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        activate: True to activate, False to deactivate.
        user_email: Optional[str] The email of the user to check if the user has permission to modify.

    Returns:
        The updated ServerRead object.

    Raises:
        ServerNotFoundError: If the server is not found.
        ServerLockConflictError: If the server row is locked by another transaction.
        ServerError: For other errors; the original exception is attached as ``__cause__``.
        PermissionError: If user doesn't own the server.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.get.return_value = server
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_server_activated = AsyncMock()
        >>> service._notify_server_deactivated = AsyncMock()
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.set_server_state(db, 'server_id', True))
        'server_read'
    """
    try:
        # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
        try:
            server = get_for_update(
                db,
                DbServer,
                server_id,
                nowait=True,
                options=[
                    selectinload(DbServer.tools),
                    selectinload(DbServer.resources),
                    selectinload(DbServer.prompts),
                    selectinload(DbServer.a2a_agents),
                    selectinload(DbServer.email_team),
                ],
            )
        except OperationalError as lock_err:
            # Row is locked by another transaction - fail fast with 409
            db.rollback()
            raise ServerLockConflictError(f"Server {server_id} is currently being modified by another request") from lock_err
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can activate the Server" if activate else "Only the owner can deactivate the Server")

        if server.enabled != activate:
            server.enabled = activate
            server.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(server)

            # Invalidate cache after status change
            cache = _get_registry_cache()
            await cache.invalidate_servers()

            if activate:
                await self._notify_server_activated(server)
            else:
                await self._notify_server_deactivated(server)
            logger.info(f"Server {server.name} {'activated' if activate else 'deactivated'}")

            # Structured logging: Audit trail for server state change
            self._audit_trail.log_action(
                user_id=user_email or "system",
                action="activate_server" if activate else "deactivate_server",
                resource_type="server",
                resource_id=server.id,
                details={
                    "server_name": server.name,
                    "new_status": "active" if activate else "inactive",
                },
            )

            # Structured logging: Log server status change
            self._structured_logger.log(
                level="INFO",
                message=f"Server {'activated' if activate else 'deactivated'}",
                event_type="server_status_changed",
                component="server_service",
                server_id=server.id,
                server_name=server.name,
                new_status="active" if activate else "inactive",
                changed_by=user_email,
                user_email=user_email,
            )

        # Snapshot used only for the debug log line below
        # Team name is loaded via server.team property from email_team relationship
        server_data = {
            "id": server.id,
            "name": server.name,
            "description": server.description,
            "icon": server.icon,
            "team": server.team,
            "created_at": server.created_at,
            "updated_at": server.updated_at,
            "enabled": server.enabled,
            "associated_tools": [tool.id for tool in server.tools],
            "associated_resources": [res.id for res in server.resources],
            "associated_prompts": [prompt.id for prompt in server.prompts],
        }
        # DEBUG, matching the identical dump in get_server/update_server
        logger.debug(f"Server Data: {server_data}")
        return self.convert_server_to_read(server)
    except PermissionError:
        # End the transaction opened by get_for_update so the row lock is not
        # held after the request has been rejected.
        db.rollback()

        # Structured logging: Log permission error
        self._structured_logger.log(
            level="WARNING",
            message="Server state change failed due to insufficient permissions",
            event_type="server_state_change_permission_denied",
            component="server_service",
            server_id=server_id,
            user_email=user_email,
        )
        raise
    except ServerLockConflictError:
        # Re-raise lock conflicts without wrapping - allows 409 response
        # (already rolled back where the conflict was detected)
        raise
    except ServerNotFoundError:
        # Re-raise not found without wrapping - allows 404 response
        db.rollback()  # Release transaction to avoid idle-in-transaction
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server state change failure
        self._structured_logger.log(
            level="ERROR",
            message="Server state change failed",
            event_type="server_state_change_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            user_email=user_email,
        )
        # Chain explicitly so the original failure stays attached as __cause__
        raise ServerError(f"Failed to set server state: {str(e)}") from e
async def delete_server(self, db: Session, server_id: str, user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
    """Permanently delete a server.

    Looks the server up, optionally checks ownership, optionally purges its
    metrics, deletes the row and commits. After the commit it invalidates the
    registry/tags/metrics caches, notifies event subscribers and writes audit
    and structured log entries.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        user_email: Email of user performing deletion (for ownership check).
            When falsy, the ownership check is skipped and the audit entry is
            attributed to "system".
        purge_metrics: If True, delete raw + rollup metrics for this server.

    Raises:
        PermissionError: If user doesn't own the server.
        ServerError: For every other failure. This includes the "server not
            found" case: the ServerNotFoundError raised internally is caught by
            the generic handler and re-raised as ServerError, with the original
            exception attached as ``__cause__``.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.get.return_value = server
        >>> db.delete = MagicMock()
        >>> db.commit = MagicMock()
        >>> service._notify_server_deleted = AsyncMock()
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> import asyncio
        >>> asyncio.run(service.delete_server(db, 'server_id', 'user@example.com'))
    """
    try:
        server = db.get(DbServer, server_id)
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can delete this server")

        # Snapshot id/name now: they are needed for the notification and log
        # entries that are emitted after the row has been deleted.
        server_info = {"id": server.id, "name": server.name}
        if purge_metrics:
            with pause_rollup_during_purge(reason=f"purge_server:{server_id}"):
                delete_metrics_in_batches(db, ServerMetric, ServerMetric.server_id, server_id)
                delete_metrics_in_batches(db, ServerMetricsHourly, ServerMetricsHourly.server_id, server_id)
        db.delete(server)
        db.commit()

        # Invalidate cache after successful deletion
        cache = _get_registry_cache()
        await cache.invalidate_servers()
        # Also invalidate tags cache since server tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()
        # First-Party
        from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

        metrics_cache.invalidate_prefix("top_servers:")
        metrics_cache.invalidate("servers")

        await self._notify_server_deleted(server_info)
        logger.info(f"Deleted server: {server_info['name']}")

        # Structured logging: Audit trail for server deletion
        self._audit_trail.log_action(
            user_id=user_email or "system",
            action="delete_server",
            resource_type="server",
            resource_id=server_info["id"],
            details={
                "server_name": server_info["name"],
            },
        )

        # Structured logging: Log successful server deletion
        self._structured_logger.log(
            level="INFO",
            message="Server deleted successfully",
            event_type="server_deleted",
            component="server_service",
            server_id=server_info["id"],
            server_name=server_info["name"],
            deleted_by=user_email,
            user_email=user_email,
            purge_metrics=purge_metrics,
        )
    except PermissionError:
        db.rollback()

        # Structured logging: Log permission error
        self._structured_logger.log(
            level="WARNING",
            message="Server deletion failed due to insufficient permissions",
            event_type="server_deletion_permission_denied",
            component="server_service",
            server_id=server_id,
            user_email=user_email,
        )
        # Bare raise keeps the original traceback untouched.
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server deletion failure
        self._structured_logger.log(
            level="ERROR",
            message="Server deletion failed",
            event_type="server_deletion_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            user_email=user_email,
        )
        # Chain explicitly so the root cause stays reachable via __cause__.
        raise ServerError(f"Failed to delete server: {str(e)}") from e
async def _publish_event(self, event: Dict[str, Any]) -> None:
    """
    Deliver one event to every queue currently registered as a subscriber.

    Queues are served sequentially, in the order they appear in
    ``self._event_subscribers``.

    Args:
        event: Event to publish
    """
    for subscriber_queue in self._event_subscribers:
        await subscriber_queue.put(event)
async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
    """Subscribe to server events.

    A new queue is registered in ``self._event_subscribers`` when iteration
    starts and is removed again when the generator finishes or is closed.

    Yields:
        Server event messages.
    """
    inbox: asyncio.Queue = asyncio.Queue()
    self._event_subscribers.append(inbox)
    try:
        while True:
            yield await inbox.get()
    finally:
        # Always unregister, including when the consumer closes the generator.
        self._event_subscribers.remove(inbox)
async def _notify_server_added(self, server: DbServer) -> None:
    """
    Publish a "server_added" event for a newly added server.

    The event carries the server's id, name, description, icon and enabled
    flag, plus the ids of its associated tools, resources and prompts (an
    empty list when a collection is empty or unset), and a UTC timestamp.

    Args:
        server: Server to add
    """
    tool_ids = [tool.id for tool in (server.tools or [])]
    resource_ids = [resource.id for resource in (server.resources or [])]
    prompt_ids = [prompt.id for prompt in (server.prompts or [])]

    payload = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "associated_tools": tool_ids,
        "associated_resources": resource_ids,
        "associated_prompts": prompt_ids,
        "enabled": server.enabled,
    }
    await self._publish_event(
        {
            "type": "server_added",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _notify_server_updated(self, server: DbServer) -> None:
    """
    Publish a "server_updated" event reflecting a server's current state.

    The event data mirrors the server's id, name, description, icon and
    enabled flag together with the ids of its associated tools, resources
    and prompts; the timestamp is the current UTC time in ISO format.

    Args:
        server: Server to update
    """

    def _ids(items) -> list:
        """Return the ``id`` of every item, or an empty list for a falsy collection."""
        if not items:
            return []
        return [item.id for item in items]

    associated_tools = _ids(server.tools)
    associated_resources = _ids(server.resources)
    associated_prompts = _ids(server.prompts)

    data = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "associated_tools": associated_tools,
        "associated_resources": associated_resources,
        "associated_prompts": associated_prompts,
        "enabled": server.enabled,
    }
    timestamp = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "server_updated", "data": data, "timestamp": timestamp})
async def _notify_server_activated(self, server: DbServer) -> None:
    """
    Publish a "server_activated" event for the given server.

    The event data always reports ``enabled`` as True, regardless of the
    value currently stored on the server object.

    Args:
        server: Server to activate
    """
    data = {"id": server.id, "name": server.name, "enabled": True}
    timestamp = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "server_activated", "data": data, "timestamp": timestamp})
async def _notify_server_deactivated(self, server: DbServer) -> None:
    """
    Publish a "server_deactivated" event for the given server.

    The event data always reports ``enabled`` as False, regardless of the
    value currently stored on the server object.

    Args:
        server: Server to deactivate
    """
    data = {"id": server.id, "name": server.name, "enabled": False}
    timestamp = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "server_deactivated", "data": data, "timestamp": timestamp})
async def _notify_server_deleted(self, server_info: Dict[str, Any]) -> None:
    """
    Publish a "server_deleted" event.

    The supplied dictionary is used as the event data unchanged, and the
    event is stamped with the current UTC time in ISO format.

    Args:
        server_info: Dictionary on server to be deleted
    """
    deletion_event: Dict[str, Any] = {"type": "server_deleted", "data": server_info}
    deletion_event["timestamp"] = datetime.now(timezone.utc).isoformat()
    await self._publish_event(deletion_event)
1806 # --- Metrics ---
async def aggregate_metrics(self, db: Session) -> ServerMetrics:
    """
    Aggregate metrics for all server invocations across all servers.

    When the metrics cache is enabled and holds an entry under "servers",
    that entry is returned. Otherwise the combined raw + hourly rollup
    aggregation helper is queried for the "server" type and, if caching is
    enabled, the result is stored in the cache as a plain dict.

    Args:
        db: Database session

    Returns:
        ServerMetrics: Aggregated metrics from raw + hourly rollup tables.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    # Serve from cache when possible (only consulted if caching is enabled)
    cached_payload = metrics_cache.get("servers") if is_cache_enabled() else None
    if cached_payload is not None:
        return ServerMetrics(**cached_payload)

    # Cache miss: run the combined raw + rollup query for full historical coverage
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "server")

    copied_fields = (
        "total_executions",
        "successful_executions",
        "failed_executions",
        "failure_rate",
        "min_response_time",
        "max_response_time",
        "avg_response_time",
        "last_execution_time",
    )
    aggregated = ServerMetrics(**{field: getattr(combined, field) for field in copied_fields})

    # Store as a dict for serialization compatibility (if enabled)
    if is_cache_enabled():
        metrics_cache.set("servers", aggregated.model_dump())

    return aggregated
async def reset_metrics(self, db: Session) -> None:
    """
    Reset all server metrics by deleting raw and hourly rollup records.

    Both deletes are issued and committed together, after which the
    "servers" entry and every "top_servers:" entry in the metrics cache are
    invalidated.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    # Raw metrics first, then the hourly rollups, in a single commit
    for metrics_model in (ServerMetric, ServerMetricsHourly):
        db.execute(delete(metrics_model))
    db.commit()

    # Invalidate metrics cache
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("servers")
    metrics_cache.invalidate_prefix("top_servers:")
def get_oauth_protected_resource_metadata(self, db: Session, server_id: str, resource_base_url: str) -> Dict[str, Any]:
    """
    Get RFC 9728 OAuth 2.0 Protected Resource Metadata for a server.

    This method retrieves the OAuth configuration for a server and formats it
    according to RFC 9728 Protected Resource Metadata specification, enabling
    MCP clients to discover OAuth authorization servers for browser-based SSO.

    The authorization server list is read from the ``authorization_servers``
    key of the server's OAuth config, falling back to ``authorization_server``.
    A bare string under either key is wrapped in a one-element list so the
    response always carries an array, as RFC 9728 Section 2 requires.

    Args:
        db: Database session.
        server_id: The ID of the server.
        resource_base_url: The base URL for the resource (e.g., "https://gateway.example.com/servers/abc123/mcp").

    Returns:
        Dict containing RFC 9728 Protected Resource Metadata:
            - resource: ``resource_base_url`` exactly as passed in
            - authorization_servers: JSON array of authorization server issuer URIs (RFC 9728 Section 2)
            - bearer_methods_supported: Supported bearer token methods (always ["header"])
            - scopes_supported: Optional; present only when the config has a
              non-empty ``scopes_supported`` or ``scopes`` entry

    Raises:
        ServerNotFoundError: If server doesn't exist, is disabled, or is non-public.
        ServerError: If OAuth is not enabled or not properly configured.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> # Method exists and is callable
        >>> callable(service.get_oauth_protected_resource_metadata)
        True
    """
    server = db.get(DbServer, server_id)

    # Return the same "not found" for non-existent, disabled, or non-public
    # servers (avoids leaking information about private/team servers)
    if not server or not server.enabled or getattr(server, "visibility", "public") != "public":
        raise ServerNotFoundError(f"Server not found: {server_id}")

    # Check OAuth configuration
    if not getattr(server, "oauth_enabled", False):
        raise ServerError(f"OAuth not enabled for server: {server_id}")

    oauth_config = getattr(server, "oauth_config", None)
    if not oauth_config:
        raise ServerError(f"OAuth not configured for server: {server_id}")

    # Extract authorization server(s): prefer the plural key, fall back to the
    # singular one. Either may hold a list or a single string; a string is
    # wrapped so the response value is always an array.
    authorization_servers = oauth_config.get("authorization_servers") or oauth_config.get("authorization_server")
    if isinstance(authorization_servers, str):
        authorization_servers = [authorization_servers]

    if not authorization_servers:
        raise ServerError(f"OAuth authorization_server not configured for server: {server_id}")

    # Build RFC 9728 Protected Resource Metadata response
    response_data: Dict[str, Any] = {
        "resource": resource_base_url,
        "authorization_servers": authorization_servers,
        "bearer_methods_supported": ["header"],
    }

    # Add optional scopes if configured (never include secrets from oauth_config)
    scopes = oauth_config.get("scopes_supported") or oauth_config.get("scopes")
    if scopes:
        response_data["scopes_supported"] = scopes

    logger.debug(f"Returning OAuth protected resource metadata for server {server_id}")
    return response_data
# Lazy singleton - built on first access to ``server_service`` rather than at
# module import time, so importing only the exception classes from this module
# does not instantiate the service.
_server_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Module-level __getattr__ for lazy singleton creation.

    Args:
        name: The attribute name being accessed.

    Returns:
        The server_service singleton instance if name is "server_service".
        It is created on the first such access and reused afterwards.

    Raises:
        AttributeError: If the attribute name is not "server_service".
    """
    global _server_service_instance  # pylint: disable=global-statement
    # Guard clause: anything other than the singleton name is unknown.
    if name != "server_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    if _server_service_instance is None:
        _server_service_instance = ServerService()
    return _server_service_instance