Coverage for mcpgateway / services / server_service.py: 100%

572 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/server_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7ContextForge Server Service 

8 

9This module implements server management for the MCP Servers Catalog. 

10It handles server registration, listing, retrieval, updates, activation toggling, and deletion. 

11It also publishes event notifications for server changes. 

12""" 

13 

14# Standard 

15import asyncio 

16import binascii 

17from datetime import datetime, timezone 

18from typing import Any, AsyncGenerator, Dict, List, Optional, Union 

19 

20# Third-Party 

21import httpx 

22from pydantic import ValidationError 

23from sqlalchemy import and_, delete, desc, or_, select 

24from sqlalchemy.exc import IntegrityError, OperationalError 

25from sqlalchemy.orm import joinedload, selectinload, Session 

26 

27# First-Party 

28from mcpgateway.config import settings 

29from mcpgateway.db import A2AAgent as DbA2AAgent 

30from mcpgateway.db import EmailTeam as DbEmailTeam 

31from mcpgateway.db import EmailTeamMember as DbEmailTeamMember 

32from mcpgateway.db import get_for_update 

33from mcpgateway.db import Prompt as DbPrompt 

34from mcpgateway.db import Resource as DbResource 

35from mcpgateway.db import Server as DbServer 

36from mcpgateway.db import ServerMetric, ServerMetricsHourly 

37from mcpgateway.db import Tool as DbTool 

38from mcpgateway.schemas import ServerCreate, ServerMetrics, ServerRead, ServerUpdate, TopPerformer 

39from mcpgateway.services.audit_trail_service import get_audit_trail_service 

40from mcpgateway.services.base_service import BaseService 

41from mcpgateway.services.encryption_service import protect_oauth_config_for_storage 

42from mcpgateway.services.logging_service import LoggingService 

43from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

44from mcpgateway.services.performance_tracker import get_performance_tracker 

45from mcpgateway.services.structured_logger import get_structured_logger 

46from mcpgateway.services.team_management_service import TeamManagementService 

47from mcpgateway.utils.metrics_common import build_top_performers 

48from mcpgateway.utils.pagination import unified_paginate 

49from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

50 

51# Cache import (lazy to avoid circular dependencies) 

52_REGISTRY_CACHE = None 

53 

54 

55def _get_registry_cache(): 

56 """Get registry cache singleton lazily. 

57 

58 Returns: 

59 RegistryCache instance. 

60 """ 

61 global _REGISTRY_CACHE # pylint: disable=global-statement 

62 if _REGISTRY_CACHE is None: 

63 # First-Party 

64 from mcpgateway.cache.registry_cache import registry_cache # pylint: disable=import-outside-toplevel 

65 

66 _REGISTRY_CACHE = registry_cache 

67 return _REGISTRY_CACHE 

68 

69 

70def _validate_server_team_assignment(db: Session, user_email: Optional[str], target_team_id: Optional[str]) -> None: 

71 """Validate team assignment and ownership requirements for server updates. 

72 

73 Args: 

74 db: Database session used for membership checks. 

75 user_email: Requesting user email. When omitted, ownership checks are skipped 

76 for system/internal update paths. 

77 target_team_id: Team identifier to validate. 

78 

79 Raises: 

80 ValueError: If team ID is missing, team does not exist, or caller is not 

81 an active team owner. 

82 """ 

83 if not target_team_id: 

84 raise ValueError("Cannot set visibility to 'team' without a team_id") 

85 

86 team = db.query(DbEmailTeam).filter(DbEmailTeam.id == target_team_id).first() 

87 if not team: 

88 raise ValueError(f"Team {target_team_id} not found") 

89 

90 # Preserve existing behavior for system/internal updates where 

91 # user context may be intentionally omitted. 

92 if not user_email: 

93 return 

94 

95 membership = ( 

96 db.query(DbEmailTeamMember) 

97 .filter(DbEmailTeamMember.team_id == target_team_id, DbEmailTeamMember.user_email == user_email, DbEmailTeamMember.is_active, DbEmailTeamMember.role == "owner") 

98 .first() 

99 ) 

100 if not membership: 

101 raise ValueError("User membership in team not sufficient for this update.") 

102 

103 

104# Initialize logging service first 

105logging_service = LoggingService() 

106logger = logging_service.get_logger(__name__) 

107 

108 

109class ServerError(Exception): 

110 """Base class for server-related errors.""" 

111 

112 

113class ServerNotFoundError(ServerError): 

114 """Raised when a requested server is not found.""" 

115 

116 

117class ServerLockConflictError(ServerError): 

118 """Raised when a server row is locked by another transaction.""" 

119 

120 

121class ServerNameConflictError(ServerError): 

122 """Raised when a server name conflicts with an existing one.""" 

123 

124 def __init__(self, name: str, enabled: bool = True, server_id: Optional[str] = None, visibility: str = "public") -> None: 

125 """ 

126 Initialize a ServerNameConflictError exception. 

127 

128 This exception indicates a server name conflict, with additional context about visibility, 

129 whether the conflicting server is active, and its ID if known. The error message starts 

130 with the visibility information. 

131 

132 Visibility rules: 

133 - public: Restricts server names globally (across all teams). 

134 - team: Restricts server names only within the same team. 

135 

136 Args: 

137 name: The server name that caused the conflict. 

138 enabled: Whether the conflicting server is currently active. Defaults to True. 

139 server_id: The ID of the conflicting server, if known. Only included in message for inactive servers. 

140 visibility: The visibility of the conflicting server (e.g., "public", "private", "team"). 

141 

142 Examples: 

143 >>> error = ServerNameConflictError("My Server") 

144 >>> str(error) 

145 'Public Server already exists with name: My Server' 

146 >>> error = ServerNameConflictError("My Server", enabled=False, server_id=123) 

147 >>> str(error) 

148 'Public Server already exists with name: My Server (currently inactive, ID: 123)' 

149 >>> error.enabled 

150 False 

151 >>> error.server_id 

152 123 

153 >>> error = ServerNameConflictError("My Server", enabled=False, visibility="team") 

154 >>> str(error) 

155 'Team Server already exists with name: My Server (currently inactive, ID: None)' 

156 >>> error.enabled 

157 False 

158 >>> error.server_id is None 

159 True 

160 """ 

161 self.name = name 

162 self.enabled = enabled 

163 self.server_id = server_id 

164 message = f"{visibility.capitalize()} Server already exists with name: {name}" 

165 if not enabled: 

166 message += f" (currently inactive, ID: {server_id})" 

167 super().__init__(message) 

168 

169 

170class ServerService(BaseService): 

171 """Service for managing MCP Servers in the catalog. 

172 

173 Provides methods to create, list, retrieve, update, set state, and delete server records. 

174 Also supports event notifications for changes in server data. 

175 """ 

176 

177 _visibility_model_cls = DbServer 

178 

179 def __init__(self) -> None: 

180 """Initialize a new ServerService instance. 

181 

182 Sets up the service with: 

183 - An empty list for event subscribers that will receive server change notifications 

184 - An HTTP client configured with timeout and SSL verification settings from config 

185 

186 The HTTP client is used for health checks and other server-related HTTP operations. 

187 Event subscribers can register to receive notifications about server additions, 

188 updates, activations, deactivations, and deletions. 

189 

190 Examples: 

191 >>> from mcpgateway.services.server_service import ServerService 

192 >>> service = ServerService() 

193 >>> isinstance(service._event_subscribers, list) 

194 True 

195 >>> len(service._event_subscribers) 

196 0 

197 >>> hasattr(service, '_http_client') 

198 True 

199 """ 

200 self._event_subscribers: List[asyncio.Queue] = [] 

201 self._http_client = httpx.AsyncClient( 

202 timeout=settings.federation_timeout, 

203 verify=not settings.skip_ssl_verify, 

204 limits=httpx.Limits( 

205 max_connections=settings.httpx_max_connections, 

206 max_keepalive_connections=settings.httpx_max_keepalive_connections, 

207 keepalive_expiry=settings.httpx_keepalive_expiry, 

208 ), 

209 ) 

210 self._structured_logger = get_structured_logger("server_service") 

211 self._audit_trail = get_audit_trail_service() 

212 self._performance_tracker = get_performance_tracker() 

213 

214 async def initialize(self) -> None: 

215 """Initialize the server service.""" 

216 logger.info("Initializing server service") 

217 

218 async def shutdown(self) -> None: 

219 """Shutdown the server service.""" 

220 await self._http_client.aclose() 

221 logger.info("Server service shutdown complete") 

222 

223 # get_top_server 

224 async def get_top_servers(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]: 

225 """Retrieve the top-performing servers based on execution count. 

226 

227 Queries the database to get servers with their metrics, ordered by the number of executions 

228 in descending order. Combines recent raw metrics with historical hourly rollups for complete 

229 historical coverage. Returns a list of TopPerformer objects containing server details and 

230 performance metrics. Results are cached for performance. 

231 

232 Args: 

233 db (Session): Database session for querying server metrics. 

234 limit (Optional[int]): Maximum number of servers to return. Defaults to 5. 

235 include_deleted (bool): Whether to include deleted servers from rollups. 

236 

237 Returns: 

238 List[TopPerformer]: A list of TopPerformer objects, each containing: 

239 - id: Server ID. 

240 - name: Server name. 

241 - execution_count: Total number of executions. 

242 - avg_response_time: Average response time in seconds, or None if no metrics. 

243 - success_rate: Success rate percentage, or None if no metrics. 

244 - last_execution: Timestamp of the last execution, or None if no metrics. 

245 """ 

246 # Check cache first (if enabled) 

247 # First-Party 

248 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel 

249 

250 effective_limit = limit or 5 

251 cache_key = f"top_servers:{effective_limit}:include_deleted={include_deleted}" 

252 

253 if is_cache_enabled(): 

254 cached = metrics_cache.get(cache_key) 

255 if cached is not None: 

256 return cached 

257 

258 # Use combined query that includes both raw metrics and rollup data 

259 # First-Party 

260 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel 

261 

262 results = get_top_performers_combined( 

263 db=db, 

264 metric_type="server", 

265 entity_model=DbServer, 

266 limit=effective_limit, 

267 include_deleted=include_deleted, 

268 ) 

269 top_performers = build_top_performers(results) 

270 

271 # Cache the result (if enabled) 

272 if is_cache_enabled(): 

273 metrics_cache.set(cache_key, top_performers) 

274 

275 return top_performers 

276 

277 def convert_server_to_read(self, server: DbServer, include_metrics: bool = False) -> ServerRead: 

278 """ 

279 Converts a DbServer instance into a ServerRead model, optionally including aggregated metrics. 

280 

281 Args: 

282 server (DbServer): The ORM instance of the server. 

283 include_metrics (bool): Whether to include metrics in the result. Defaults to False. 

284 Set to False for list operations to avoid N+1 query issues. 

285 

286 Returns: 

287 ServerRead: The Pydantic model representing the server, optionally including aggregated metrics. 

288 

289 Examples: 

290 >>> from types import SimpleNamespace 

291 >>> from datetime import datetime, timezone 

292 >>> svc = ServerService() 

293 >>> now = datetime.now(timezone.utc) 

294 >>> # Fake metric objects 

295 >>> m1 = SimpleNamespace(is_success=True, response_time=0.2, timestamp=now) 

296 >>> m2 = SimpleNamespace(is_success=False, response_time=0.4, timestamp=now) 

297 >>> server = SimpleNamespace( 

298 ... id='s1', name='S', description=None, icon=None, 

299 ... created_at=now, updated_at=now, enabled=True, 

300 ... associated_tools=[], associated_resources=[], associated_prompts=[], associated_a2a_agents=[], 

301 ... tags=[], metrics=[m1, m2], 

302 ... metrics_summary={"total_executions": 2, "successful_executions": 1, "failed_executions": 1, 

303 ... "failure_rate": 0.5, "min_response_time": 0.2, "max_response_time": 0.4, 

304 ... "avg_response_time": 0.3, "last_execution_time": now}, 

305 ... tools=[], resources=[], prompts=[], a2a_agents=[], 

306 ... team_id=None, owner_email=None, visibility=None, 

307 ... created_by=None, modified_by=None 

308 ... ) 

309 >>> result = svc.convert_server_to_read(server, include_metrics=True) 

310 >>> result.metrics.total_executions 

311 2 

312 >>> result.metrics.successful_executions 

313 1 

314 """ 

315 # Build dict explicitly from attributes to ensure SQLAlchemy populates them 

316 # (using __dict__.copy() can return empty dict with certain query patterns) 

317 server_dict = { 

318 "id": server.id, 

319 "name": server.name, 

320 "description": server.description, 

321 "icon": server.icon, 

322 "enabled": server.enabled, 

323 "created_at": server.created_at, 

324 "updated_at": server.updated_at, 

325 "team_id": server.team_id, 

326 "owner_email": server.owner_email, 

327 "visibility": server.visibility, 

328 "created_by": server.created_by, 

329 "created_from_ip": getattr(server, "created_from_ip", None), 

330 "created_via": getattr(server, "created_via", None), 

331 "created_user_agent": getattr(server, "created_user_agent", None), 

332 "modified_by": server.modified_by, 

333 "modified_from_ip": getattr(server, "modified_from_ip", None), 

334 "modified_via": getattr(server, "modified_via", None), 

335 "modified_user_agent": getattr(server, "modified_user_agent", None), 

336 "import_batch_id": getattr(server, "import_batch_id", None), 

337 "federation_source": getattr(server, "federation_source", None), 

338 "version": getattr(server, "version", None), 

339 "tags": server.tags or [], 

340 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata 

341 "oauth_enabled": getattr(server, "oauth_enabled", False), 

342 "oauth_config": getattr(server, "oauth_config", None), 

343 } 

344 

345 # Compute aggregated metrics only if requested (avoids N+1 queries in list operations) 

346 if include_metrics: 

347 # Use metrics_summary which combines raw + hourly rollup data (matches tool_service pattern) 

348 metrics = server.metrics_summary 

349 server_dict["metrics"] = { 

350 "total_executions": metrics["total_executions"], 

351 "successful_executions": metrics["successful_executions"], 

352 "failed_executions": metrics["failed_executions"], 

353 "failure_rate": metrics["failure_rate"], 

354 "min_response_time": metrics["min_response_time"], 

355 "max_response_time": metrics["max_response_time"], 

356 "avg_response_time": metrics["avg_response_time"], 

357 "last_execution_time": metrics["last_execution_time"], 

358 } 

359 else: 

360 server_dict["metrics"] = None 

361 # Add associated IDs from relationships 

362 server_dict["associated_tools"] = [tool.name for tool in server.tools] if server.tools else [] 

363 server_dict["associated_tool_ids"] = [str(tool.id) for tool in server.tools] if server.tools else [] 

364 server_dict["associated_resources"] = [res.id for res in server.resources] if server.resources else [] 

365 server_dict["associated_prompts"] = [prompt.id for prompt in server.prompts] if server.prompts else [] 

366 server_dict["associated_a2a_agents"] = [agent.id for agent in server.a2a_agents] if server.a2a_agents else [] 

367 

368 # Team name is loaded via server.team property from email_team relationship 

369 server_dict["team"] = getattr(server, "team", None) 

370 

371 return ServerRead.model_validate(server_dict).masked() 

372 

373 def _assemble_associated_items( 

374 self, 

375 tools: Optional[List[str]], 

376 resources: Optional[List[str]], 

377 prompts: Optional[List[str]], 

378 a2a_agents: Optional[List[str]] = None, 

379 gateways: Optional[List[str]] = None, 

380 ) -> Dict[str, Any]: 

381 """ 

382 Assemble the associated items dictionary from the separate fields. 

383 

384 Args: 

385 tools: List of tool IDs. 

386 resources: List of resource IDs. 

387 prompts: List of prompt IDs. 

388 a2a_agents: List of A2A agent IDs. 

389 gateways: List of gateway IDs. 

390 

391 Returns: 

392 A dictionary with keys "tools", "resources", "prompts", "a2a_agents", and "gateways". 

393 

394 Examples: 

395 >>> service = ServerService() 

396 >>> # Test with all None values 

397 >>> result = service._assemble_associated_items(None, None, None) 

398 >>> result 

399 {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []} 

400 

401 >>> # Test with empty lists 

402 >>> result = service._assemble_associated_items([], [], []) 

403 >>> result 

404 {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []} 

405 

406 >>> # Test with actual values 

407 >>> result = service._assemble_associated_items(['tool1', 'tool2'], ['res1'], ['prompt1']) 

408 >>> result 

409 {'tools': ['tool1', 'tool2'], 'resources': ['res1'], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []} 

410 

411 >>> # Test with mixed None and values 

412 >>> result = service._assemble_associated_items(['tool1'], None, ['prompt1']) 

413 >>> result 

414 {'tools': ['tool1'], 'resources': [], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []} 

415 """ 

416 return { 

417 "tools": tools or [], 

418 "resources": resources or [], 

419 "prompts": prompts or [], 

420 "a2a_agents": a2a_agents or [], 

421 "gateways": gateways or [], 

422 } 

423 

424 async def register_server( 

425 self, 

426 db: Session, 

427 server_in: ServerCreate, 

428 created_by: Optional[str] = None, 

429 created_from_ip: Optional[str] = None, 

430 created_via: Optional[str] = None, 

431 created_user_agent: Optional[str] = None, 

432 team_id: Optional[str] = None, 

433 owner_email: Optional[str] = None, 

434 visibility: Optional[str] = "public", 

435 ) -> ServerRead: 

436 """ 

437 Register a new server in the catalog and validate that all associated items exist. 

438 

439 This function performs the following steps: 

440 1. Checks if a server with the same name already exists. 

441 2. Creates a new server record. 

442 3. For each ID provided in associated_tools, associated_resources, and associated_prompts, 

443 verifies that the corresponding item exists. If an item does not exist, an error is raised. 

444 4. Associates the verified items to the new server. 

445 5. Commits the transaction, refreshes the ORM instance, and forces the loading of relationship data. 

446 6. Constructs a response dictionary that includes lists of associated item IDs. 

447 7. Notifies subscribers of the addition and returns the validated response. 

448 

449 Args: 

450 db (Session): The SQLAlchemy database session. 

451 server_in (ServerCreate): The server creation schema containing server details and lists of 

452 associated tool, resource, and prompt IDs (as strings). 

453 created_by (Optional[str]): Email of the user creating the server, used for ownership tracking. 

454 created_from_ip (Optional[str]): IP address from which the creation request originated. 

455 created_via (Optional[str]): Source of creation (api, ui, import). 

456 created_user_agent (Optional[str]): User agent string from the creation request. 

457 team_id (Optional[str]): Team ID to assign the server to. 

458 owner_email (Optional[str]): Email of the user who owns this server. 

459 visibility (str): Server visibility level (private, team, public). 

460 

461 Returns: 

462 ServerRead: The newly created server, with associated item IDs. 

463 

464 Raises: 

465 IntegrityError: If a database integrity error occurs. 

466 ServerNameConflictError: If a server name conflict occurs (public or team visibility). 

467 ServerError: If any associated tool, resource, or prompt does not exist, or if any other registration error occurs. 

468 

469 Examples: 

470 >>> from mcpgateway.services.server_service import ServerService 

471 >>> from unittest.mock import MagicMock, AsyncMock, patch 

472 >>> from mcpgateway.schemas import ServerRead 

473 >>> service = ServerService() 

474 >>> db = MagicMock() 

475 >>> server_in = MagicMock() 

476 >>> server_in.id = None # No custom UUID for this test 

477 >>> db.execute.return_value.scalar_one_or_none.return_value = None 

478 >>> db.add = MagicMock() 

479 >>> db.commit = MagicMock() 

480 >>> db.refresh = MagicMock() 

481 >>> service._notify_server_added = AsyncMock() 

482 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

483 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

484 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

485 >>> ServerRead.model_validate = MagicMock(return_value='server_read') 

486 >>> import asyncio 

487 >>> asyncio.run(service.register_server(db, server_in)) 

488 'server_read' 

489 """ 

490 try: 

491 logger.info(f"Registering server: {server_in.name}") 

492 oauth_config = await protect_oauth_config_for_storage(getattr(server_in, "oauth_config", None)) 

493 # # Create the new server record. 

494 db_server = DbServer( 

495 name=server_in.name, 

496 description=server_in.description, 

497 icon=server_in.icon, 

498 enabled=True, 

499 tags=server_in.tags or [], 

500 # Team scoping fields - use schema values if provided, otherwise fallback to parameters 

501 team_id=getattr(server_in, "team_id", None) or team_id, 

502 owner_email=getattr(server_in, "owner_email", None) or owner_email or created_by, 

503 # IMPORTANT: Prefer function parameter over schema default 

504 # The API has visibility as a separate Body param that should override schema default 

505 visibility=visibility or getattr(server_in, "visibility", None) or "public", 

506 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata 

507 oauth_enabled=getattr(server_in, "oauth_enabled", False) or False, 

508 oauth_config=oauth_config, 

509 # Metadata fields 

510 created_by=created_by, 

511 created_from_ip=created_from_ip, 

512 created_via=created_via, 

513 created_user_agent=created_user_agent, 

514 version=1, 

515 ) 

516 # Check for existing server with the same name (with row locking to prevent race conditions) 

517 # The unique constraint is on (team_id, owner_email, name), so we check based on that 

518 owner_email_to_check = getattr(server_in, "owner_email", None) or owner_email or created_by 

519 team_id_to_check = getattr(server_in, "team_id", None) or team_id 

520 

521 # Build conditions based on the actual unique constraint: (team_id, owner_email, name) 

522 conditions = [ 

523 DbServer.name == server_in.name, 

524 DbServer.team_id == team_id_to_check if team_id_to_check else DbServer.team_id.is_(None), 

525 DbServer.owner_email == owner_email_to_check if owner_email_to_check else DbServer.owner_email.is_(None), 

526 ] 

527 if server_in.id: 

528 conditions.append(DbServer.id != server_in.id) 

529 

530 existing_server = get_for_update(db, DbServer, where=and_(*conditions)) 

531 if existing_server: 

532 raise ServerNameConflictError(server_in.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility) 

533 # Set custom UUID if provided 

534 if server_in.id: 

535 logger.info(f"Setting custom UUID for server: {server_in.id}") 

536 db_server.id = server_in.id 

537 logger.info(f"Adding server to DB session: {db_server.name}") 

538 db.add(db_server) 

539 

540 # Associate tools, verifying each exists using bulk query when multiple items 

541 if server_in.associated_tools: 

542 tool_ids = [tool_id.strip() for tool_id in server_in.associated_tools if tool_id.strip()] 

543 if len(tool_ids) > 1: 

544 # Use bulk query for multiple items 

545 tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all() 

546 found_tool_ids = {tool.id for tool in tools} 

547 missing_tool_ids = set(tool_ids) - found_tool_ids 

548 if missing_tool_ids: 

549 raise ServerError(f"Tools with ids {missing_tool_ids} do not exist.") 

550 db_server.tools.extend(tools) 

551 elif tool_ids: 

552 # Use single query for single item (maintains test compatibility) 

553 tool_obj = db.get(DbTool, tool_ids[0]) 

554 if not tool_obj: 

555 raise ServerError(f"Tool with id {tool_ids[0]} does not exist.") 

556 db_server.tools.append(tool_obj) 

557 

558 # Associate resources, verifying each exists using bulk query when multiple items 

559 if server_in.associated_resources: 

560 resource_ids = [resource_id.strip() for resource_id in server_in.associated_resources if resource_id.strip()] 

561 if len(resource_ids) > 1: 

562 # Use bulk query for multiple items 

563 resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all() 

564 found_resource_ids = {resource.id for resource in resources} 

565 missing_resource_ids = set(resource_ids) - found_resource_ids 

566 if missing_resource_ids: 

567 raise ServerError(f"Resources with ids {missing_resource_ids} do not exist.") 

568 db_server.resources.extend(resources) 

569 elif resource_ids: 

570 # Use single query for single item (maintains test compatibility) 

571 resource_obj = db.get(DbResource, resource_ids[0]) 

572 if not resource_obj: 

573 raise ServerError(f"Resource with id {resource_ids[0]} does not exist.") 

574 db_server.resources.append(resource_obj) 

575 

576 # Associate prompts, verifying each exists using bulk query when multiple items 

577 if server_in.associated_prompts: 

578 prompt_ids = [prompt_id.strip() for prompt_id in server_in.associated_prompts if prompt_id.strip()] 

579 if len(prompt_ids) > 1: 

580 # Use bulk query for multiple items 

581 prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all() 

582 found_prompt_ids = {prompt.id for prompt in prompts} 

583 missing_prompt_ids = set(prompt_ids) - found_prompt_ids 

584 if missing_prompt_ids: 

585 raise ServerError(f"Prompts with ids {missing_prompt_ids} do not exist.") 

586 db_server.prompts.extend(prompts) 

587 elif prompt_ids: 

588 # Use single query for single item (maintains test compatibility) 

589 prompt_obj = db.get(DbPrompt, prompt_ids[0]) 

590 if not prompt_obj: 

591 raise ServerError(f"Prompt with id {prompt_ids[0]} does not exist.") 

592 db_server.prompts.append(prompt_obj) 

593 

594 # Associate A2A agents, verifying each exists using bulk query when multiple items 

595 if server_in.associated_a2a_agents: 

596 agent_ids = [agent_id.strip() for agent_id in server_in.associated_a2a_agents if agent_id.strip()] 

597 if len(agent_ids) > 1: 

598 # Use bulk query for multiple items 

599 agents = db.execute(select(DbA2AAgent).where(DbA2AAgent.id.in_(agent_ids))).scalars().all() 

600 found_agent_ids = {agent.id for agent in agents} 

601 missing_agent_ids = set(agent_ids) - found_agent_ids 

602 if missing_agent_ids: 

603 raise ServerError(f"A2A Agents with ids {missing_agent_ids} do not exist.") 

604 db_server.a2a_agents.extend(agents) 

605 

606 # Note: Auto-tool creation for A2A agents should be handled 

607 # by a separate service or background task to avoid circular imports 

608 for agent in agents: 

609 logger.info(f"A2A agent {agent.name} associated with server {db_server.name}") 

610 elif agent_ids: 

611 # Use single query for single item (maintains test compatibility) 

612 agent_obj = db.get(DbA2AAgent, agent_ids[0]) 

613 if not agent_obj: 

614 raise ServerError(f"A2A Agent with id {agent_ids[0]} does not exist.") 

615 db_server.a2a_agents.append(agent_obj) 

616 logger.info(f"A2A agent {agent_obj.name} associated with server {db_server.name}") 

617 

618 # Commit the new record and refresh. 

619 db.commit() 

620 db.refresh(db_server) 

621 # Force load the relationship attributes. 

622 _ = db_server.tools, db_server.resources, db_server.prompts, db_server.a2a_agents 

623 

624 # Assemble response data with associated item IDs. 

625 server_data = { 

626 "id": db_server.id, 

627 "name": db_server.name, 

628 "description": db_server.description, 

629 "icon": db_server.icon, 

630 "created_at": db_server.created_at, 

631 "updated_at": db_server.updated_at, 

632 "enabled": db_server.enabled, 

633 "associated_tools": [str(tool.id) for tool in db_server.tools], 

634 "associated_resources": [str(resource.id) for resource in db_server.resources], 

635 "associated_prompts": [str(prompt.id) for prompt in db_server.prompts], 

636 } 

637 logger.debug(f"Server Data: {server_data}") 

638 await self._notify_server_added(db_server) 

639 logger.info(f"Registered server: {server_in.name}") 

640 

641 # Structured logging: Audit trail for server creation 

642 self._audit_trail.log_action( 

643 user_id=created_by or "system", 

644 action="create_server", 

645 resource_type="server", 

646 resource_id=db_server.id, 

647 details={ 

648 "server_name": db_server.name, 

649 "visibility": visibility, 

650 "team_id": team_id, 

651 "associated_tools_count": len(db_server.tools), 

652 "associated_resources_count": len(db_server.resources), 

653 "associated_prompts_count": len(db_server.prompts), 

654 "associated_a2a_agents_count": len(db_server.a2a_agents), 

655 }, 

656 metadata={ 

657 "created_from_ip": created_from_ip, 

658 "created_via": created_via, 

659 "created_user_agent": created_user_agent, 

660 }, 

661 ) 

662 

663 # Structured logging: Log successful server creation 

664 self._structured_logger.log( 

665 level="INFO", 

666 message="Server created successfully", 

667 event_type="server_created", 

668 component="server_service", 

669 server_id=db_server.id, 

670 server_name=db_server.name, 

671 visibility=visibility, 

672 created_by=created_by, 

673 user_email=created_by, 

674 ) 

675 

676 # Team name is loaded via db_server.team property from email_team relationship 

677 return self.convert_server_to_read(db_server) 

678 except IntegrityError as ie: 

679 db.rollback() 

680 logger.error(f"IntegrityErrors in group: {ie}") 

681 

682 # Structured logging: Log database integrity error 

683 self._structured_logger.log( 

684 level="ERROR", 

685 message="Server creation failed due to database integrity error", 

686 event_type="server_creation_failed", 

687 component="server_service", 

688 server_name=server_in.name, 

689 error_type="IntegrityError", 

690 error_message=str(ie), 

691 created_by=created_by, 

692 user_email=created_by, 

693 ) 

694 raise ie 

695 except ServerNameConflictError as se: 

696 db.rollback() 

697 

698 # Structured logging: Log name conflict error 

699 self._structured_logger.log( 

700 level="WARNING", 

701 message="Server creation failed due to name conflict", 

702 event_type="server_name_conflict", 

703 component="server_service", 

704 server_name=server_in.name, 

705 visibility=visibility, 

706 created_by=created_by, 

707 user_email=created_by, 

708 ) 

709 raise se 

710 except Exception as ex: 

711 db.rollback() 

712 

713 # Structured logging: Log generic server creation failure 

714 self._structured_logger.log( 

715 level="ERROR", 

716 message="Server creation failed", 

717 event_type="server_creation_failed", 

718 component="server_service", 

719 server_name=server_in.name, 

720 error_type=type(ex).__name__, 

721 error_message=str(ex), 

722 created_by=created_by, 

723 user_email=created_by, 

724 ) 

725 raise ServerError(f"Failed to register server: {str(ex)}") 

726 

727 async def list_servers( 

728 self, 

729 db: Session, 

730 include_inactive: bool = False, 

731 include_metrics: bool = False, 

732 tags: Optional[List[str]] = None, 

733 cursor: Optional[str] = None, 

734 limit: Optional[int] = None, 

735 page: Optional[int] = None, 

736 per_page: Optional[int] = None, 

737 user_email: Optional[str] = None, 

738 team_id: Optional[str] = None, 

739 visibility: Optional[str] = None, 

740 token_teams: Optional[List[str]] = None, 

741 ) -> Union[tuple[List[ServerRead], Optional[str]], Dict[str, Any]]: 

742 """List all registered servers with cursor or page-based pagination and optional team filtering. 

743 

744 Args: 

745 db: Database session. 

746 include_inactive: Whether to include inactive servers. 

747 include_metrics: Whether to include aggregated metrics in the results. 

748 tags: Filter servers by tags. If provided, only servers with at least one matching tag will be returned. 

749 cursor: Cursor for pagination (encoded last created_at and id). 

750 limit: Maximum number of servers to return. None for default, 0 for unlimited. 

751 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor. 

752 per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size. 

753 user_email: Email of user for team-based access control. None for no access control. 

754 team_id: Optional team ID to filter by specific team (requires user_email). 

755 visibility: Optional visibility filter (private, team, public) (requires user_email). 

756 token_teams: Optional list of team IDs from the token (None=unrestricted, []=public-only). 

757 

758 Returns: 

759 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}} 

760 If cursor is provided or neither: tuple of (list of ServerRead objects, next_cursor). 

761 

762 Examples: 

763 >>> from mcpgateway.services.server_service import ServerService 

764 >>> from unittest.mock import MagicMock 

765 >>> service = ServerService() 

766 >>> db = MagicMock() 

767 >>> server_read = MagicMock() 

768 >>> service.convert_server_to_read = MagicMock(return_value=server_read) 

769 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()] 

770 >>> import asyncio 

771 >>> servers, cursor = asyncio.run(service.list_servers(db)) 

772 >>> isinstance(servers, list) and cursor is None 

773 True 

774 """ 

775 # Check cache for first page only 

776 # SECURITY: Only cache public-only results (token_teams=[]) 

777 # - token_teams=None (admin bypass): Don't cache - admin sees all, should be fresh 

778 # - token_teams=[] (public-only): Cache - same result for all public-only users 

779 # - token_teams=[...] (team-scoped): Don't cache - results vary by team 

780 # - user_email set: Don't cache - results vary by user ownership 

781 cache = _get_registry_cache() 

782 is_public_only = token_teams is not None and len(token_teams) == 0 

783 use_cache = cursor is None and user_email is None and page is None and is_public_only 

784 if use_cache: 

785 filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, visibility=visibility) 

786 cached = await cache.get("servers", filters_hash) 

787 if cached is not None: 

788 # Reconstruct ServerRead objects from cached dicts 

789 cached_servers = [ServerRead.model_validate(s).masked() for s in cached["servers"]] 

790 return (cached_servers, cached.get("next_cursor")) 

791 

792 # Build base query with ordering and eager load relationships to avoid N+1 

793 query = ( 

794 select(DbServer) 

795 .options( 

796 selectinload(DbServer.tools), 

797 selectinload(DbServer.resources), 

798 selectinload(DbServer.prompts), 

799 selectinload(DbServer.a2a_agents), 

800 joinedload(DbServer.email_team), 

801 ) 

802 .order_by(desc(DbServer.created_at), desc(DbServer.id)) 

803 ) 

804 

805 # Eager load metrics relationships to prevent N+1 queries when include_metrics=true 

806 if include_metrics: 

807 query = query.options(selectinload(DbServer.metrics), selectinload(DbServer.metrics_hourly)) 

808 

809 # Apply active/inactive filter 

810 if not include_inactive: 

811 query = query.where(DbServer.enabled) 

812 

813 query = await self._apply_access_control(query, db, user_email, token_teams, team_id) 

814 

815 if visibility: 

816 query = query.where(DbServer.visibility == visibility) 

817 

818 # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats) 

819 if tags: 

820 query = query.where(json_contains_tag_expr(db, DbServer.tags, tags, match_any=True)) 

821 

822 # Use unified pagination helper - handles both page and cursor pagination 

823 pag_result = await unified_paginate( 

824 db=db, 

825 query=query, 

826 page=page, 

827 per_page=per_page, 

828 cursor=cursor, 

829 limit=limit, 

830 base_url="/admin/servers", # Used for page-based links 

831 query_params={"include_inactive": include_inactive} if include_inactive else {}, 

832 ) 

833 

834 next_cursor = None 

835 # Extract servers based on pagination type 

836 if page is not None: 

837 # Page-based: pag_result is a dict 

838 servers_db = pag_result["data"] 

839 else: 

840 # Cursor-based: pag_result is a tuple 

841 servers_db, next_cursor = pag_result 

842 

843 db.commit() # Release transaction to avoid idle-in-transaction 

844 

845 # Convert to ServerRead (common for both pagination types) 

846 # Team names are loaded via joinedload(DbServer.email_team) 

847 result = [] 

848 for s in servers_db: 

849 try: 

850 result.append(self.convert_server_to_read(s, include_metrics=include_metrics)) 

851 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

852 logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}") 

853 # Continue with remaining servers instead of failing completely 

854 

855 # Return appropriate format based on pagination type 

856 if page is not None: 

857 # Page-based format 

858 return { 

859 "data": result, 

860 "pagination": pag_result["pagination"], 

861 "links": pag_result["links"], 

862 } 

863 

864 # Cursor-based format 

865 

866 # Cache first page results - only for public-only queries (no user/team filtering) 

867 # SECURITY: Only cache public-only results (token_teams=[]), never admin bypass or team-scoped 

868 if cursor is None and user_email is None and is_public_only: 

869 try: 

870 cache_data = {"servers": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor} 

871 await cache.set("servers", cache_data, filters_hash) 

872 except AttributeError: 

873 pass # Skip caching if result objects don't support model_dump (e.g., in doctests) 

874 

875 return (result, next_cursor) 

876 

877 async def list_servers_for_user( 

878 self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100 

879 ) -> List[ServerRead]: 

880 """ 

881 DEPRECATED: Use list_servers() with user_email parameter instead. 

882 

883 This method is maintained for backward compatibility but is no longer used. 

884 New code should call list_servers() with user_email, team_id, and visibility parameters. 

885 

886 List servers user has access to with team filtering. 

887 

888 Args: 

889 db: Database session 

890 user_email: Email of the user requesting servers 

891 team_id: Optional team ID to filter by specific team 

892 visibility: Optional visibility filter (private, team, public) 

893 include_inactive: Whether to include inactive servers 

894 skip: Number of servers to skip for pagination 

895 limit: Maximum number of servers to return 

896 

897 Returns: 

898 List[ServerRead]: Servers the user has access to 

899 """ 

900 # Build query following existing patterns from list_servers() 

901 team_service = TeamManagementService(db) 

902 user_teams = await team_service.get_user_teams(user_email) 

903 team_ids = [team.id for team in user_teams] 

904 

905 # Eager load relationships to avoid N+1 queries 

906 query = select(DbServer).options( 

907 selectinload(DbServer.tools), 

908 selectinload(DbServer.resources), 

909 selectinload(DbServer.prompts), 

910 selectinload(DbServer.a2a_agents), 

911 joinedload(DbServer.email_team), 

912 ) 

913 

914 # Apply active/inactive filter 

915 if not include_inactive: 

916 query = query.where(DbServer.enabled) 

917 

918 if team_id: 

919 if team_id not in team_ids: 

920 return [] # No access to team 

921 

922 access_conditions = [] 

923 # Filter by specific team 

924 access_conditions.append(and_(DbServer.team_id == team_id, DbServer.visibility.in_(["team", "public"]))) 

925 

926 access_conditions.append(and_(DbServer.team_id == team_id, DbServer.owner_email == user_email)) 

927 

928 query = query.where(or_(*access_conditions)) 

929 else: 

930 # Get user's accessible teams 

931 # Build access conditions following existing patterns 

932 access_conditions = [] 

933 

934 # 1. User's personal resources (owner_email matches) 

935 access_conditions.append(DbServer.owner_email == user_email) 

936 

937 # 2. Team resources where user is member 

938 if team_ids: 

939 access_conditions.append(and_(DbServer.team_id.in_(team_ids), DbServer.visibility.in_(["team", "public"]))) 

940 

941 # 3. Public resources (if visibility allows) 

942 access_conditions.append(DbServer.visibility == "public") 

943 

944 query = query.where(or_(*access_conditions)) 

945 

946 # Apply visibility filter if specified 

947 if visibility: 

948 query = query.where(DbServer.visibility == visibility) 

949 

950 # Apply pagination following existing patterns 

951 query = query.offset(skip).limit(limit) 

952 

953 servers = db.execute(query).scalars().all() 

954 

955 db.commit() # Release transaction to avoid idle-in-transaction 

956 

957 # Skip metrics to avoid N+1 queries in list operations 

958 # Team names are loaded via joinedload(DbServer.email_team) 

959 result = [] 

960 for s in servers: 

961 try: 

962 result.append(self.convert_server_to_read(s, include_metrics=False)) 

963 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

964 logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}") 

965 # Continue with remaining servers instead of failing completely 

966 return result 

967 

968 async def get_server(self, db: Session, server_id: str) -> ServerRead: 

969 """Retrieve server details by ID. 

970 

971 Args: 

972 db: Database session. 

973 server_id: The unique identifier of the server. 

974 

975 Returns: 

976 The corresponding ServerRead object. 

977 

978 Raises: 

979 ServerNotFoundError: If no server with the given ID exists. 

980 

981 Examples: 

982 >>> from mcpgateway.services.server_service import ServerService 

983 >>> from unittest.mock import MagicMock 

984 >>> service = ServerService() 

985 >>> db = MagicMock() 

986 >>> server = MagicMock() 

987 >>> db.get.return_value = server 

988 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

989 >>> import asyncio 

990 >>> asyncio.run(service.get_server(db, 'server_id')) 

991 'server_read' 

992 """ 

993 server = db.execute( 

994 select(DbServer) 

995 .options( 

996 selectinload(DbServer.tools), 

997 selectinload(DbServer.resources), 

998 selectinload(DbServer.prompts), 

999 selectinload(DbServer.a2a_agents), 

1000 joinedload(DbServer.email_team), 

1001 ) 

1002 .where(DbServer.id == server_id) 

1003 ).scalar_one_or_none() 

1004 if not server: 

1005 raise ServerNotFoundError(f"Server not found: {server_id}") 

1006 server_data = { 

1007 "id": server.id, 

1008 "name": server.name, 

1009 "description": server.description, 

1010 "icon": server.icon, 

1011 "created_at": server.created_at, 

1012 "updated_at": server.updated_at, 

1013 "enabled": server.enabled, 

1014 "associated_tools": [tool.name for tool in server.tools], 

1015 "associated_resources": [res.id for res in server.resources], 

1016 "associated_prompts": [prompt.id for prompt in server.prompts], 

1017 } 

1018 logger.debug(f"Server Data: {server_data}") 

1019 # Team name is loaded via server.team property from email_team relationship 

1020 server_read = self.convert_server_to_read(server) 

1021 

1022 self._structured_logger.log( 

1023 level="INFO", 

1024 message="Server retrieved successfully", 

1025 event_type="server_viewed", 

1026 component="server_service", 

1027 server_id=server.id, 

1028 server_name=server.name, 

1029 team_id=getattr(server, "team_id", None), 

1030 resource_type="server", 

1031 resource_id=server.id, 

1032 custom_fields={ 

1033 "enabled": server.enabled, 

1034 "tool_count": len(getattr(server, "tools", []) or []), 

1035 "resource_count": len(getattr(server, "resources", []) or []), 

1036 "prompt_count": len(getattr(server, "prompts", []) or []), 

1037 }, 

1038 ) 

1039 

1040 self._audit_trail.log_action( 

1041 action="view_server", 

1042 resource_type="server", 

1043 resource_id=server.id, 

1044 resource_name=server.name, 

1045 user_id="system", 

1046 team_id=getattr(server, "team_id", None), 

1047 context={"enabled": server.enabled}, 

1048 db=db, 

1049 ) 

1050 

1051 return server_read 

1052 

1053 async def update_server( 

1054 self, 

1055 db: Session, 

1056 server_id: str, 

1057 server_update: ServerUpdate, 

1058 user_email: str, 

1059 modified_by: Optional[str] = None, 

1060 modified_from_ip: Optional[str] = None, 

1061 modified_via: Optional[str] = None, 

1062 modified_user_agent: Optional[str] = None, 

1063 ) -> ServerRead: 

1064 """Update an existing server. 

1065 

1066 Args: 

1067 db: Database session. 

1068 server_id: The unique identifier of the server. 

1069 server_update: Server update schema with new data. 

1070 user_email: email of the user performing the update (for permission checks). 

1071 modified_by: Username who modified this server. 

1072 modified_from_ip: IP address from which modification was made. 

1073 modified_via: Source of modification (api, ui, etc.). 

1074 modified_user_agent: User agent of the client making the modification. 

1075 

1076 Returns: 

1077 The updated ServerRead object. 

1078 

1079 Raises: 

1080 ServerNotFoundError: If the server is not found. 

1081 PermissionError: If user doesn't own the server. 

1082 ServerNameConflictError: If a new name conflicts with an existing server. 

1083 ServerError: For other update errors. 

1084 IntegrityError: If a database integrity error occurs. 

1085 ValueError: If visibility or team constraints are violated. 

1086 

1087 Examples: 

1088 >>> from mcpgateway.services.server_service import ServerService 

1089 >>> from unittest.mock import MagicMock, AsyncMock, patch 

1090 >>> from mcpgateway.schemas import ServerRead 

1091 >>> service = ServerService() 

1092 >>> db = MagicMock() 

1093 >>> server = MagicMock() 

1094 >>> server.id = 'server_id' 

1095 >>> server.name = 'test_server' 

1096 >>> server.owner_email = 'user_email' # Set owner to match user performing update 

1097 >>> server.team_id = None 

1098 >>> server.visibility = 'public' 

1099 >>> db.get.return_value = server 

1100 >>> db.commit = MagicMock() 

1101 >>> db.refresh = MagicMock() 

1102 >>> db.execute.return_value.scalar_one_or_none.return_value = None 

1103 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

1104 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

1105 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

1106 >>> ServerRead.model_validate = MagicMock(return_value='server_read') 

1107 >>> server_update = MagicMock() 

1108 >>> server_update.id = None # No UUID change 

1109 >>> server_update.name = None # No name change 

1110 >>> server_update.description = None 

1111 >>> server_update.icon = None 

1112 >>> server_update.visibility = None 

1113 >>> server_update.team_id = None 

1114 >>> import asyncio 

1115 >>> with patch('mcpgateway.services.server_service.get_for_update', return_value=server): 

1116 ... asyncio.run(service.update_server(db, 'server_id', server_update, 'user_email')) 

1117 'server_read' 

1118 """ 

1119 try: 

1120 server = get_for_update( 

1121 db, 

1122 DbServer, 

1123 server_id, 

1124 options=[ 

1125 selectinload(DbServer.tools), 

1126 selectinload(DbServer.resources), 

1127 selectinload(DbServer.prompts), 

1128 selectinload(DbServer.a2a_agents), 

1129 selectinload(DbServer.email_team), 

1130 ], 

1131 ) 

1132 if not server: 

1133 raise ServerNotFoundError(f"Server not found: {server_id}") 

1134 

1135 # Check ownership if user_email provided 

1136 if user_email: 

1137 # First-Party 

1138 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1139 

1140 permission_service = PermissionService(db) 

1141 if not await permission_service.check_resource_ownership(user_email, server): 

1142 raise PermissionError("Only the owner can update this server") 

1143 

1144 # Check for name conflict if name is being changed and visibility is public 

1145 if server_update.name and server_update.name != server.name: 

1146 visibility = server_update.visibility or server.visibility 

1147 team_id = server_update.team_id or server.team_id 

1148 if visibility.lower() == "public": 

1149 # Check for existing public server with the same name 

1150 existing_server = get_for_update(db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "public", DbServer.id != server.id)) 

1151 if existing_server: 

1152 raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility) 

1153 elif visibility.lower() == "team" and team_id: 

1154 # Check for existing team server with the same name 

1155 existing_server = get_for_update( 

1156 db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "team", DbServer.team_id == team_id, DbServer.id != server.id) 

1157 ) 

1158 if existing_server: 

1159 raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility) 

1160 

1161 # Update simple fields 

1162 if server_update.id is not None and server_update.id != server.id: 

1163 # Check if the new UUID is already in use 

1164 existing = db.get(DbServer, server_update.id) 

1165 if existing: 

1166 raise ServerError(f"Server with ID {server_update.id} already exists") 

1167 server.id = server_update.id 

1168 if server_update.name is not None: 

1169 server.name = server_update.name 

1170 if server_update.description is not None: 

1171 server.description = server_update.description 

1172 if server_update.icon is not None: 

1173 server.icon = server_update.icon 

1174 

1175 if server_update.visibility is not None: 

1176 new_visibility = server_update.visibility 

1177 

1178 # Validate visibility transitions 

1179 if new_visibility == "team": 

1180 target_team_id = server_update.team_id if server_update.team_id is not None else server.team_id 

1181 _validate_server_team_assignment(db, user_email, target_team_id) 

1182 

1183 elif new_visibility == "public": 

1184 # Optional: Check if user has permission to make resources public 

1185 # This could be a platform-level permission 

1186 pass 

1187 

1188 server.visibility = new_visibility 

1189 

1190 if server_update.team_id is not None: 

1191 if server_update.team_id != server.team_id: 

1192 _validate_server_team_assignment(db, user_email, server_update.team_id) 

1193 server.team_id = server_update.team_id 

1194 

1195 # Update associated tools if provided using bulk query 

1196 if server_update.associated_tools is not None: 

1197 server.tools = [] 

1198 if server_update.associated_tools: 

1199 tool_ids = [tool_id for tool_id in server_update.associated_tools if tool_id] 

1200 if tool_ids: 

1201 tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all() 

1202 server.tools = list(tools) 

1203 

1204 # Update associated resources if provided using bulk query 

1205 if server_update.associated_resources is not None: 

1206 server.resources = [] 

1207 if server_update.associated_resources: 

1208 resource_ids = [resource_id for resource_id in server_update.associated_resources if resource_id] 

1209 if resource_ids: 

1210 resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all() 

1211 server.resources = list(resources) 

1212 

1213 # Update associated prompts if provided using bulk query 

1214 if server_update.associated_prompts is not None: 

1215 server.prompts = [] 

1216 if server_update.associated_prompts: 

1217 prompt_ids = [prompt_id for prompt_id in server_update.associated_prompts if prompt_id] 

1218 if prompt_ids: 

1219 prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all() 

1220 server.prompts = list(prompts) 

1221 

1222 # Update tags if provided 

1223 if server_update.tags is not None: 

1224 server.tags = server_update.tags 

1225 

1226 # Update OAuth 2.0 configuration if provided 

1227 # Track if OAuth is being explicitly disabled to prevent config re-assignment 

1228 oauth_being_disabled = server_update.oauth_enabled is not None and not server_update.oauth_enabled 

1229 

1230 if server_update.oauth_enabled is not None: 

1231 server.oauth_enabled = server_update.oauth_enabled 

1232 # If OAuth is being disabled, clear the config 

1233 if oauth_being_disabled: 

1234 server.oauth_config = None 

1235 

1236 # Only update oauth_config if OAuth is not being explicitly disabled 

1237 # This prevents the case where oauth_enabled=False and oauth_config are both provided 

1238 if not oauth_being_disabled: 

1239 if hasattr(server_update, "model_fields_set") and "oauth_config" in server_update.model_fields_set: 

1240 server.oauth_config = await protect_oauth_config_for_storage(server_update.oauth_config, existing_oauth_config=server.oauth_config) 

1241 elif server_update.oauth_config is not None: 

1242 server.oauth_config = await protect_oauth_config_for_storage(server_update.oauth_config, existing_oauth_config=server.oauth_config) 

1243 

1244 # Update metadata fields 

1245 server.updated_at = datetime.now(timezone.utc) 

1246 if modified_by: 

1247 server.modified_by = modified_by 

1248 if modified_from_ip: 

1249 server.modified_from_ip = modified_from_ip 

1250 if modified_via: 

1251 server.modified_via = modified_via 

1252 if modified_user_agent: 

1253 server.modified_user_agent = modified_user_agent 

1254 if hasattr(server, "version") and server.version is not None: 

1255 server.version = server.version + 1 

1256 else: 

1257 server.version = 1 

1258 

1259 db.commit() 

1260 db.refresh(server) 

1261 # Force loading relationships 

1262 _ = server.tools, server.resources, server.prompts 

1263 

1264 # Invalidate cache after successful update 

1265 cache = _get_registry_cache() 

1266 await cache.invalidate_servers() 

1267 # Also invalidate tags cache since server tags may have changed 

1268 # First-Party 

1269 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

1270 

1271 await admin_stats_cache.invalidate_tags() 

1272 

1273 await self._notify_server_updated(server) 

1274 logger.info(f"Updated server: {server.name}") 

1275 

1276 # Structured logging: Audit trail for server update 

1277 changes = [] 

1278 if server_update.name: 

1279 changes.append(f"name: {server_update.name}") 

1280 if server_update.visibility: 

1281 changes.append(f"visibility: {server_update.visibility}") 

1282 if server_update.team_id: 

1283 changes.append(f"team_id: {server_update.team_id}") 

1284 

1285 self._audit_trail.log_action( 

1286 user_id=user_email or "system", 

1287 action="update_server", 

1288 resource_type="server", 

1289 resource_id=server.id, 

1290 details={ 

1291 "server_name": server.name, 

1292 "changes": ", ".join(changes) if changes else "metadata only", 

1293 "version": server.version, 

1294 }, 

1295 metadata={ 

1296 "modified_from_ip": modified_from_ip, 

1297 "modified_via": modified_via, 

1298 "modified_user_agent": modified_user_agent, 

1299 }, 

1300 ) 

1301 

1302 # Structured logging: Log successful server update 

1303 self._structured_logger.log( 

1304 level="INFO", 

1305 message="Server updated successfully", 

1306 event_type="server_updated", 

1307 component="server_service", 

1308 server_id=server.id, 

1309 server_name=server.name, 

1310 modified_by=user_email, 

1311 user_email=user_email, 

1312 ) 

1313 

1314 # Build a dictionary with associated IDs 

1315 # Team name is loaded via server.team property from email_team relationship 

1316 server_data = { 

1317 "id": server.id, 

1318 "name": server.name, 

1319 "description": server.description, 

1320 "icon": server.icon, 

1321 "team": server.team, 

1322 "created_at": server.created_at, 

1323 "updated_at": server.updated_at, 

1324 "enabled": server.enabled, 

1325 "associated_tools": [tool.id for tool in server.tools], 

1326 "associated_resources": [res.id for res in server.resources], 

1327 "associated_prompts": [prompt.id for prompt in server.prompts], 

1328 } 

1329 logger.debug(f"Server Data: {server_data}") 

1330 return self.convert_server_to_read(server) 

1331 except IntegrityError as ie: 

1332 db.rollback() 

1333 logger.error(f"IntegrityErrors in group: {ie}") 

1334 

1335 # Structured logging: Log database integrity error 

1336 self._structured_logger.log( 

1337 level="ERROR", 

1338 message="Server update failed due to database integrity error", 

1339 event_type="server_update_failed", 

1340 component="server_service", 

1341 server_id=server_id, 

1342 error_type="IntegrityError", 

1343 error_message=str(ie), 

1344 modified_by=user_email, 

1345 user_email=user_email, 

1346 ) 

1347 raise ie 

1348 except ServerNameConflictError as snce: 

1349 db.rollback() 

1350 logger.error(f"Server name conflict: {snce}") 

1351 

1352 # Structured logging: Log name conflict error 

1353 self._structured_logger.log( 

1354 level="WARNING", 

1355 message="Server update failed due to name conflict", 

1356 event_type="server_name_conflict", 

1357 component="server_service", 

1358 server_id=server_id, 

1359 modified_by=user_email, 

1360 user_email=user_email, 

1361 ) 

1362 raise snce 

1363 except Exception as e: 

1364 db.rollback() 

1365 

1366 # Structured logging: Log generic server update failure 

1367 self._structured_logger.log( 

1368 level="ERROR", 

1369 message="Server update failed", 

1370 event_type="server_update_failed", 

1371 component="server_service", 

1372 server_id=server_id, 

1373 error_type=type(e).__name__, 

1374 error_message=str(e), 

1375 modified_by=user_email, 

1376 user_email=user_email, 

1377 ) 

1378 raise ServerError(f"Failed to update server: {str(e)}") 

1379 

1380 async def set_server_state(self, db: Session, server_id: str, activate: bool, user_email: Optional[str] = None) -> ServerRead: 

1381 """Set the activation status of a server. 

1382 

1383 Args: 

1384 db: Database session. 

1385 server_id: The unique identifier of the server. 

1386 activate: True to activate, False to deactivate. 

1387 user_email: Optional[str] The email of the user to check if the user has permission to modify. 

1388 

1389 Returns: 

1390 The updated ServerRead object. 

1391 

1392 Raises: 

1393 ServerNotFoundError: If the server is not found. 

1394 ServerLockConflictError: If the server row is locked by another transaction. 

1395 ServerError: For other errors. 

1396 PermissionError: If user doesn't own the agent. 

1397 

1398 Examples: 

1399 >>> from mcpgateway.services.server_service import ServerService 

1400 >>> from unittest.mock import MagicMock, AsyncMock, patch 

1401 >>> from mcpgateway.schemas import ServerRead 

1402 >>> service = ServerService() 

1403 >>> db = MagicMock() 

1404 >>> server = MagicMock() 

1405 >>> db.get.return_value = server 

1406 >>> db.commit = MagicMock() 

1407 >>> db.refresh = MagicMock() 

1408 >>> service._notify_server_activated = AsyncMock() 

1409 >>> service._notify_server_deactivated = AsyncMock() 

1410 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

1411 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

1412 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

1413 >>> ServerRead.model_validate = MagicMock(return_value='server_read') 

1414 >>> import asyncio 

1415 >>> asyncio.run(service.set_server_state(db, 'server_id', True)) 

1416 'server_read' 

1417 """ 

1418 try: 

1419 # Use nowait=True to fail fast if row is locked, preventing lock contention under high load 

1420 try: 

1421 server = get_for_update( 

1422 db, 

1423 DbServer, 

1424 server_id, 

1425 nowait=True, 

1426 options=[ 

1427 selectinload(DbServer.tools), 

1428 selectinload(DbServer.resources), 

1429 selectinload(DbServer.prompts), 

1430 selectinload(DbServer.a2a_agents), 

1431 selectinload(DbServer.email_team), 

1432 ], 

1433 ) 

1434 except OperationalError as lock_err: 

1435 # Row is locked by another transaction - fail fast with 409 

1436 db.rollback() 

1437 raise ServerLockConflictError(f"Server {server_id} is currently being modified by another request") from lock_err 

1438 if not server: 

1439 raise ServerNotFoundError(f"Server not found: {server_id}") 

1440 

1441 if user_email: 

1442 # First-Party 

1443 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1444 

1445 permission_service = PermissionService(db) 

1446 if not await permission_service.check_resource_ownership(user_email, server): 

1447 raise PermissionError("Only the owner can activate the Server" if activate else "Only the owner can deactivate the Server") 

1448 

1449 if server.enabled != activate: 

1450 server.enabled = activate 

1451 server.updated_at = datetime.now(timezone.utc) 

1452 db.commit() 

1453 db.refresh(server) 

1454 

1455 # Invalidate cache after status change 

1456 cache = _get_registry_cache() 

1457 await cache.invalidate_servers() 

1458 

1459 if activate: 

1460 await self._notify_server_activated(server) 

1461 else: 

1462 await self._notify_server_deactivated(server) 

1463 logger.info(f"Server {server.name} {'activated' if activate else 'deactivated'}") 

1464 

1465 # Structured logging: Audit trail for server state change 

1466 self._audit_trail.log_action( 

1467 user_id=user_email or "system", 

1468 action="activate_server" if activate else "deactivate_server", 

1469 resource_type="server", 

1470 resource_id=server.id, 

1471 details={ 

1472 "server_name": server.name, 

1473 "new_status": "active" if activate else "inactive", 

1474 }, 

1475 ) 

1476 

1477 # Structured logging: Log server status change 

1478 self._structured_logger.log( 

1479 level="INFO", 

1480 message=f"Server {'activated' if activate else 'deactivated'}", 

1481 event_type="server_status_changed", 

1482 component="server_service", 

1483 server_id=server.id, 

1484 server_name=server.name, 

1485 new_status="active" if activate else "inactive", 

1486 changed_by=user_email, 

1487 user_email=user_email, 

1488 ) 

1489 

1490 # Team name is loaded via server.team property from email_team relationship 

1491 server_data = { 

1492 "id": server.id, 

1493 "name": server.name, 

1494 "description": server.description, 

1495 "icon": server.icon, 

1496 "team": server.team, 

1497 "created_at": server.created_at, 

1498 "updated_at": server.updated_at, 

1499 "enabled": server.enabled, 

1500 "associated_tools": [tool.id for tool in server.tools], 

1501 "associated_resources": [res.id for res in server.resources], 

1502 "associated_prompts": [prompt.id for prompt in server.prompts], 

1503 } 

1504 logger.info(f"Server Data: {server_data}") 

1505 return self.convert_server_to_read(server) 

1506 except PermissionError as e: 

1507 # Structured logging: Log permission error 

1508 self._structured_logger.log( 

1509 level="WARNING", 

1510 message="Server state change failed due to insufficient permissions", 

1511 event_type="server_state_change_permission_denied", 

1512 component="server_service", 

1513 server_id=server_id, 

1514 user_email=user_email, 

1515 ) 

1516 raise e 

1517 except ServerLockConflictError: 

1518 # Re-raise lock conflicts without wrapping - allows 409 response 

1519 raise 

1520 except ServerNotFoundError: 

1521 # Re-raise not found without wrapping - allows 404 response 

1522 raise 

1523 except Exception as e: 

1524 db.rollback() 

1525 

1526 # Structured logging: Log generic server state change failure 

1527 self._structured_logger.log( 

1528 level="ERROR", 

1529 message="Server state change failed", 

1530 event_type="server_state_change_failed", 

1531 component="server_service", 

1532 server_id=server_id, 

1533 error_type=type(e).__name__, 

1534 error_message=str(e), 

1535 user_email=user_email, 

1536 ) 

1537 raise ServerError(f"Failed to set server state: {str(e)}") 

1538 

1539 async def delete_server(self, db: Session, server_id: str, user_email: Optional[str] = None, purge_metrics: bool = False) -> None: 

1540 """Permanently delete a server. 

1541 

1542 Args: 

1543 db: Database session. 

1544 server_id: The unique identifier of the server. 

1545 user_email: Email of user performing deletion (for ownership check). 

1546 purge_metrics: If True, delete raw + rollup metrics for this server. 

1547 

1548 Raises: 

1549 ServerNotFoundError: If the server is not found. 

1550 PermissionError: If user doesn't own the server. 

1551 ServerError: For other deletion errors. 

1552 

1553 Examples: 

1554 >>> from mcpgateway.services.server_service import ServerService 

1555 >>> from unittest.mock import MagicMock, AsyncMock, patch 

1556 >>> service = ServerService() 

1557 >>> db = MagicMock() 

1558 >>> server = MagicMock() 

1559 >>> db.get.return_value = server 

1560 >>> db.delete = MagicMock() 

1561 >>> db.commit = MagicMock() 

1562 >>> service._notify_server_deleted = AsyncMock() 

1563 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

1564 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

1565 >>> import asyncio 

1566 >>> asyncio.run(service.delete_server(db, 'server_id', 'user@example.com')) 

1567 """ 

1568 try: 

1569 server = db.get(DbServer, server_id) 

1570 if not server: 

1571 raise ServerNotFoundError(f"Server not found: {server_id}") 

1572 

1573 # Check ownership if user_email provided 

1574 if user_email: 

1575 # First-Party 

1576 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1577 

1578 permission_service = PermissionService(db) 

1579 if not await permission_service.check_resource_ownership(user_email, server): 

1580 raise PermissionError("Only the owner can delete this server") 

1581 

1582 server_info = {"id": server.id, "name": server.name} 

1583 if purge_metrics: 

1584 with pause_rollup_during_purge(reason=f"purge_server:{server_id}"): 

1585 delete_metrics_in_batches(db, ServerMetric, ServerMetric.server_id, server_id) 

1586 delete_metrics_in_batches(db, ServerMetricsHourly, ServerMetricsHourly.server_id, server_id) 

1587 db.delete(server) 

1588 db.commit() 

1589 

1590 # Invalidate cache after successful deletion 

1591 cache = _get_registry_cache() 

1592 await cache.invalidate_servers() 

1593 # Also invalidate tags cache since server tags may have changed 

1594 # First-Party 

1595 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

1596 

1597 await admin_stats_cache.invalidate_tags() 

1598 # First-Party 

1599 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

1600 

1601 metrics_cache.invalidate_prefix("top_servers:") 

1602 metrics_cache.invalidate("servers") 

1603 

1604 await self._notify_server_deleted(server_info) 

1605 logger.info(f"Deleted server: {server_info['name']}") 

1606 

1607 # Structured logging: Audit trail for server deletion 

1608 self._audit_trail.log_action( 

1609 user_id=user_email or "system", 

1610 action="delete_server", 

1611 resource_type="server", 

1612 resource_id=server_info["id"], 

1613 details={ 

1614 "server_name": server_info["name"], 

1615 }, 

1616 ) 

1617 

1618 # Structured logging: Log successful server deletion 

1619 self._structured_logger.log( 

1620 level="INFO", 

1621 message="Server deleted successfully", 

1622 event_type="server_deleted", 

1623 component="server_service", 

1624 server_id=server_info["id"], 

1625 server_name=server_info["name"], 

1626 deleted_by=user_email, 

1627 user_email=user_email, 

1628 purge_metrics=purge_metrics, 

1629 ) 

1630 except PermissionError as pe: 

1631 db.rollback() 

1632 

1633 # Structured logging: Log permission error 

1634 self._structured_logger.log( 

1635 level="WARNING", 

1636 message="Server deletion failed due to insufficient permissions", 

1637 event_type="server_deletion_permission_denied", 

1638 component="server_service", 

1639 server_id=server_id, 

1640 user_email=user_email, 

1641 ) 

1642 raise pe 

1643 except Exception as e: 

1644 db.rollback() 

1645 

1646 # Structured logging: Log generic server deletion failure 

1647 self._structured_logger.log( 

1648 level="ERROR", 

1649 message="Server deletion failed", 

1650 event_type="server_deletion_failed", 

1651 component="server_service", 

1652 server_id=server_id, 

1653 error_type=type(e).__name__, 

1654 error_message=str(e), 

1655 user_email=user_email, 

1656 ) 

1657 raise ServerError(f"Failed to delete server: {str(e)}") 

1658 

1659 async def _publish_event(self, event: Dict[str, Any]) -> None: 

1660 """ 

1661 Publish an event to all subscribed queues. 

1662 

1663 Args: 

1664 event: Event to publish 

1665 """ 

1666 for queue in self._event_subscribers: 

1667 await queue.put(event) 

1668 

1669 async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]: 

1670 """Subscribe to server events. 

1671 

1672 Yields: 

1673 Server event messages. 

1674 """ 

1675 queue: asyncio.Queue = asyncio.Queue() 

1676 self._event_subscribers.append(queue) 

1677 try: 

1678 while True: 

1679 event = await queue.get() 

1680 yield event 

1681 finally: 

1682 self._event_subscribers.remove(queue) 

1683 

1684 async def _notify_server_added(self, server: DbServer) -> None: 

1685 """ 

1686 Notify subscribers that a new server has been added. 

1687 

1688 Args: 

1689 server: Server to add 

1690 """ 

1691 associated_tools = [tool.id for tool in server.tools] if server.tools else [] 

1692 associated_resources = [res.id for res in server.resources] if server.resources else [] 

1693 associated_prompts = [prompt.id for prompt in server.prompts] if server.prompts else [] 

1694 event = { 

1695 "type": "server_added", 

1696 "data": { 

1697 "id": server.id, 

1698 "name": server.name, 

1699 "description": server.description, 

1700 "icon": server.icon, 

1701 "associated_tools": associated_tools, 

1702 "associated_resources": associated_resources, 

1703 "associated_prompts": associated_prompts, 

1704 "enabled": server.enabled, 

1705 }, 

1706 "timestamp": datetime.now(timezone.utc).isoformat(), 

1707 } 

1708 await self._publish_event(event) 

1709 

1710 async def _notify_server_updated(self, server: DbServer) -> None: 

1711 """ 

1712 Notify subscribers that a server has been updated. 

1713 

1714 Args: 

1715 server: Server to update 

1716 """ 

1717 associated_tools = [tool.id for tool in server.tools] if server.tools else [] 

1718 associated_resources = [res.id for res in server.resources] if server.resources else [] 

1719 associated_prompts = [prompt.id for prompt in server.prompts] if server.prompts else [] 

1720 event = { 

1721 "type": "server_updated", 

1722 "data": { 

1723 "id": server.id, 

1724 "name": server.name, 

1725 "description": server.description, 

1726 "icon": server.icon, 

1727 "associated_tools": associated_tools, 

1728 "associated_resources": associated_resources, 

1729 "associated_prompts": associated_prompts, 

1730 "enabled": server.enabled, 

1731 }, 

1732 "timestamp": datetime.now(timezone.utc).isoformat(), 

1733 } 

1734 await self._publish_event(event) 

1735 

1736 async def _notify_server_activated(self, server: DbServer) -> None: 

1737 """ 

1738 Notify subscribers that a server has been activated. 

1739 

1740 Args: 

1741 server: Server to activate 

1742 """ 

1743 event = { 

1744 "type": "server_activated", 

1745 "data": { 

1746 "id": server.id, 

1747 "name": server.name, 

1748 "enabled": True, 

1749 }, 

1750 "timestamp": datetime.now(timezone.utc).isoformat(), 

1751 } 

1752 await self._publish_event(event) 

1753 

1754 async def _notify_server_deactivated(self, server: DbServer) -> None: 

1755 """ 

1756 Notify subscribers that a server has been deactivated. 

1757 

1758 Args: 

1759 server: Server to deactivate 

1760 """ 

1761 event = { 

1762 "type": "server_deactivated", 

1763 "data": { 

1764 "id": server.id, 

1765 "name": server.name, 

1766 "enabled": False, 

1767 }, 

1768 "timestamp": datetime.now(timezone.utc).isoformat(), 

1769 } 

1770 await self._publish_event(event) 

1771 

1772 async def _notify_server_deleted(self, server_info: Dict[str, Any]) -> None: 

1773 """ 

1774 Notify subscribers that a server has been deleted. 

1775 

1776 Args: 

1777 server_info: Dictionary on server to be deleted 

1778 """ 

1779 event = { 

1780 "type": "server_deleted", 

1781 "data": server_info, 

1782 "timestamp": datetime.now(timezone.utc).isoformat(), 

1783 } 

1784 await self._publish_event(event) 

1785 

1786 # --- Metrics --- 

1787 async def aggregate_metrics(self, db: Session) -> ServerMetrics: 

1788 """ 

1789 Aggregate metrics for all server invocations across all servers. 

1790 

1791 Combines recent raw metrics (within retention period) with historical 

1792 hourly rollups for complete historical coverage. Uses in-memory caching 

1793 (10s TTL) to reduce database load under high request rates. 

1794 

1795 Args: 

1796 db: Database session 

1797 

1798 Returns: 

1799 ServerMetrics: Aggregated metrics from raw + hourly rollup tables. 

1800 

1801 Examples: 

1802 >>> from mcpgateway.services.server_service import ServerService 

1803 >>> service = ServerService() 

1804 >>> # Method exists and is callable 

1805 >>> callable(service.aggregate_metrics) 

1806 True 

1807 """ 

1808 # Check cache first (if enabled) 

1809 # First-Party 

1810 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel 

1811 

1812 if is_cache_enabled(): 

1813 cached = metrics_cache.get("servers") 

1814 if cached is not None: 

1815 return ServerMetrics(**cached) 

1816 

1817 # Use combined raw + rollup query for full historical coverage 

1818 # First-Party 

1819 from mcpgateway.services.metrics_query_service import aggregate_metrics_combined # pylint: disable=import-outside-toplevel 

1820 

1821 result = aggregate_metrics_combined(db, "server") 

1822 

1823 metrics = ServerMetrics( 

1824 total_executions=result.total_executions, 

1825 successful_executions=result.successful_executions, 

1826 failed_executions=result.failed_executions, 

1827 failure_rate=result.failure_rate, 

1828 min_response_time=result.min_response_time, 

1829 max_response_time=result.max_response_time, 

1830 avg_response_time=result.avg_response_time, 

1831 last_execution_time=result.last_execution_time, 

1832 ) 

1833 

1834 # Cache the result as dict for serialization compatibility (if enabled) 

1835 if is_cache_enabled(): 

1836 metrics_cache.set("servers", metrics.model_dump()) 

1837 

1838 return metrics 

1839 

1840 async def reset_metrics(self, db: Session) -> None: 

1841 """ 

1842 Reset all server metrics by deleting raw and hourly rollup records. 

1843 

1844 Args: 

1845 db: Database session 

1846 

1847 Examples: 

1848 >>> from mcpgateway.services.server_service import ServerService 

1849 >>> from unittest.mock import MagicMock 

1850 >>> service = ServerService() 

1851 >>> db = MagicMock() 

1852 >>> db.execute = MagicMock() 

1853 >>> db.commit = MagicMock() 

1854 >>> import asyncio 

1855 >>> asyncio.run(service.reset_metrics(db)) 

1856 """ 

1857 db.execute(delete(ServerMetric)) 

1858 db.execute(delete(ServerMetricsHourly)) 

1859 db.commit() 

1860 

1861 # Invalidate metrics cache 

1862 # First-Party 

1863 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

1864 

1865 metrics_cache.invalidate("servers") 

1866 metrics_cache.invalidate_prefix("top_servers:") 

1867 

1868 def get_oauth_protected_resource_metadata(self, db: Session, server_id: str, resource_base_url: str) -> Dict[str, Any]: 

1869 """ 

1870 Get RFC 9728 OAuth 2.0 Protected Resource Metadata for a server. 

1871 

1872 This method retrieves the OAuth configuration for a server and formats it 

1873 according to RFC 9728 Protected Resource Metadata specification, enabling 

1874 MCP clients to discover OAuth authorization servers for browser-based SSO. 

1875 

1876 Args: 

1877 db: Database session. 

1878 server_id: The ID of the server. 

1879 resource_base_url: The base URL for the resource (e.g., "https://gateway.example.com/servers/abc123/mcp"). 

1880 

1881 Returns: 

1882 Dict containing RFC 9728 Protected Resource Metadata: 

1883 - resource: The protected resource identifier (URL with /mcp suffix) 

1884 - authorization_servers: JSON array of authorization server issuer URIs (RFC 9728 Section 2) 

1885 - bearer_methods_supported: Supported bearer token methods (always ["header"]) 

1886 - scopes_supported: Optional list of supported scopes 

1887 

1888 Raises: 

1889 ServerNotFoundError: If server doesn't exist, is disabled, or is non-public. 

1890 ServerError: If OAuth is not enabled or not properly configured. 

1891 

1892 Examples: 

1893 >>> from mcpgateway.services.server_service import ServerService 

1894 >>> service = ServerService() 

1895 >>> # Method exists and is callable 

1896 >>> callable(service.get_oauth_protected_resource_metadata) 

1897 True 

1898 """ 

1899 server = db.get(DbServer, server_id) 

1900 

1901 # Return not found for non-existent, disabled, or non-public servers 

1902 # (avoids leaking information about private/team servers) 

1903 if not server: 

1904 raise ServerNotFoundError(f"Server not found: {server_id}") 

1905 

1906 if not server.enabled: 

1907 raise ServerNotFoundError(f"Server not found: {server_id}") 

1908 

1909 if getattr(server, "visibility", "public") != "public": 

1910 raise ServerNotFoundError(f"Server not found: {server_id}") 

1911 

1912 # Check OAuth configuration 

1913 if not getattr(server, "oauth_enabled", False): 

1914 raise ServerError(f"OAuth not enabled for server: {server_id}") 

1915 

1916 oauth_config = getattr(server, "oauth_config", None) 

1917 if not oauth_config: 

1918 raise ServerError(f"OAuth not configured for server: {server_id}") 

1919 

1920 # Extract authorization server(s) - support both list and single value in config 

1921 authorization_servers = oauth_config.get("authorization_servers", []) 

1922 if not authorization_servers: 

1923 auth_server = oauth_config.get("authorization_server") 

1924 if auth_server: 

1925 authorization_servers = [auth_server] if isinstance(auth_server, str) else auth_server 

1926 

1927 if not authorization_servers: 

1928 raise ServerError(f"OAuth authorization_server not configured for server: {server_id}") 

1929 

1930 # Build RFC 9728 Protected Resource Metadata response 

1931 response_data: Dict[str, Any] = { 

1932 "resource": resource_base_url, 

1933 "authorization_servers": authorization_servers, 

1934 "bearer_methods_supported": ["header"], 

1935 } 

1936 

1937 # Add optional scopes if configured (never include secrets from oauth_config) 

1938 scopes = oauth_config.get("scopes_supported") or oauth_config.get("scopes") 

1939 if scopes: 

1940 response_data["scopes_supported"] = scopes 

1941 

1942 logger.debug(f"Returning OAuth protected resource metadata for server {server_id}") 

1943 return response_data 

1944 

1945 

1946# Lazy singleton - created on first access, not at module import time. 

1947# This avoids instantiation when only exception classes are imported. 

1948_server_service_instance = None # pylint: disable=invalid-name 

1949 

1950 

1951def __getattr__(name: str): 

1952 """Module-level __getattr__ for lazy singleton creation. 

1953 

1954 Args: 

1955 name: The attribute name being accessed. 

1956 

1957 Returns: 

1958 The server_service singleton instance if name is "server_service". 

1959 

1960 Raises: 

1961 AttributeError: If the attribute name is not "server_service". 

1962 """ 

1963 global _server_service_instance # pylint: disable=global-statement 

1964 if name == "server_service": 

1965 if _server_service_instance is None: 

1966 _server_service_instance = ServerService() 

1967 return _server_service_instance 

1968 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")