Coverage for mcpgateway / services / server_service.py: 100%
605 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/server_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7MCP Gateway Server Service
9This module implements server management for the MCP Servers Catalog.
10It handles server registration, listing, retrieval, updates, activation toggling, and deletion.
11It also publishes event notifications for server changes.
12"""
14# Standard
15import asyncio
16import binascii
17from datetime import datetime, timezone
18from typing import Any, AsyncGenerator, Dict, List, Optional, Union
20# Third-Party
21import httpx
22from pydantic import ValidationError
23from sqlalchemy import and_, delete, desc, or_, select
24from sqlalchemy.exc import IntegrityError, OperationalError
25from sqlalchemy.orm import joinedload, selectinload, Session
27# First-Party
28from mcpgateway.config import settings
29from mcpgateway.db import A2AAgent as DbA2AAgent
30from mcpgateway.db import EmailTeam as DbEmailTeam
31from mcpgateway.db import EmailTeamMember as DbEmailTeamMember
32from mcpgateway.db import get_for_update
33from mcpgateway.db import Prompt as DbPrompt
34from mcpgateway.db import Resource as DbResource
35from mcpgateway.db import Server as DbServer
36from mcpgateway.db import ServerMetric, ServerMetricsHourly
37from mcpgateway.db import Tool as DbTool
38from mcpgateway.schemas import ServerCreate, ServerMetrics, ServerRead, ServerUpdate, TopPerformer
39from mcpgateway.services.audit_trail_service import get_audit_trail_service
40from mcpgateway.services.logging_service import LoggingService
41from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
42from mcpgateway.services.performance_tracker import get_performance_tracker
43from mcpgateway.services.structured_logger import get_structured_logger
44from mcpgateway.services.team_management_service import TeamManagementService
45from mcpgateway.utils.metrics_common import build_top_performers
46from mcpgateway.utils.pagination import unified_paginate
47from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
# Cache import (lazy to avoid circular dependencies)
_REGISTRY_CACHE = None


def _get_registry_cache():
    """Return the shared registry cache, importing it on first use.

    The import happens at call time rather than at module import time so that
    this module and the cache module do not import each other in a cycle. The
    resolved object is remembered in the module-level ``_REGISTRY_CACHE`` so
    later calls skip the import entirely.

    Returns:
        The ``registry_cache`` object exported by ``mcpgateway.cache.registry_cache``.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache as _shared_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = _shared_cache
    return _REGISTRY_CACHE
# Initialize logging service first
# A single LoggingService instance is created at import time; ``logger`` is the
# module-scoped logger obtained from it and used by everything below.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
class ServerError(Exception):
    """Base class for server-related errors.

    Every other exception defined in this module derives from it, so callers
    can catch ``ServerError`` to handle any failure raised by the service.
    """


class ServerNotFoundError(ServerError):
    """Raised when a requested server is not found."""


class ServerLockConflictError(ServerError):
    """Raised when a server row is locked by another transaction."""


class ServerNameConflictError(ServerError):
    """Raised when a server name conflicts with an existing one.

    Attributes:
        name: The conflicting server name.
        enabled: Whether the already-existing server is active.
        server_id: ID of the already-existing server, if known.
        visibility: Effective visibility used to build the message.
    """

    def __init__(self, name: str, enabled: bool = True, server_id: Optional[str] = None, visibility: Optional[str] = "public") -> None:
        """
        Initialize a ServerNameConflictError exception.

        This exception indicates a server name conflict, with additional context about visibility,
        whether the conflicting server is active, and its ID if known. The error message starts
        with the visibility information.

        Visibility rules:
            - public: Restricts server names globally (across all teams).
            - team: Restricts server names only within the same team.

        Args:
            name: The server name that caused the conflict.
            enabled: Whether the conflicting server is currently active. Defaults to True.
            server_id: The ID of the conflicting server, if known. Only included in message for inactive servers.
            visibility: The visibility of the conflicting server (e.g., "public", "private", "team").
                A missing value (``None`` or an empty string) is treated as "public" so that
                building the message can never fail.

        Examples:
            >>> error = ServerNameConflictError("My Server")
            >>> str(error)
            'Public Server already exists with name: My Server'
            >>> error = ServerNameConflictError("My Server", enabled=False, server_id=123)
            >>> str(error)
            'Public Server already exists with name: My Server (currently inactive, ID: 123)'
            >>> error.enabled
            False
            >>> error.server_id
            123
            >>> error = ServerNameConflictError("My Server", enabled=False, visibility="team")
            >>> str(error)
            'Team Server already exists with name: My Server (currently inactive, ID: None)'
            >>> error.enabled
            False
            >>> error.server_id is None
            True
            >>> str(ServerNameConflictError("My Server", visibility=None))
            'Public Server already exists with name: My Server'
        """
        # The conflicting row's visibility is passed through from the database
        # and may be unset; calling .capitalize() on None would replace the
        # intended conflict error with an AttributeError.
        effective_visibility = visibility or "public"

        self.name = name
        self.enabled = enabled
        self.server_id = server_id
        self.visibility = effective_visibility

        message = f"{effective_visibility.capitalize()} Server already exists with name: {name}"
        if not enabled:
            message += f" (currently inactive, ID: {server_id})"
        super().__init__(message)
134class ServerService:
135 """Service for managing MCP Servers in the catalog.
137 Provides methods to create, list, retrieve, update, set state, and delete server records.
138 Also supports event notifications for changes in server data.
139 """
141 def __init__(self) -> None:
142 """Initialize a new ServerService instance.
144 Sets up the service with:
145 - An empty list for event subscribers that will receive server change notifications
146 - An HTTP client configured with timeout and SSL verification settings from config
148 The HTTP client is used for health checks and other server-related HTTP operations.
149 Event subscribers can register to receive notifications about server additions,
150 updates, activations, deactivations, and deletions.
152 Examples:
153 >>> from mcpgateway.services.server_service import ServerService
154 >>> service = ServerService()
155 >>> isinstance(service._event_subscribers, list)
156 True
157 >>> len(service._event_subscribers)
158 0
159 >>> hasattr(service, '_http_client')
160 True
161 """
162 self._event_subscribers: List[asyncio.Queue] = []
163 self._http_client = httpx.AsyncClient(
164 timeout=settings.federation_timeout,
165 verify=not settings.skip_ssl_verify,
166 limits=httpx.Limits(
167 max_connections=settings.httpx_max_connections,
168 max_keepalive_connections=settings.httpx_max_keepalive_connections,
169 keepalive_expiry=settings.httpx_keepalive_expiry,
170 ),
171 )
172 self._structured_logger = get_structured_logger("server_service")
173 self._audit_trail = get_audit_trail_service()
174 self._performance_tracker = get_performance_tracker()
    async def initialize(self) -> None:
        """Initialize the server service.

        Currently this only writes an informational log entry; no resources
        are acquired here.
        """
        logger.info("Initializing server service")
    async def shutdown(self) -> None:
        """Shutdown the server service.

        Closes the ``httpx.AsyncClient`` created in ``__init__`` and then logs
        that shutdown has completed.
        """
        # Release pooled connections held by the shared HTTP client.
        await self._http_client.aclose()
        logger.info("Server service shutdown complete")
185 # get_top_server
186 async def get_top_servers(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
187 """Retrieve the top-performing servers based on execution count.
189 Queries the database to get servers with their metrics, ordered by the number of executions
190 in descending order. Combines recent raw metrics with historical hourly rollups for complete
191 historical coverage. Returns a list of TopPerformer objects containing server details and
192 performance metrics. Results are cached for performance.
194 Args:
195 db (Session): Database session for querying server metrics.
196 limit (Optional[int]): Maximum number of servers to return. Defaults to 5.
197 include_deleted (bool): Whether to include deleted servers from rollups.
199 Returns:
200 List[TopPerformer]: A list of TopPerformer objects, each containing:
201 - id: Server ID.
202 - name: Server name.
203 - execution_count: Total number of executions.
204 - avg_response_time: Average response time in seconds, or None if no metrics.
205 - success_rate: Success rate percentage, or None if no metrics.
206 - last_execution: Timestamp of the last execution, or None if no metrics.
207 """
208 # Check cache first (if enabled)
209 # First-Party
210 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
212 effective_limit = limit or 5
213 cache_key = f"top_servers:{effective_limit}:include_deleted={include_deleted}"
215 if is_cache_enabled():
216 cached = metrics_cache.get(cache_key)
217 if cached is not None:
218 return cached
220 # Use combined query that includes both raw metrics and rollup data
221 # First-Party
222 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel
224 results = get_top_performers_combined(
225 db=db,
226 metric_type="server",
227 entity_model=DbServer,
228 limit=effective_limit,
229 include_deleted=include_deleted,
230 )
231 top_performers = build_top_performers(results)
233 # Cache the result (if enabled)
234 if is_cache_enabled():
235 metrics_cache.set(cache_key, top_performers)
237 return top_performers
239 def convert_server_to_read(self, server: DbServer, include_metrics: bool = False) -> ServerRead:
240 """
241 Converts a DbServer instance into a ServerRead model, optionally including aggregated metrics.
243 Args:
244 server (DbServer): The ORM instance of the server.
245 include_metrics (bool): Whether to include metrics in the result. Defaults to False.
246 Set to False for list operations to avoid N+1 query issues.
248 Returns:
249 ServerRead: The Pydantic model representing the server, optionally including aggregated metrics.
251 Examples:
252 >>> from types import SimpleNamespace
253 >>> from datetime import datetime, timezone
254 >>> svc = ServerService()
255 >>> now = datetime.now(timezone.utc)
256 >>> # Fake metric objects
257 >>> m1 = SimpleNamespace(is_success=True, response_time=0.2, timestamp=now)
258 >>> m2 = SimpleNamespace(is_success=False, response_time=0.4, timestamp=now)
259 >>> server = SimpleNamespace(
260 ... id='s1', name='S', description=None, icon=None,
261 ... created_at=now, updated_at=now, enabled=True,
262 ... associated_tools=[], associated_resources=[], associated_prompts=[], associated_a2a_agents=[],
263 ... tags=[], metrics=[m1, m2],
264 ... tools=[], resources=[], prompts=[], a2a_agents=[],
265 ... team_id=None, owner_email=None, visibility=None,
266 ... created_by=None, modified_by=None
267 ... )
268 >>> result = svc.convert_server_to_read(server, include_metrics=True)
269 >>> result.metrics.total_executions
270 2
271 >>> result.metrics.successful_executions
272 1
273 """
274 # Build dict explicitly from attributes to ensure SQLAlchemy populates them
275 # (using __dict__.copy() can return empty dict with certain query patterns)
276 server_dict = {
277 "id": server.id,
278 "name": server.name,
279 "description": server.description,
280 "icon": server.icon,
281 "enabled": server.enabled,
282 "created_at": server.created_at,
283 "updated_at": server.updated_at,
284 "team_id": server.team_id,
285 "owner_email": server.owner_email,
286 "visibility": server.visibility,
287 "created_by": server.created_by,
288 "created_from_ip": getattr(server, "created_from_ip", None),
289 "created_via": getattr(server, "created_via", None),
290 "created_user_agent": getattr(server, "created_user_agent", None),
291 "modified_by": server.modified_by,
292 "modified_from_ip": getattr(server, "modified_from_ip", None),
293 "modified_via": getattr(server, "modified_via", None),
294 "modified_user_agent": getattr(server, "modified_user_agent", None),
295 "import_batch_id": getattr(server, "import_batch_id", None),
296 "federation_source": getattr(server, "federation_source", None),
297 "version": getattr(server, "version", None),
298 "tags": server.tags or [],
299 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
300 "oauth_enabled": getattr(server, "oauth_enabled", False),
301 "oauth_config": getattr(server, "oauth_config", None),
302 }
304 # Compute aggregated metrics only if requested (avoids N+1 queries in list operations)
305 if include_metrics:
306 total = 0
307 successful = 0
308 failed = 0
309 min_rt = None
310 max_rt = None
311 sum_rt = 0.0
312 last_time = None
314 if hasattr(server, "metrics") and server.metrics:
315 for m in server.metrics:
316 total += 1
317 if m.is_success:
318 successful += 1
319 else:
320 failed += 1
322 # Track min/max response times
323 if min_rt is None or m.response_time < min_rt:
324 min_rt = m.response_time
325 if max_rt is None or m.response_time > max_rt:
326 max_rt = m.response_time
328 sum_rt += m.response_time
330 # Track last execution time
331 if last_time is None or m.timestamp > last_time:
332 last_time = m.timestamp
334 failure_rate = (failed / total) if total > 0 else 0.0
335 avg_rt = (sum_rt / total) if total > 0 else None
337 server_dict["metrics"] = {
338 "total_executions": total,
339 "successful_executions": successful,
340 "failed_executions": failed,
341 "failure_rate": failure_rate,
342 "min_response_time": min_rt,
343 "max_response_time": max_rt,
344 "avg_response_time": avg_rt,
345 "last_execution_time": last_time,
346 }
347 else:
348 server_dict["metrics"] = None
349 # Add associated IDs from relationships
350 server_dict["associated_tools"] = [tool.name for tool in server.tools] if server.tools else []
351 server_dict["associated_resources"] = [res.id for res in server.resources] if server.resources else []
352 server_dict["associated_prompts"] = [prompt.id for prompt in server.prompts] if server.prompts else []
353 server_dict["associated_a2a_agents"] = [agent.id for agent in server.a2a_agents] if server.a2a_agents else []
355 # Team name is loaded via server.team property from email_team relationship
356 server_dict["team"] = getattr(server, "team", None)
358 return ServerRead.model_validate(server_dict)
360 def _assemble_associated_items(
361 self,
362 tools: Optional[List[str]],
363 resources: Optional[List[str]],
364 prompts: Optional[List[str]],
365 a2a_agents: Optional[List[str]] = None,
366 gateways: Optional[List[str]] = None,
367 ) -> Dict[str, Any]:
368 """
369 Assemble the associated items dictionary from the separate fields.
371 Args:
372 tools: List of tool IDs.
373 resources: List of resource IDs.
374 prompts: List of prompt IDs.
375 a2a_agents: List of A2A agent IDs.
376 gateways: List of gateway IDs.
378 Returns:
379 A dictionary with keys "tools", "resources", "prompts", "a2a_agents", and "gateways".
381 Examples:
382 >>> service = ServerService()
383 >>> # Test with all None values
384 >>> result = service._assemble_associated_items(None, None, None)
385 >>> result
386 {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []}
388 >>> # Test with empty lists
389 >>> result = service._assemble_associated_items([], [], [])
390 >>> result
391 {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []}
393 >>> # Test with actual values
394 >>> result = service._assemble_associated_items(['tool1', 'tool2'], ['res1'], ['prompt1'])
395 >>> result
396 {'tools': ['tool1', 'tool2'], 'resources': ['res1'], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []}
398 >>> # Test with mixed None and values
399 >>> result = service._assemble_associated_items(['tool1'], None, ['prompt1'])
400 >>> result
401 {'tools': ['tool1'], 'resources': [], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []}
402 """
403 return {
404 "tools": tools or [],
405 "resources": resources or [],
406 "prompts": prompts or [],
407 "a2a_agents": a2a_agents or [],
408 "gateways": gateways or [],
409 }
411 async def register_server(
412 self,
413 db: Session,
414 server_in: ServerCreate,
415 created_by: Optional[str] = None,
416 created_from_ip: Optional[str] = None,
417 created_via: Optional[str] = None,
418 created_user_agent: Optional[str] = None,
419 team_id: Optional[str] = None,
420 owner_email: Optional[str] = None,
421 visibility: Optional[str] = "public",
422 ) -> ServerRead:
423 """
424 Register a new server in the catalog and validate that all associated items exist.
426 This function performs the following steps:
427 1. Checks if a server with the same name already exists.
428 2. Creates a new server record.
429 3. For each ID provided in associated_tools, associated_resources, and associated_prompts,
430 verifies that the corresponding item exists. If an item does not exist, an error is raised.
431 4. Associates the verified items to the new server.
432 5. Commits the transaction, refreshes the ORM instance, and forces the loading of relationship data.
433 6. Constructs a response dictionary that includes lists of associated item IDs.
434 7. Notifies subscribers of the addition and returns the validated response.
436 Args:
437 db (Session): The SQLAlchemy database session.
438 server_in (ServerCreate): The server creation schema containing server details and lists of
439 associated tool, resource, and prompt IDs (as strings).
440 created_by (Optional[str]): Email of the user creating the server, used for ownership tracking.
441 created_from_ip (Optional[str]): IP address from which the creation request originated.
442 created_via (Optional[str]): Source of creation (api, ui, import).
443 created_user_agent (Optional[str]): User agent string from the creation request.
444 team_id (Optional[str]): Team ID to assign the server to.
445 owner_email (Optional[str]): Email of the user who owns this server.
446 visibility (str): Server visibility level (private, team, public).
448 Returns:
449 ServerRead: The newly created server, with associated item IDs.
451 Raises:
452 IntegrityError: If a database integrity error occurs.
453 ServerNameConflictError: If a server name conflict occurs (public or team visibility).
454 ServerError: If any associated tool, resource, or prompt does not exist, or if any other registration error occurs.
456 Examples:
457 >>> from mcpgateway.services.server_service import ServerService
458 >>> from unittest.mock import MagicMock, AsyncMock, patch
459 >>> from mcpgateway.schemas import ServerRead
460 >>> service = ServerService()
461 >>> db = MagicMock()
462 >>> server_in = MagicMock()
463 >>> server_in.id = None # No custom UUID for this test
464 >>> db.execute.return_value.scalar_one_or_none.return_value = None
465 >>> db.add = MagicMock()
466 >>> db.commit = MagicMock()
467 >>> db.refresh = MagicMock()
468 >>> service._notify_server_added = AsyncMock()
469 >>> service.convert_server_to_read = MagicMock(return_value='server_read')
470 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes
471 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes
472 >>> ServerRead.model_validate = MagicMock(return_value='server_read')
473 >>> import asyncio
474 >>> asyncio.run(service.register_server(db, server_in))
475 'server_read'
476 """
477 try:
478 logger.info(f"Registering server: {server_in.name}")
479 # # Create the new server record.
480 db_server = DbServer(
481 name=server_in.name,
482 description=server_in.description,
483 icon=server_in.icon,
484 enabled=True,
485 tags=server_in.tags or [],
486 # Team scoping fields - use schema values if provided, otherwise fallback to parameters
487 team_id=getattr(server_in, "team_id", None) or team_id,
488 owner_email=getattr(server_in, "owner_email", None) or owner_email or created_by,
489 # IMPORTANT: Prefer function parameter over schema default
490 # The API has visibility as a separate Body param that should override schema default
491 visibility=visibility or getattr(server_in, "visibility", None) or "public",
492 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
493 oauth_enabled=getattr(server_in, "oauth_enabled", False) or False,
494 oauth_config=getattr(server_in, "oauth_config", None),
495 # Metadata fields
496 created_by=created_by,
497 created_from_ip=created_from_ip,
498 created_via=created_via,
499 created_user_agent=created_user_agent,
500 version=1,
501 )
502 # Check for existing server with the same name (with row locking to prevent race conditions)
503 # The unique constraint is on (team_id, owner_email, name), so we check based on that
504 owner_email_to_check = getattr(server_in, "owner_email", None) or owner_email or created_by
505 team_id_to_check = getattr(server_in, "team_id", None) or team_id
507 # Build conditions based on the actual unique constraint: (team_id, owner_email, name)
508 conditions = [
509 DbServer.name == server_in.name,
510 DbServer.team_id == team_id_to_check if team_id_to_check else DbServer.team_id.is_(None),
511 DbServer.owner_email == owner_email_to_check if owner_email_to_check else DbServer.owner_email.is_(None),
512 ]
513 if server_in.id:
514 conditions.append(DbServer.id != server_in.id)
516 existing_server = get_for_update(db, DbServer, where=and_(*conditions))
517 if existing_server:
518 raise ServerNameConflictError(server_in.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)
519 # Set custom UUID if provided
520 if server_in.id:
521 logger.info(f"Setting custom UUID for server: {server_in.id}")
522 db_server.id = server_in.id
523 logger.info(f"Adding server to DB session: {db_server.name}")
524 db.add(db_server)
526 # Associate tools, verifying each exists using bulk query when multiple items
527 if server_in.associated_tools:
528 tool_ids = [tool_id.strip() for tool_id in server_in.associated_tools if tool_id.strip()]
529 if len(tool_ids) > 1:
530 # Use bulk query for multiple items
531 tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all()
532 found_tool_ids = {tool.id for tool in tools}
533 missing_tool_ids = set(tool_ids) - found_tool_ids
534 if missing_tool_ids:
535 raise ServerError(f"Tools with ids {missing_tool_ids} do not exist.")
536 db_server.tools.extend(tools)
537 elif tool_ids:
538 # Use single query for single item (maintains test compatibility)
539 tool_obj = db.get(DbTool, tool_ids[0])
540 if not tool_obj:
541 raise ServerError(f"Tool with id {tool_ids[0]} does not exist.")
542 db_server.tools.append(tool_obj)
544 # Associate resources, verifying each exists using bulk query when multiple items
545 if server_in.associated_resources:
546 resource_ids = [resource_id.strip() for resource_id in server_in.associated_resources if resource_id.strip()]
547 if len(resource_ids) > 1:
548 # Use bulk query for multiple items
549 resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all()
550 found_resource_ids = {resource.id for resource in resources}
551 missing_resource_ids = set(resource_ids) - found_resource_ids
552 if missing_resource_ids:
553 raise ServerError(f"Resources with ids {missing_resource_ids} do not exist.")
554 db_server.resources.extend(resources)
555 elif resource_ids:
556 # Use single query for single item (maintains test compatibility)
557 resource_obj = db.get(DbResource, resource_ids[0])
558 if not resource_obj:
559 raise ServerError(f"Resource with id {resource_ids[0]} does not exist.")
560 db_server.resources.append(resource_obj)
562 # Associate prompts, verifying each exists using bulk query when multiple items
563 if server_in.associated_prompts:
564 prompt_ids = [prompt_id.strip() for prompt_id in server_in.associated_prompts if prompt_id.strip()]
565 if len(prompt_ids) > 1:
566 # Use bulk query for multiple items
567 prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all()
568 found_prompt_ids = {prompt.id for prompt in prompts}
569 missing_prompt_ids = set(prompt_ids) - found_prompt_ids
570 if missing_prompt_ids:
571 raise ServerError(f"Prompts with ids {missing_prompt_ids} do not exist.")
572 db_server.prompts.extend(prompts)
573 elif prompt_ids:
574 # Use single query for single item (maintains test compatibility)
575 prompt_obj = db.get(DbPrompt, prompt_ids[0])
576 if not prompt_obj:
577 raise ServerError(f"Prompt with id {prompt_ids[0]} does not exist.")
578 db_server.prompts.append(prompt_obj)
580 # Associate A2A agents, verifying each exists using bulk query when multiple items
581 if server_in.associated_a2a_agents:
582 agent_ids = [agent_id.strip() for agent_id in server_in.associated_a2a_agents if agent_id.strip()]
583 if len(agent_ids) > 1:
584 # Use bulk query for multiple items
585 agents = db.execute(select(DbA2AAgent).where(DbA2AAgent.id.in_(agent_ids))).scalars().all()
586 found_agent_ids = {agent.id for agent in agents}
587 missing_agent_ids = set(agent_ids) - found_agent_ids
588 if missing_agent_ids:
589 raise ServerError(f"A2A Agents with ids {missing_agent_ids} do not exist.")
590 db_server.a2a_agents.extend(agents)
592 # Note: Auto-tool creation for A2A agents should be handled
593 # by a separate service or background task to avoid circular imports
594 for agent in agents:
595 logger.info(f"A2A agent {agent.name} associated with server {db_server.name}")
596 elif agent_ids:
597 # Use single query for single item (maintains test compatibility)
598 agent_obj = db.get(DbA2AAgent, agent_ids[0])
599 if not agent_obj:
600 raise ServerError(f"A2A Agent with id {agent_ids[0]} does not exist.")
601 db_server.a2a_agents.append(agent_obj)
602 logger.info(f"A2A agent {agent_obj.name} associated with server {db_server.name}")
604 # Commit the new record and refresh.
605 db.commit()
606 db.refresh(db_server)
607 # Force load the relationship attributes.
608 _ = db_server.tools, db_server.resources, db_server.prompts, db_server.a2a_agents
610 # Assemble response data with associated item IDs.
611 server_data = {
612 "id": db_server.id,
613 "name": db_server.name,
614 "description": db_server.description,
615 "icon": db_server.icon,
616 "created_at": db_server.created_at,
617 "updated_at": db_server.updated_at,
618 "enabled": db_server.enabled,
619 "associated_tools": [str(tool.id) for tool in db_server.tools],
620 "associated_resources": [str(resource.id) for resource in db_server.resources],
621 "associated_prompts": [str(prompt.id) for prompt in db_server.prompts],
622 }
623 logger.debug(f"Server Data: {server_data}")
624 await self._notify_server_added(db_server)
625 logger.info(f"Registered server: {server_in.name}")
627 # Structured logging: Audit trail for server creation
628 self._audit_trail.log_action(
629 user_id=created_by or "system",
630 action="create_server",
631 resource_type="server",
632 resource_id=db_server.id,
633 details={
634 "server_name": db_server.name,
635 "visibility": visibility,
636 "team_id": team_id,
637 "associated_tools_count": len(db_server.tools),
638 "associated_resources_count": len(db_server.resources),
639 "associated_prompts_count": len(db_server.prompts),
640 "associated_a2a_agents_count": len(db_server.a2a_agents),
641 },
642 metadata={
643 "created_from_ip": created_from_ip,
644 "created_via": created_via,
645 "created_user_agent": created_user_agent,
646 },
647 )
649 # Structured logging: Log successful server creation
650 self._structured_logger.log(
651 level="INFO",
652 message="Server created successfully",
653 event_type="server_created",
654 component="server_service",
655 server_id=db_server.id,
656 server_name=db_server.name,
657 visibility=visibility,
658 created_by=created_by,
659 user_email=created_by,
660 )
662 # Team name is loaded via db_server.team property from email_team relationship
663 return self.convert_server_to_read(db_server)
664 except IntegrityError as ie:
665 db.rollback()
666 logger.error(f"IntegrityErrors in group: {ie}")
668 # Structured logging: Log database integrity error
669 self._structured_logger.log(
670 level="ERROR",
671 message="Server creation failed due to database integrity error",
672 event_type="server_creation_failed",
673 component="server_service",
674 server_name=server_in.name,
675 error_type="IntegrityError",
676 error_message=str(ie),
677 created_by=created_by,
678 user_email=created_by,
679 )
680 raise ie
681 except ServerNameConflictError as se:
682 db.rollback()
684 # Structured logging: Log name conflict error
685 self._structured_logger.log(
686 level="WARNING",
687 message="Server creation failed due to name conflict",
688 event_type="server_name_conflict",
689 component="server_service",
690 server_name=server_in.name,
691 visibility=visibility,
692 created_by=created_by,
693 user_email=created_by,
694 )
695 raise se
696 except Exception as ex:
697 db.rollback()
699 # Structured logging: Log generic server creation failure
700 self._structured_logger.log(
701 level="ERROR",
702 message="Server creation failed",
703 event_type="server_creation_failed",
704 component="server_service",
705 server_name=server_in.name,
706 error_type=type(ex).__name__,
707 error_message=str(ex),
708 created_by=created_by,
709 user_email=created_by,
710 )
711 raise ServerError(f"Failed to register server: {str(ex)}")
713 async def list_servers(
714 self,
715 db: Session,
716 include_inactive: bool = False,
717 tags: Optional[List[str]] = None,
718 cursor: Optional[str] = None,
719 limit: Optional[int] = None,
720 page: Optional[int] = None,
721 per_page: Optional[int] = None,
722 user_email: Optional[str] = None,
723 team_id: Optional[str] = None,
724 visibility: Optional[str] = None,
725 token_teams: Optional[List[str]] = None,
726 ) -> Union[tuple[List[ServerRead], Optional[str]], Dict[str, Any]]:
727 """List all registered servers with cursor or page-based pagination and optional team filtering.
729 Args:
730 db: Database session.
731 include_inactive: Whether to include inactive servers.
732 tags: Filter servers by tags. If provided, only servers with at least one matching tag will be returned.
733 cursor: Cursor for pagination (encoded last created_at and id).
734 limit: Maximum number of servers to return. None for default, 0 for unlimited.
735 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
736 per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
737 user_email: Email of user for team-based access control. None for no access control.
738 team_id: Optional team ID to filter by specific team (requires user_email).
739 visibility: Optional visibility filter (private, team, public) (requires user_email).
740 token_teams: Optional list of team IDs from the token (None=unrestricted, []=public-only).
742 Returns:
743 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
744 If cursor is provided or neither: tuple of (list of ServerRead objects, next_cursor).
746 Examples:
747 >>> from mcpgateway.services.server_service import ServerService
748 >>> from unittest.mock import MagicMock
749 >>> service = ServerService()
750 >>> db = MagicMock()
751 >>> server_read = MagicMock()
752 >>> service.convert_server_to_read = MagicMock(return_value=server_read)
753 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
754 >>> import asyncio
755 >>> servers, cursor = asyncio.run(service.list_servers(db))
756 >>> isinstance(servers, list) and cursor is None
757 True
758 """
759 # Check cache for first page only
760 # SECURITY: Only cache public-only results (token_teams=[])
761 # - token_teams=None (admin bypass): Don't cache - admin sees all, should be fresh
762 # - token_teams=[] (public-only): Cache - same result for all public-only users
763 # - token_teams=[...] (team-scoped): Don't cache - results vary by team
764 # - user_email set: Don't cache - results vary by user ownership
765 cache = _get_registry_cache()
766 is_public_only = token_teams is not None and len(token_teams) == 0
767 use_cache = cursor is None and user_email is None and page is None and is_public_only
768 if use_cache:
769 filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None)
770 cached = await cache.get("servers", filters_hash)
771 if cached is not None:
772 # Reconstruct ServerRead objects from cached dicts
773 cached_servers = [ServerRead.model_validate(s) for s in cached["servers"]]
774 return (cached_servers, cached.get("next_cursor"))
776 # Build base query with ordering and eager load relationships to avoid N+1
777 query = (
778 select(DbServer)
779 .options(
780 selectinload(DbServer.tools),
781 selectinload(DbServer.resources),
782 selectinload(DbServer.prompts),
783 selectinload(DbServer.a2a_agents),
784 joinedload(DbServer.email_team),
785 )
786 .order_by(desc(DbServer.created_at), desc(DbServer.id))
787 )
789 # Apply active/inactive filter
790 if not include_inactive:
791 query = query.where(DbServer.enabled)
793 # SECURITY: Apply token-based access control based on normalized token_teams
794 # - token_teams is None: admin bypass (is_admin=true with explicit null teams) - sees all
795 # - token_teams is []: public-only access (missing teams or explicit empty)
796 # - token_teams is [...]: access to specified teams + public + user's own
797 if token_teams is not None:
798 if len(token_teams) == 0:
799 # Public-only token: only access public servers
800 query = query.where(DbServer.visibility == "public")
801 else:
802 # Team-scoped token: public servers + servers in allowed teams + user's own
803 access_conditions = [
804 DbServer.visibility == "public",
805 and_(DbServer.team_id.in_(token_teams), DbServer.visibility.in_(["team", "public"])),
806 ]
807 if user_email:
808 access_conditions.append(and_(DbServer.owner_email == user_email, DbServer.visibility == "private"))
809 query = query.where(or_(*access_conditions))
811 if visibility:
812 query = query.where(DbServer.visibility == visibility)
814 # Apply team-based access control if user_email is provided (and no token_teams filtering)
815 elif user_email:
816 team_service = TeamManagementService(db)
817 user_teams = await team_service.get_user_teams(user_email)
818 team_ids = [team.id for team in user_teams]
820 if team_id:
821 # User requesting specific team - verify access
822 if team_id not in team_ids:
823 return ([], None)
824 access_conditions = [
825 and_(DbServer.team_id == team_id, DbServer.visibility.in_(["team", "public"])),
826 and_(DbServer.team_id == team_id, DbServer.owner_email == user_email),
827 ]
828 query = query.where(or_(*access_conditions))
829 else:
830 # General access: user's servers + public servers + team servers
831 access_conditions = [
832 DbServer.owner_email == user_email,
833 DbServer.visibility == "public",
834 ]
835 if team_ids:
836 access_conditions.append(and_(DbServer.team_id.in_(team_ids), DbServer.visibility.in_(["team", "public"])))
837 query = query.where(or_(*access_conditions))
839 if visibility:
840 query = query.where(DbServer.visibility == visibility)
842 # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
843 if tags:
844 query = query.where(json_contains_tag_expr(db, DbServer.tags, tags, match_any=True))
846 # Use unified pagination helper - handles both page and cursor pagination
847 pag_result = await unified_paginate(
848 db=db,
849 query=query,
850 page=page,
851 per_page=per_page,
852 cursor=cursor,
853 limit=limit,
854 base_url="/admin/servers", # Used for page-based links
855 query_params={"include_inactive": include_inactive} if include_inactive else {},
856 )
858 next_cursor = None
859 # Extract servers based on pagination type
860 if page is not None:
861 # Page-based: pag_result is a dict
862 servers_db = pag_result["data"]
863 else:
864 # Cursor-based: pag_result is a tuple
865 servers_db, next_cursor = pag_result
867 db.commit() # Release transaction to avoid idle-in-transaction
869 # Convert to ServerRead (common for both pagination types)
870 # Team names are loaded via joinedload(DbServer.email_team)
871 result = []
872 for s in servers_db:
873 try:
874 result.append(self.convert_server_to_read(s, include_metrics=False))
875 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
876 logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
877 # Continue with remaining servers instead of failing completely
879 # Return appropriate format based on pagination type
880 if page is not None:
881 # Page-based format
882 return {
883 "data": result,
884 "pagination": pag_result["pagination"],
885 "links": pag_result["links"],
886 }
888 # Cursor-based format
890 # Cache first page results - only for public-only queries (no user/team filtering)
891 # SECURITY: Only cache public-only results (token_teams=[]), never admin bypass or team-scoped
892 if cursor is None and user_email is None and is_public_only:
893 try:
894 cache_data = {"servers": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
895 await cache.set("servers", cache_data, filters_hash)
896 except AttributeError:
897 pass # Skip caching if result objects don't support model_dump (e.g., in doctests)
899 return (result, next_cursor)
async def list_servers_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[ServerRead]:
    """
    DEPRECATED: Use list_servers() with user_email parameter instead.

    This method is maintained for backward compatibility but is no longer used.
    New code should call list_servers() with user_email, team_id, and visibility parameters.

    List servers user has access to with team filtering.

    Results are ordered by creation time and then ID (both descending) so that
    skip/limit pagination returns stable pages.

    Args:
        db: Database session
        user_email: Email of the user requesting servers
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive servers
        skip: Number of servers to skip for pagination
        limit: Maximum number of servers to return

    Returns:
        List[ServerRead]: Servers the user has access to. Servers that fail
        conversion to ServerRead are logged and left out of the result.
    """
    # Resolve the teams the user belongs to; they drive the access conditions below
    team_service = TeamManagementService(db)
    user_teams = await team_service.get_user_teams(user_email)
    team_ids = [team.id for team in user_teams]

    # Eager load relationships to avoid N+1 queries.
    # The explicit ORDER BY makes OFFSET/LIMIT deterministic; without it the database
    # may return rows in any order, so pages could overlap or skip servers.
    query = (
        select(DbServer)
        .options(
            selectinload(DbServer.tools),
            selectinload(DbServer.resources),
            selectinload(DbServer.prompts),
            selectinload(DbServer.a2a_agents),
            joinedload(DbServer.email_team),
        )
        .order_by(desc(DbServer.created_at), desc(DbServer.id))
    )

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbServer.enabled)

    if team_id:
        if team_id not in team_ids:
            return []  # No access to team

        # Filter by specific team: team/public servers of that team, plus the
        # user's own servers that belong to that team
        access_conditions = [
            and_(DbServer.team_id == team_id, DbServer.visibility.in_(["team", "public"])),
            and_(DbServer.team_id == team_id, DbServer.owner_email == user_email),
        ]
    else:
        # General access:
        # 1. User's personal resources (owner_email matches)
        access_conditions = [DbServer.owner_email == user_email]

        # 2. Team resources where user is member
        if team_ids:
            access_conditions.append(and_(DbServer.team_id.in_(team_ids), DbServer.visibility.in_(["team", "public"])))

        # 3. Public resources
        access_conditions.append(DbServer.visibility == "public")

    query = query.where(or_(*access_conditions))

    # Apply visibility filter if specified
    if visibility:
        query = query.where(DbServer.visibility == visibility)

    # Apply pagination following existing patterns
    query = query.offset(skip).limit(limit)

    servers = db.execute(query).scalars().all()

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Skip metrics to avoid N+1 queries in list operations
    # Team names are loaded via joinedload(DbServer.email_team)
    result = []
    for s in servers:
        try:
            result.append(self.convert_server_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining servers instead of failing completely
    return result
async def get_server(self, db: Session, server_id: str) -> ServerRead:
    """Look up one server by ID and return it as a ServerRead.

    The server row is fetched together with its tools, resources, prompts,
    A2A agents and owning team. Every successful lookup writes a
    "server_viewed" structured log entry and a "view_server" audit entry.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.

    Returns:
        The corresponding ServerRead object.

    Raises:
        ServerNotFoundError: If no server with the given ID exists.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = server
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.get_server(db, 'server_id'))
        'server_read'
    """
    # Relationship loaders applied up front so later attribute access does not lazy-load
    eager_loads = (
        selectinload(DbServer.tools),
        selectinload(DbServer.resources),
        selectinload(DbServer.prompts),
        selectinload(DbServer.a2a_agents),
        joinedload(DbServer.email_team),
    )
    stmt = select(DbServer).options(*eager_loads).where(DbServer.id == server_id)
    server = db.execute(stmt).scalar_one_or_none()
    if not server:
        raise ServerNotFoundError(f"Server not found: {server_id}")

    # Summary used only for the debug log line below
    tool_names = [tool.name for tool in server.tools]
    resource_ids = [res.id for res in server.resources]
    prompt_ids = [prompt.id for prompt in server.prompts]
    server_data = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "created_at": server.created_at,
        "updated_at": server.updated_at,
        "enabled": server.enabled,
        "associated_tools": tool_names,
        "associated_resources": resource_ids,
        "associated_prompts": prompt_ids,
    }
    logger.debug(f"Server Data: {server_data}")

    # Team name is loaded via server.team property from email_team relationship
    server_read = self.convert_server_to_read(server)

    def _related_count(relationship: str) -> int:
        """Return how many related objects the server has, treating a missing/None relationship as empty."""
        return len(getattr(server, relationship, []) or [])

    owning_team = getattr(server, "team_id", None)

    self._structured_logger.log(
        level="INFO",
        message="Server retrieved successfully",
        event_type="server_viewed",
        component="server_service",
        server_id=server.id,
        server_name=server.name,
        team_id=owning_team,
        resource_type="server",
        resource_id=server.id,
        custom_fields={
            "enabled": server.enabled,
            "tool_count": _related_count("tools"),
            "resource_count": _related_count("resources"),
            "prompt_count": _related_count("prompts"),
        },
        db=db,
    )

    self._audit_trail.log_action(
        action="view_server",
        resource_type="server",
        resource_id=server.id,
        resource_name=server.name,
        user_id="system",
        team_id=owning_team,
        context={"enabled": server.enabled},
        db=db,
    )

    return server_read
async def update_server(
    self,
    db: Session,
    server_id: str,
    server_update: ServerUpdate,
    user_email: str,
    modified_by: Optional[str] = None,
    modified_from_ip: Optional[str] = None,
    modified_via: Optional[str] = None,
    modified_user_agent: Optional[str] = None,
) -> ServerRead:
    """Update an existing server.

    The server row is loaded via get_for_update, ownership is checked, the requested
    field changes are applied, and the change is committed. On success the server
    and tag caches are invalidated, an update notification is sent, and audit /
    structured log entries are written.

    Error handling: IntegrityError and ServerNameConflictError are rolled back and
    re-raised unchanged. Any other exception raised inside the update (including the
    not-found, permission and validation errors listed below) is rolled back and
    surfaced to the caller as ServerError, with the original exception as its cause.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        server_update: Server update schema with new data.
        user_email: email of the user performing the update (for permission checks).
        modified_by: Username who modified this server.
        modified_from_ip: IP address from which modification was made.
        modified_via: Source of modification (api, ui, etc.).
        modified_user_agent: User agent of the client making the modification.

    Returns:
        The updated ServerRead object.

    Raises:
        ServerNotFoundError: If the server is not found (reaches callers wrapped in ServerError).
        PermissionError: If user doesn't own the server (reaches callers wrapped in ServerError).
        ServerNameConflictError: If a new name conflicts with an existing server.
        ServerError: For other update errors.
        IntegrityError: If a database integrity error occurs.
        ValueError: If visibility or team constraints are violated (reaches callers wrapped in ServerError).

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> server.id = 'server_id'
        >>> server.name = 'test_server'
        >>> server.owner_email = 'user_email'  # Set owner to match user performing update
        >>> server.team_id = None
        >>> server.visibility = 'public'
        >>> db.get.return_value = server
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> server_update = MagicMock()
        >>> server_update.id = None  # No UUID change
        >>> server_update.name = None  # No name change
        >>> server_update.description = None
        >>> server_update.icon = None
        >>> server_update.visibility = None
        >>> server_update.team_id = None
        >>> import asyncio
        >>> with patch('mcpgateway.services.server_service.get_for_update', return_value=server):
        ...     asyncio.run(service.update_server(db, 'server_id', server_update, 'user_email'))
        'server_read'
    """
    try:
        server = get_for_update(
            db,
            DbServer,
            server_id,
            options=[
                selectinload(DbServer.tools),
                selectinload(DbServer.resources),
                selectinload(DbServer.prompts),
                selectinload(DbServer.a2a_agents),
                selectinload(DbServer.email_team),
            ],
        )
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can update this server")

        # Check for name conflict if name is being changed; the scope of the check
        # (all public servers vs. servers of one team) depends on the resulting visibility
        if server_update.name and server_update.name != server.name:
            visibility = server_update.visibility or server.visibility
            team_id = server_update.team_id or server.team_id
            if visibility.lower() == "public":
                # Check for existing public server with the same name
                existing_server = get_for_update(db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "public", DbServer.id != server.id))
                if existing_server:
                    raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)
            elif visibility.lower() == "team" and team_id:
                # Check for existing team server with the same name
                existing_server = get_for_update(
                    db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "team", DbServer.team_id == team_id, DbServer.id != server.id)
                )
                if existing_server:
                    raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)

        # Update simple fields
        if server_update.id is not None and server_update.id != server.id:
            # Check if the new UUID is already in use
            existing = db.get(DbServer, server_update.id)
            if existing:
                raise ServerError(f"Server with ID {server_update.id} already exists")
            server.id = server_update.id
        if server_update.name is not None:
            server.name = server_update.name
        if server_update.description is not None:
            server.description = server_update.description
        if server_update.icon is not None:
            server.icon = server_update.icon

        if server_update.visibility is not None:
            new_visibility = server_update.visibility

            # Validate visibility transitions
            if new_visibility == "team":
                if not server.team_id and not server_update.team_id:
                    raise ValueError("Cannot set visibility to 'team' without a team_id")

                # Validate against the team the server will belong to after this update:
                # a team_id supplied in the update takes precedence over the current one
                # (same precedence as the name-conflict check above). Checking the old
                # team here would let a server be moved into a team the user does not own.
                team_id = server_update.team_id or server.team_id

                # Verify team exists
                team = db.query(DbEmailTeam).filter(DbEmailTeam.id == team_id).first()
                if not team:
                    raise ValueError(f"Team {team_id} not found")

                # Verify user is an active owner of the team
                membership = (
                    db.query(DbEmailTeamMember)
                    .filter(DbEmailTeamMember.team_id == team_id, DbEmailTeamMember.user_email == user_email, DbEmailTeamMember.is_active, DbEmailTeamMember.role == "owner")
                    .first()
                )
                if not membership:
                    raise ValueError("User membership in team not sufficient for this update.")

            elif new_visibility == "public":
                # Optional: Check if user has permission to make resources public
                # This could be a platform-level permission
                pass

            server.visibility = new_visibility

        if server_update.team_id is not None:
            server.team_id = server_update.team_id

        if server_update.owner_email is not None:
            server.owner_email = server_update.owner_email

        # Update associated tools if provided using bulk query
        if server_update.associated_tools is not None:
            server.tools = []
            if server_update.associated_tools:
                tool_ids = [tool_id for tool_id in server_update.associated_tools if tool_id]
                if tool_ids:
                    tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all()
                    server.tools = list(tools)

        # Update associated resources if provided using bulk query
        if server_update.associated_resources is not None:
            server.resources = []
            if server_update.associated_resources:
                resource_ids = [resource_id for resource_id in server_update.associated_resources if resource_id]
                if resource_ids:
                    resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all()
                    server.resources = list(resources)

        # Update associated prompts if provided using bulk query
        if server_update.associated_prompts is not None:
            server.prompts = []
            if server_update.associated_prompts:
                prompt_ids = [prompt_id for prompt_id in server_update.associated_prompts if prompt_id]
                if prompt_ids:
                    prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all()
                    server.prompts = list(prompts)

        # Update tags if provided
        if server_update.tags is not None:
            server.tags = server_update.tags

        # Update OAuth 2.0 configuration if provided
        # Track if OAuth is being explicitly disabled to prevent config re-assignment
        oauth_being_disabled = server_update.oauth_enabled is not None and not server_update.oauth_enabled

        if server_update.oauth_enabled is not None:
            server.oauth_enabled = server_update.oauth_enabled
            # If OAuth is being disabled, clear the config
            if oauth_being_disabled:
                server.oauth_config = None

        # Only update oauth_config if OAuth is not being explicitly disabled
        # This prevents the case where oauth_enabled=False and oauth_config are both provided
        if not oauth_being_disabled:
            if hasattr(server_update, "model_fields_set") and "oauth_config" in server_update.model_fields_set:
                # Field was explicitly set in the request, so an explicit None clears the config
                server.oauth_config = server_update.oauth_config
            elif server_update.oauth_config is not None:
                server.oauth_config = server_update.oauth_config

        # Update metadata fields
        server.updated_at = datetime.now(timezone.utc)
        if modified_by:
            server.modified_by = modified_by
        if modified_from_ip:
            server.modified_from_ip = modified_from_ip
        if modified_via:
            server.modified_via = modified_via
        if modified_user_agent:
            server.modified_user_agent = modified_user_agent
        if hasattr(server, "version") and server.version is not None:
            server.version = server.version + 1
        else:
            server.version = 1

        db.commit()
        db.refresh(server)
        # Force loading relationships
        _ = server.tools, server.resources, server.prompts

        # Invalidate cache after successful update
        cache = _get_registry_cache()
        await cache.invalidate_servers()
        # Also invalidate tags cache since server tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        await self._notify_server_updated(server)
        logger.info(f"Updated server: {server.name}")

        # Structured logging: Audit trail for server update
        changes = []
        if server_update.name:
            changes.append(f"name: {server_update.name}")
        if server_update.visibility:
            changes.append(f"visibility: {server_update.visibility}")
        if server_update.team_id:
            changes.append(f"team_id: {server_update.team_id}")

        self._audit_trail.log_action(
            user_id=user_email or "system",
            action="update_server",
            resource_type="server",
            resource_id=server.id,
            details={
                "server_name": server.name,
                "changes": ", ".join(changes) if changes else "metadata only",
                "version": server.version,
            },
            metadata={
                "modified_from_ip": modified_from_ip,
                "modified_via": modified_via,
                "modified_user_agent": modified_user_agent,
            },
        )

        # Structured logging: Log successful server update
        self._structured_logger.log(
            level="INFO",
            message="Server updated successfully",
            event_type="server_updated",
            component="server_service",
            server_id=server.id,
            server_name=server.name,
            modified_by=user_email,
            user_email=user_email,
        )

        # Build a dictionary with associated IDs (used only for the debug log line)
        # Team name is loaded via server.team property from email_team relationship
        server_data = {
            "id": server.id,
            "name": server.name,
            "description": server.description,
            "icon": server.icon,
            "team": server.team,
            "created_at": server.created_at,
            "updated_at": server.updated_at,
            "enabled": server.enabled,
            "associated_tools": [tool.id for tool in server.tools],
            "associated_resources": [res.id for res in server.resources],
            "associated_prompts": [prompt.id for prompt in server.prompts],
        }
        logger.debug(f"Server Data: {server_data}")
        return self.convert_server_to_read(server)
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        self._structured_logger.log(
            level="ERROR",
            message="Server update failed due to database integrity error",
            event_type="server_update_failed",
            component="server_service",
            server_id=server_id,
            error_type="IntegrityError",
            error_message=str(ie),
            modified_by=user_email,
            user_email=user_email,
        )
        raise
    except ServerNameConflictError as snce:
        db.rollback()
        logger.error(f"Server name conflict: {snce}")

        # Structured logging: Log name conflict error
        self._structured_logger.log(
            level="WARNING",
            message="Server update failed due to name conflict",
            event_type="server_name_conflict",
            component="server_service",
            server_id=server_id,
            modified_by=user_email,
            user_email=user_email,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server update failure
        self._structured_logger.log(
            level="ERROR",
            message="Server update failed",
            event_type="server_update_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            modified_by=user_email,
            user_email=user_email,
        )
        # Chain explicitly so the original failure is kept as __cause__
        raise ServerError(f"Failed to update server: {str(e)}") from e
async def set_server_state(self, db: Session, server_id: str, activate: bool, user_email: Optional[str] = None) -> ServerRead:
    """Set the activation status of a server.

    The server row is fetched with get_for_update(nowait=True) so that a concurrent
    modification fails fast instead of waiting. When the requested state differs from
    the current one, the change is committed, the server cache is invalidated, an
    activation/deactivation notification is sent and audit / structured log entries
    are written. When the state already matches, the server is returned unchanged.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        activate: True to activate, False to deactivate.
        user_email: Optional[str] The email of the user to check if the user has permission to modify.

    Returns:
        The updated ServerRead object.

    Raises:
        ServerNotFoundError: If the server is not found.
        ServerLockConflictError: If the server row is locked by another transaction.
        ServerError: For other errors.
        PermissionError: If user doesn't own the server.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.get.return_value = server
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_server_activated = AsyncMock()
        >>> service._notify_server_deactivated = AsyncMock()
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.set_server_state(db, 'server_id', True))
        'server_read'
    """
    try:
        # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
        try:
            server = get_for_update(
                db,
                DbServer,
                server_id,
                nowait=True,
                options=[
                    selectinload(DbServer.tools),
                    selectinload(DbServer.resources),
                    selectinload(DbServer.prompts),
                    selectinload(DbServer.a2a_agents),
                    selectinload(DbServer.email_team),
                ],
            )
        except OperationalError as lock_err:
            # Row is locked by another transaction - fail fast with 409
            db.rollback()
            raise ServerLockConflictError(f"Server {server_id} is currently being modified by another request") from lock_err
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can activate the Server" if activate else "Only the owner can deactivate the Server")

        # NOTE(review): everything down to the structured log call is treated as the body
        # of this `if`, i.e. it only runs on an actual state transition — confirm against
        # the original file layout.
        if server.enabled != activate:
            server.enabled = activate
            server.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(server)

            # Invalidate cache after status change
            cache = _get_registry_cache()
            await cache.invalidate_servers()

            if activate:
                await self._notify_server_activated(server)
            else:
                await self._notify_server_deactivated(server)
            logger.info(f"Server {server.name} {'activated' if activate else 'deactivated'}")

            # Structured logging: Audit trail for server state change
            self._audit_trail.log_action(
                user_id=user_email or "system",
                action="activate_server" if activate else "deactivate_server",
                resource_type="server",
                resource_id=server.id,
                details={
                    "server_name": server.name,
                    "new_status": "active" if activate else "inactive",
                },
            )

            # Structured logging: Log server status change
            self._structured_logger.log(
                level="INFO",
                message=f"Server {'activated' if activate else 'deactivated'}",
                event_type="server_status_changed",
                component="server_service",
                server_id=server.id,
                server_name=server.name,
                new_status="active" if activate else "inactive",
                changed_by=user_email,
                user_email=user_email,
            )

        # Team name is loaded via server.team property from email_team relationship
        server_data = {
            "id": server.id,
            "name": server.name,
            "description": server.description,
            "icon": server.icon,
            "team": server.team,
            "created_at": server.created_at,
            "updated_at": server.updated_at,
            "enabled": server.enabled,
            "associated_tools": [tool.id for tool in server.tools],
            "associated_resources": [res.id for res in server.resources],
            "associated_prompts": [prompt.id for prompt in server.prompts],
        }
        # Logged at DEBUG, matching get_server() and update_server(), to keep the full
        # server dump out of INFO-level logs
        logger.debug(f"Server Data: {server_data}")
        return self.convert_server_to_read(server)
    except PermissionError:
        # End the transaction opened by get_for_update() so the row is not kept
        # locked until the session is torn down
        db.rollback()

        # Structured logging: Log permission error
        self._structured_logger.log(
            level="WARNING",
            message="Server state change failed due to insufficient permissions",
            event_type="server_state_change_permission_denied",
            component="server_service",
            server_id=server_id,
            user_email=user_email,
        )
        raise
    except ServerLockConflictError:
        # Re-raise lock conflicts without wrapping - allows 409 response
        raise
    except ServerNotFoundError:
        # Re-raise not found without wrapping - allows 404 response
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server state change failure
        self._structured_logger.log(
            level="ERROR",
            message="Server state change failed",
            event_type="server_state_change_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            user_email=user_email,
        )
        # Chain explicitly so the original failure is kept as __cause__
        raise ServerError(f"Failed to set server state: {str(e)}") from e
async def delete_server(self, db: Session, server_id: str, user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
    """Permanently delete a server.

    Looks the server up, optionally verifies ownership, optionally purges the
    server's raw and hourly metrics, deletes the row and commits. After the
    commit it invalidates the registry, tags and metrics caches, notifies
    event subscribers and writes audit / structured log entries.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        user_email: Email of user performing deletion (for ownership check).
            When falsy, the ownership check is skipped and the audit entry
            is attributed to "system".
        purge_metrics: If True, delete raw + rollup metrics for this server.

    Raises:
        ServerNotFoundError: If the server is not found.
        PermissionError: If user doesn't own the server.
        ServerError: For other deletion errors.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.get.return_value = server
        >>> db.delete = MagicMock()
        >>> db.commit = MagicMock()
        >>> service._notify_server_deleted = AsyncMock()
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> import asyncio
        >>> asyncio.run(service.delete_server(db, 'server_id', 'user@example.com'))
    """
    try:
        server = db.get(DbServer, server_id)
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can delete this server")

        # Capture identifying fields before the row is deleted; they are
        # needed for the notification and log entries emitted afterwards.
        server_info = {"id": server.id, "name": server.name}
        if purge_metrics:
            with pause_rollup_during_purge(reason=f"purge_server:{server_id}"):
                delete_metrics_in_batches(db, ServerMetric, ServerMetric.server_id, server_id)
                delete_metrics_in_batches(db, ServerMetricsHourly, ServerMetricsHourly.server_id, server_id)
        db.delete(server)
        db.commit()

        # Invalidate cache after successful deletion
        cache = _get_registry_cache()
        await cache.invalidate_servers()
        # Also invalidate tags cache since server tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()
        # First-Party
        from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

        metrics_cache.invalidate_prefix("top_servers:")
        metrics_cache.invalidate("servers")

        await self._notify_server_deleted(server_info)
        logger.info(f"Deleted server: {server_info['name']}")

        # Structured logging: Audit trail for server deletion
        self._audit_trail.log_action(
            user_id=user_email or "system",
            action="delete_server",
            resource_type="server",
            resource_id=server_info["id"],
            details={
                "server_name": server_info["name"],
            },
        )

        # Structured logging: Log successful server deletion
        self._structured_logger.log(
            level="INFO",
            message="Server deleted successfully",
            event_type="server_deleted",
            component="server_service",
            server_id=server_info["id"],
            server_name=server_info["name"],
            deleted_by=user_email,
            user_email=user_email,
            purge_metrics=purge_metrics,
        )
    except ServerNotFoundError:
        # Re-raise not found without wrapping so callers can distinguish it
        # (e.g. to answer 404) as the docstring promises. Without this
        # handler the generic branch below would turn it into a plain
        # ServerError.
        # NOTE(review): assumes ServerNotFoundError subclasses ServerError, so
        # callers that only catch ServerError keep working - confirm.
        db.rollback()
        raise
    except PermissionError:
        db.rollback()

        # Structured logging: Log permission error
        self._structured_logger.log(
            level="WARNING",
            message="Server deletion failed due to insufficient permissions",
            event_type="server_deletion_permission_denied",
            component="server_service",
            server_id=server_id,
            user_email=user_email,
        )
        # Bare raise keeps the original traceback intact.
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server deletion failure
        self._structured_logger.log(
            level="ERROR",
            message="Server deletion failed",
            event_type="server_deletion_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            user_email=user_email,
        )
        # Chain explicitly so the root cause is recorded as __cause__.
        raise ServerError(f"Failed to delete server: {str(e)}") from e
1704 async def _publish_event(self, event: Dict[str, Any]) -> None:
1705 """
1706 Publish an event to all subscribed queues.
1708 Args:
1709 event: Event to publish
1710 """
1711 for queue in self._event_subscribers:
1712 await queue.put(event)
async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
    """Stream server events to a single subscriber.

    A dedicated queue is registered in ``self._event_subscribers`` for the
    lifetime of the generator and unregistered again when the generator is
    closed or otherwise exits.

    Yields:
        Server event messages.
    """
    subscriber_queue: asyncio.Queue = asyncio.Queue()
    self._event_subscribers.append(subscriber_queue)
    try:
        while True:
            yield await subscriber_queue.get()
    finally:
        # Always unregister, even when the consumer stops iterating early.
        self._event_subscribers.remove(subscriber_queue)
1729 async def _notify_server_added(self, server: DbServer) -> None:
1730 """
1731 Notify subscribers that a new server has been added.
1733 Args:
1734 server: Server to add
1735 """
1736 associated_tools = [tool.id for tool in server.tools] if server.tools else []
1737 associated_resources = [res.id for res in server.resources] if server.resources else []
1738 associated_prompts = [prompt.id for prompt in server.prompts] if server.prompts else []
1739 event = {
1740 "type": "server_added",
1741 "data": {
1742 "id": server.id,
1743 "name": server.name,
1744 "description": server.description,
1745 "icon": server.icon,
1746 "associated_tools": associated_tools,
1747 "associated_resources": associated_resources,
1748 "associated_prompts": associated_prompts,
1749 "enabled": server.enabled,
1750 },
1751 "timestamp": datetime.now(timezone.utc).isoformat(),
1752 }
1753 await self._publish_event(event)
1755 async def _notify_server_updated(self, server: DbServer) -> None:
1756 """
1757 Notify subscribers that a server has been updated.
1759 Args:
1760 server: Server to update
1761 """
1762 associated_tools = [tool.id for tool in server.tools] if server.tools else []
1763 associated_resources = [res.id for res in server.resources] if server.resources else []
1764 associated_prompts = [prompt.id for prompt in server.prompts] if server.prompts else []
1765 event = {
1766 "type": "server_updated",
1767 "data": {
1768 "id": server.id,
1769 "name": server.name,
1770 "description": server.description,
1771 "icon": server.icon,
1772 "associated_tools": associated_tools,
1773 "associated_resources": associated_resources,
1774 "associated_prompts": associated_prompts,
1775 "enabled": server.enabled,
1776 },
1777 "timestamp": datetime.now(timezone.utc).isoformat(),
1778 }
1779 await self._publish_event(event)
1781 async def _notify_server_activated(self, server: DbServer) -> None:
1782 """
1783 Notify subscribers that a server has been activated.
1785 Args:
1786 server: Server to activate
1787 """
1788 event = {
1789 "type": "server_activated",
1790 "data": {
1791 "id": server.id,
1792 "name": server.name,
1793 "enabled": True,
1794 },
1795 "timestamp": datetime.now(timezone.utc).isoformat(),
1796 }
1797 await self._publish_event(event)
1799 async def _notify_server_deactivated(self, server: DbServer) -> None:
1800 """
1801 Notify subscribers that a server has been deactivated.
1803 Args:
1804 server: Server to deactivate
1805 """
1806 event = {
1807 "type": "server_deactivated",
1808 "data": {
1809 "id": server.id,
1810 "name": server.name,
1811 "enabled": False,
1812 },
1813 "timestamp": datetime.now(timezone.utc).isoformat(),
1814 }
1815 await self._publish_event(event)
1817 async def _notify_server_deleted(self, server_info: Dict[str, Any]) -> None:
1818 """
1819 Notify subscribers that a server has been deleted.
1821 Args:
1822 server_info: Dictionary on server to be deleted
1823 """
1824 event = {
1825 "type": "server_deleted",
1826 "data": server_info,
1827 "timestamp": datetime.now(timezone.utc).isoformat(),
1828 }
1829 await self._publish_event(event)
1831 # --- Metrics ---
async def aggregate_metrics(self, db: Session) -> ServerMetrics:
    """
    Aggregate metrics for all server invocations across all servers.

    When the metrics cache is enabled, a cached entry stored under the
    ``"servers"`` key is returned if present. Otherwise the totals are
    obtained from ``aggregate_metrics_combined(db, "server")`` (the combined
    raw + hourly rollup query), wrapped in a ``ServerMetrics`` model and,
    if caching is enabled, stored back in the cache as a plain dict.

    Args:
        db: Database session

    Returns:
        ServerMetrics: Aggregated metrics from raw + hourly rollup tables.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    # Serve from cache when possible (only consulted if caching is enabled)
    cached_payload = metrics_cache.get("servers") if is_cache_enabled() else None
    if cached_payload is not None:
        return ServerMetrics(**cached_payload)

    # Cache miss: run the combined raw + rollup query for full historical coverage
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "server")

    copied_fields = (
        "total_executions",
        "successful_executions",
        "failed_executions",
        "failure_rate",
        "min_response_time",
        "max_response_time",
        "avg_response_time",
        "last_execution_time",
    )
    summary = ServerMetrics(**{field: getattr(combined, field) for field in copied_fields})

    # Stored as a dict for serialization compatibility
    if is_cache_enabled():
        metrics_cache.set("servers", summary.model_dump())

    return summary
async def reset_metrics(self, db: Session) -> None:
    """
    Wipe all server metrics: raw records first, then hourly rollups.

    Both deletes are issued on the given session and committed together,
    after which the cached aggregate (``"servers"``) and every cache entry
    whose key starts with ``"top_servers:"`` are invalidated.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    for metrics_model in (ServerMetric, ServerMetricsHourly):
        db.execute(delete(metrics_model))
    db.commit()

    # Drop cached values that were derived from the deleted rows
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("servers")
    metrics_cache.invalidate_prefix("top_servers:")
def get_oauth_protected_resource_metadata(self, db: Session, server_id: str, resource_base_url: str) -> Dict[str, Any]:
    """
    Build RFC 9728 OAuth 2.0 Protected Resource Metadata for a server.

    The server's stored OAuth configuration is read and reduced to the
    public discovery fields, enabling MCP clients to discover OAuth
    authorization servers for browser-based SSO. Only the authorization
    server list and optional scopes are copied from the configuration.

    Args:
        db: Database session.
        server_id: The ID of the server.
        resource_base_url: The base URL for the resource (e.g., "https://gateway.example.com/servers/abc123").

    Returns:
        Dict containing RFC 9728 Protected Resource Metadata:
            - resource: The protected resource identifier (URL)
            - authorization_servers: List of authorization server issuer URIs
            - bearer_methods_supported: Supported bearer token methods
            - scopes_supported: Optional list of supported scopes

    Raises:
        ServerNotFoundError: If server doesn't exist, is disabled, or is non-public.
        ServerError: If OAuth is not enabled or not properly configured.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> # Method exists and is callable
        >>> callable(service.get_oauth_protected_resource_metadata)
        True
    """
    server = db.get(DbServer, server_id)

    # Missing, disabled and non-public servers all produce the same error
    # (avoids leaking information about private/team servers)
    if not server or not server.enabled or getattr(server, "visibility", "public") != "public":
        raise ServerNotFoundError(f"Server not found: {server_id}")

    # OAuth must be switched on and have a non-empty configuration
    if not getattr(server, "oauth_enabled", False):
        raise ServerError(f"OAuth not enabled for server: {server_id}")

    oauth_config = getattr(server, "oauth_config", None)
    if not oauth_config:
        raise ServerError(f"OAuth not configured for server: {server_id}")

    # Prefer the plural key; fall back to the singular key wrapped in a list
    issuers = oauth_config.get("authorization_servers", [])
    if not issuers:
        single_issuer = oauth_config.get("authorization_server")
        if single_issuer:
            issuers = [single_issuer]

    if not issuers:
        raise ServerError(f"OAuth authorization_server not configured for server: {server_id}")

    # RFC 9728 Protected Resource Metadata response
    metadata: Dict[str, Any] = {
        "resource": resource_base_url,
        "authorization_servers": issuers,
        "bearer_methods_supported": ["header"],
    }

    # Scopes are optional (never include secrets from oauth_config)
    supported_scopes = oauth_config.get("scopes_supported") or oauth_config.get("scopes")
    if supported_scopes:
        metadata["scopes_supported"] = supported_scopes

    logger.debug(f"Returning OAuth protected resource metadata for server {server_id}")
    return metadata
# Lazy singleton - created on first access, not at module import time.
# This avoids instantiation when only exception classes are imported.
_server_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Resolve ``server_service`` lazily at module attribute lookup time.

    The ``ServerService`` instance is built the first time the attribute is
    requested and the same object is handed back on every later request.

    Args:
        name: The attribute name being accessed.

    Returns:
        The server_service singleton instance if name is "server_service".

    Raises:
        AttributeError: If the attribute name is not "server_service".
    """
    global _server_service_instance  # pylint: disable=global-statement
    if name != "server_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    if _server_service_instance is None:
        _server_service_instance = ServerService()
    return _server_service_instance