Coverage for mcpgateway / services / server_service.py: 100%

605 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/server_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7MCP Gateway Server Service 

8 

9This module implements server management for the MCP Servers Catalog. 

10It handles server registration, listing, retrieval, updates, activation toggling, and deletion. 

11It also publishes event notifications for server changes. 

12""" 

13 

14# Standard 

15import asyncio 

16import binascii 

17from datetime import datetime, timezone 

18from typing import Any, AsyncGenerator, Dict, List, Optional, Union 

19 

20# Third-Party 

21import httpx 

22from pydantic import ValidationError 

23from sqlalchemy import and_, delete, desc, or_, select 

24from sqlalchemy.exc import IntegrityError, OperationalError 

25from sqlalchemy.orm import joinedload, selectinload, Session 

26 

27# First-Party 

28from mcpgateway.config import settings 

29from mcpgateway.db import A2AAgent as DbA2AAgent 

30from mcpgateway.db import EmailTeam as DbEmailTeam 

31from mcpgateway.db import EmailTeamMember as DbEmailTeamMember 

32from mcpgateway.db import get_for_update 

33from mcpgateway.db import Prompt as DbPrompt 

34from mcpgateway.db import Resource as DbResource 

35from mcpgateway.db import Server as DbServer 

36from mcpgateway.db import ServerMetric, ServerMetricsHourly 

37from mcpgateway.db import Tool as DbTool 

38from mcpgateway.schemas import ServerCreate, ServerMetrics, ServerRead, ServerUpdate, TopPerformer 

39from mcpgateway.services.audit_trail_service import get_audit_trail_service 

40from mcpgateway.services.logging_service import LoggingService 

41from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

42from mcpgateway.services.performance_tracker import get_performance_tracker 

43from mcpgateway.services.structured_logger import get_structured_logger 

44from mcpgateway.services.team_management_service import TeamManagementService 

45from mcpgateway.utils.metrics_common import build_top_performers 

46from mcpgateway.utils.pagination import unified_paginate 

47from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

48 

# Slot for the shared registry cache. It stays None until first use because the
# import is deferred (lazy) to avoid circular dependencies.
_REGISTRY_CACHE = None


def _get_registry_cache():
    """Return the registry cache singleton, importing it on first use.

    Returns:
        The ``registry_cache`` object exported by ``mcpgateway.cache.registry_cache``.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        # Already resolved by an earlier call; no import needed.
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache as shared_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = shared_cache
    return _REGISTRY_CACHE

66 

67 

68# Initialize logging service first so the module-level logger below is available to the rest of this module 

69logging_service = LoggingService() 

70logger = logging_service.get_logger(__name__) 

71 

72 

class ServerError(Exception):
    """Root of the exception hierarchy used for server-related failures."""

75 

76 

class ServerNotFoundError(ServerError):
    """Signals that the requested server could not be found."""

79 

80 

class ServerLockConflictError(ServerError):
    """Signals that the server row is currently locked by another transaction."""

83 

84 

class ServerNameConflictError(ServerError):
    """Raised when a server name conflicts with an existing one.

    Attributes:
        name: The server name that caused the conflict.
        enabled: Whether the conflicting server is currently active.
        server_id: ID of the conflicting server, if known.
        visibility: Visibility used to build the message ("public" when none was given).
    """

    def __init__(self, name: str, enabled: bool = True, server_id: Optional[str] = None, visibility: Optional[str] = "public") -> None:
        """
        Initialize a ServerNameConflictError exception.

        The message starts with the capitalized visibility, followed by the conflicting
        name. For an inactive conflicting server the message also carries its ID.

        Visibility rules:
            - public: Restricts server names globally (across all teams).
            - team: Restricts server names only within the same team.

        Args:
            name: The server name that caused the conflict.
            enabled: Whether the conflicting server is currently active. Defaults to True.
            server_id: The ID of the conflicting server, if known. Only included in message for inactive servers.
            visibility: The visibility of the conflicting server (e.g., "public", "private", "team").
                ``None`` or an empty string is treated as "public" so that building the
                error can never fail with an ``AttributeError`` and hide the conflict.

        Examples:
            >>> error = ServerNameConflictError("My Server")
            >>> str(error)
            'Public Server already exists with name: My Server'
            >>> error = ServerNameConflictError("My Server", enabled=False, server_id=123)
            >>> str(error)
            'Public Server already exists with name: My Server (currently inactive, ID: 123)'
            >>> error.enabled
            False
            >>> error.server_id
            123
            >>> error = ServerNameConflictError("My Server", enabled=False, visibility="team")
            >>> str(error)
            'Team Server already exists with name: My Server (currently inactive, ID: None)'
            >>> error.enabled
            False
            >>> error.server_id is None
            True
            >>> error = ServerNameConflictError("My Server", visibility=None)
            >>> str(error)
            'Public Server already exists with name: My Server'
            >>> error.visibility
            'public'
        """
        # Callers forward the visibility of an existing record, which may be unset.
        effective_visibility = visibility or "public"

        self.name = name
        self.enabled = enabled
        self.server_id = server_id
        self.visibility = effective_visibility

        message = f"{effective_visibility.capitalize()} Server already exists with name: {name}"
        if not enabled:
            message += f" (currently inactive, ID: {server_id})"
        super().__init__(message)

132 

133 

134class ServerService: 

135 """Service for managing MCP Servers in the catalog. 

136 

137 Provides methods to create, list, retrieve, update, set state, and delete server records. 

138 Also supports event notifications for changes in server data. 

139 """ 

140 

def __init__(self) -> None:
    """Create a ServerService with no subscribers and a settings-driven HTTP client.

    The instance starts with:
    - an empty list of event-subscriber queues,
    - an ``httpx.AsyncClient`` whose timeout, SSL verification and connection
      limits are read from ``settings``,
    - handles to the structured logger, audit trail service and performance tracker.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> isinstance(service._event_subscribers, list)
        True
        >>> len(service._event_subscribers)
        0
        >>> hasattr(service, '_http_client')
        True
    """
    self._event_subscribers: List[asyncio.Queue] = []

    # Gather the client options first so the constructor call stays short.
    client_options = {
        "timeout": settings.federation_timeout,
        # SSL verification is on unless explicitly skipped in settings.
        "verify": not settings.skip_ssl_verify,
        "limits": httpx.Limits(
            max_connections=settings.httpx_max_connections,
            max_keepalive_connections=settings.httpx_max_keepalive_connections,
            keepalive_expiry=settings.httpx_keepalive_expiry,
        ),
    }
    self._http_client = httpx.AsyncClient(**client_options)

    self._structured_logger = get_structured_logger("server_service")
    self._audit_trail = get_audit_trail_service()
    self._performance_tracker = get_performance_tracker()

175 

async def initialize(self) -> None:
    """Log that the server service is initializing; no other setup happens here."""
    logger.info("Initializing server service")

179 

async def shutdown(self) -> None:
    """Close the service's HTTP client, then log that shutdown completed."""
    http_client = self._http_client
    await http_client.aclose()
    logger.info("Server service shutdown complete")

184 

185 # get_top_server 

async def get_top_servers(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
    """Return the top-performing servers, served from the metrics cache when possible.

    The result comes from the combined metrics query (raw metrics plus rollup
    data) and is converted with ``build_top_performers``. When the metrics cache
    is enabled, a cached result for the same limit / include_deleted pair is
    returned directly, and freshly computed results are stored back.

    Args:
        db (Session): Database session for querying server metrics.
        limit (Optional[int]): Maximum number of servers to return. A falsy value
            (``None`` or ``0``) falls back to 5.
        include_deleted (bool): Whether to include deleted servers from rollups.

    Returns:
        List[TopPerformer]: The top performers for servers, at most ``limit`` entries.
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    max_results = limit if limit else 5
    cache_key = f"top_servers:{max_results}:include_deleted={include_deleted}"

    # Fast path: reuse a cached answer when caching is switched on.
    cache_hit = metrics_cache.get(cache_key) if is_cache_enabled() else None
    if cache_hit is not None:
        return cache_hit

    # Slow path: combined query covering both raw metrics and rollup data.
    # First-Party
    from mcpgateway.services.metrics_query_service import get_top_performers_combined  # pylint: disable=import-outside-toplevel

    rows = get_top_performers_combined(
        db=db,
        metric_type="server",
        entity_model=DbServer,
        limit=max_results,
        include_deleted=include_deleted,
    )
    performers = build_top_performers(rows)

    # Remember the result for subsequent calls (if caching is enabled).
    if is_cache_enabled():
        metrics_cache.set(cache_key, performers)
    return performers

238 

def convert_server_to_read(self, server: DbServer, include_metrics: bool = False) -> ServerRead:
    """
    Build a ServerRead model from a DbServer row, optionally with aggregated metrics.

    Attributes are read one by one from the ORM object (rather than copying
    ``__dict__``, which can come back empty with certain query patterns).

    Args:
        server (DbServer): The ORM instance of the server.
        include_metrics (bool): Whether to aggregate ``server.metrics`` into the result.
            Defaults to False. Leave it False for list operations to avoid N+1 query issues.

    Returns:
        ServerRead: The validated model. ``metrics`` is ``None`` unless
        ``include_metrics`` is True. ``associated_tools`` holds tool *names*, while
        the other ``associated_*`` lists hold IDs.

    Examples:
        >>> from types import SimpleNamespace
        >>> from datetime import datetime, timezone
        >>> svc = ServerService()
        >>> now = datetime.now(timezone.utc)
        >>> # Fake metric objects
        >>> m1 = SimpleNamespace(is_success=True, response_time=0.2, timestamp=now)
        >>> m2 = SimpleNamespace(is_success=False, response_time=0.4, timestamp=now)
        >>> server = SimpleNamespace(
        ...     id='s1', name='S', description=None, icon=None,
        ...     created_at=now, updated_at=now, enabled=True,
        ...     associated_tools=[], associated_resources=[], associated_prompts=[], associated_a2a_agents=[],
        ...     tags=[], metrics=[m1, m2],
        ...     tools=[], resources=[], prompts=[], a2a_agents=[],
        ...     team_id=None, owner_email=None, visibility=None,
        ...     created_by=None, modified_by=None
        ... )
        >>> result = svc.convert_server_to_read(server, include_metrics=True)
        >>> result.metrics.total_executions
        2
        >>> result.metrics.successful_executions
        1
    """
    # Mandatory columns are read directly; optional/audit columns default to None.
    payload: Dict[str, Any] = {
        field: getattr(server, field)
        for field in ("id", "name", "description", "icon", "enabled", "created_at", "updated_at", "team_id", "owner_email", "visibility", "created_by")
    }
    for field in ("created_from_ip", "created_via", "created_user_agent"):
        payload[field] = getattr(server, field, None)
    payload["modified_by"] = server.modified_by
    for field in ("modified_from_ip", "modified_via", "modified_user_agent", "import_batch_id", "federation_source", "version"):
        payload[field] = getattr(server, field, None)
    payload["tags"] = server.tags or []
    # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
    payload["oauth_enabled"] = getattr(server, "oauth_enabled", False)
    payload["oauth_config"] = getattr(server, "oauth_config", None)

    if not include_metrics:
        # Skipping aggregation avoids N+1 queries in list operations.
        payload["metrics"] = None
    else:
        total = 0
        successful = 0
        fastest = None
        slowest = None
        latest = None
        elapsed_sum = 0.0

        for metric in getattr(server, "metrics", None) or ():
            total += 1
            if metric.is_success:
                successful += 1

            elapsed = metric.response_time
            fastest = elapsed if fastest is None else min(fastest, elapsed)
            slowest = elapsed if slowest is None else max(slowest, elapsed)
            elapsed_sum += elapsed

            stamp = metric.timestamp
            latest = stamp if latest is None else max(latest, stamp)

        # Every metric is either a success or a failure.
        failed = total - successful

        payload["metrics"] = {
            "total_executions": total,
            "successful_executions": successful,
            "failed_executions": failed,
            "failure_rate": (failed / total) if total > 0 else 0.0,
            "min_response_time": fastest,
            "max_response_time": slowest,
            "avg_response_time": (elapsed_sum / total) if total > 0 else None,
            "last_execution_time": latest,
        }

    # Associated items come from the relationships (tools by name, the rest by id).
    payload["associated_tools"] = [tool.name for tool in (server.tools or [])]
    payload["associated_resources"] = [resource.id for resource in (server.resources or [])]
    payload["associated_prompts"] = [prompt.id for prompt in (server.prompts or [])]
    payload["associated_a2a_agents"] = [agent.id for agent in (server.a2a_agents or [])]

    # Team name is loaded via server.team property from email_team relationship
    payload["team"] = getattr(server, "team", None)

    return ServerRead.model_validate(payload)

359 

def _assemble_associated_items(
    self,
    tools: Optional[List[str]],
    resources: Optional[List[str]],
    prompts: Optional[List[str]],
    a2a_agents: Optional[List[str]] = None,
    gateways: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Bundle the separate association lists into one dictionary.

    Any argument that is ``None`` (or otherwise falsy) is replaced by an empty list.

    Args:
        tools: List of tool IDs.
        resources: List of resource IDs.
        prompts: List of prompt IDs.
        a2a_agents: List of A2A agent IDs.
        gateways: List of gateway IDs.

    Returns:
        A dictionary with keys "tools", "resources", "prompts", "a2a_agents", and "gateways".

    Examples:
        >>> service = ServerService()
        >>> # Test with all None values
        >>> result = service._assemble_associated_items(None, None, None)
        >>> result
        {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []}

        >>> # Test with empty lists
        >>> result = service._assemble_associated_items([], [], [])
        >>> result
        {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []}

        >>> # Test with actual values
        >>> result = service._assemble_associated_items(['tool1', 'tool2'], ['res1'], ['prompt1'])
        >>> result
        {'tools': ['tool1', 'tool2'], 'resources': ['res1'], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []}

        >>> # Test with mixed None and values
        >>> result = service._assemble_associated_items(['tool1'], None, ['prompt1'])
        >>> result
        {'tools': ['tool1'], 'resources': [], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []}
    """
    labelled_groups = (
        ("tools", tools),
        ("resources", resources),
        ("prompts", prompts),
        ("a2a_agents", a2a_agents),
        ("gateways", gateways),
    )
    return {label: (ids or []) for label, ids in labelled_groups}

410 

async def register_server(
    self,
    db: Session,
    server_in: ServerCreate,
    created_by: Optional[str] = None,
    created_from_ip: Optional[str] = None,
    created_via: Optional[str] = None,
    created_user_agent: Optional[str] = None,
    team_id: Optional[str] = None,
    owner_email: Optional[str] = None,
    visibility: Optional[str] = "public",
) -> ServerRead:
    """
    Register a new server in the catalog and validate that all associated items exist.

    This function performs the following steps:
        1. Builds the new server record and checks that no server with the same
           (team_id, owner_email, name) already exists.
        2. For each ID provided in associated_tools, associated_resources, associated_prompts
           and associated_a2a_agents, verifies that the corresponding item exists and attaches
           it to the server. If an item does not exist, an error is raised.
        3. Commits the transaction, refreshes the ORM instance, and forces the loading of relationship data.
        4. Notifies subscribers of the addition and writes audit / structured log entries.
        5. Returns the server converted with ``convert_server_to_read``.

    Args:
        db (Session): The SQLAlchemy database session.
        server_in (ServerCreate): The server creation schema containing server details and lists of
            associated tool, resource, prompt and A2A agent IDs (as strings).
        created_by (Optional[str]): Email of the user creating the server, used for ownership tracking.
        created_from_ip (Optional[str]): IP address from which the creation request originated.
        created_via (Optional[str]): Source of creation (api, ui, import).
        created_user_agent (Optional[str]): User agent string from the creation request.
        team_id (Optional[str]): Team ID to assign the server to; a team_id on ``server_in`` takes precedence.
        owner_email (Optional[str]): Email of the user who owns this server; an owner_email on
            ``server_in`` takes precedence, and ``created_by`` is the final fallback.
        visibility (Optional[str]): Server visibility level (private, team, public). Takes precedence
            over the schema value; "public" is used when neither is set.

    Returns:
        ServerRead: The newly created server, with associated item IDs.

    Raises:
        IntegrityError: If a database integrity error occurs (re-raised unchanged after rollback).
        ServerNameConflictError: If a server name conflict occurs (re-raised unchanged after rollback).
        ServerError: If any associated tool, resource, prompt or A2A agent does not exist, or if any
            other registration error occurs. The triggering exception is chained as ``__cause__``.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server_in = MagicMock()
        >>> server_in.id = None  # No custom UUID for this test
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> db.add = MagicMock()
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_server_added = AsyncMock()
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.register_server(db, server_in))
        'server_read'
    """
    try:
        logger.info(f"Registering server: {server_in.name}")

        # Resolve team/owner once so the new row and the conflict check below
        # are guaranteed to use the same values (schema values win over parameters).
        resolved_team_id = getattr(server_in, "team_id", None) or team_id
        resolved_owner_email = getattr(server_in, "owner_email", None) or owner_email or created_by

        # Create the new server record.
        db_server = DbServer(
            name=server_in.name,
            description=server_in.description,
            icon=server_in.icon,
            enabled=True,
            tags=server_in.tags or [],
            team_id=resolved_team_id,
            owner_email=resolved_owner_email,
            # IMPORTANT: Prefer function parameter over schema default
            # The API has visibility as a separate Body param that should override schema default
            visibility=visibility or getattr(server_in, "visibility", None) or "public",
            # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata
            oauth_enabled=getattr(server_in, "oauth_enabled", False) or False,
            oauth_config=getattr(server_in, "oauth_config", None),
            # Metadata fields
            created_by=created_by,
            created_from_ip=created_from_ip,
            created_via=created_via,
            created_user_agent=created_user_agent,
            version=1,
        )

        # Check for an existing server with the same name (with row locking to prevent race conditions).
        # The unique constraint is on (team_id, owner_email, name), so the lookup mirrors it.
        conditions = [
            DbServer.name == server_in.name,
            DbServer.team_id == resolved_team_id if resolved_team_id else DbServer.team_id.is_(None),
            DbServer.owner_email == resolved_owner_email if resolved_owner_email else DbServer.owner_email.is_(None),
        ]
        if server_in.id:
            conditions.append(DbServer.id != server_in.id)

        existing_server = get_for_update(db, DbServer, where=and_(*conditions))
        if existing_server:
            raise ServerNameConflictError(server_in.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)

        # Set custom UUID if provided
        if server_in.id:
            logger.info(f"Setting custom UUID for server: {server_in.id}")
            db_server.id = server_in.id
        logger.info(f"Adding server to DB session: {db_server.name}")
        db.add(db_server)

        # One row per association kind:
        # (schema field, relationship on DbServer, ORM model, singular label, plural label, log each link?)
        association_specs = (
            ("associated_tools", "tools", DbTool, "Tool", "Tools", False),
            ("associated_resources", "resources", DbResource, "Resource", "Resources", False),
            ("associated_prompts", "prompts", DbPrompt, "Prompt", "Prompts", False),
            # Note: Auto-tool creation for A2A agents should be handled
            # by a separate service or background task to avoid circular imports
            ("associated_a2a_agents", "a2a_agents", DbA2AAgent, "A2A Agent", "A2A Agents", True),
        )
        for schema_field, relationship_name, model, singular, plural, log_links in association_specs:
            requested = getattr(server_in, schema_field)
            if not requested:
                continue
            entity_ids = [entity_id.strip() for entity_id in requested if entity_id.strip()]
            if not entity_ids:
                continue

            if len(entity_ids) > 1:
                # Use bulk query for multiple items
                entities = db.execute(select(model).where(model.id.in_(entity_ids))).scalars().all()
                missing_ids = set(entity_ids) - {entity.id for entity in entities}
                if missing_ids:
                    raise ServerError(f"{plural} with ids {missing_ids} do not exist.")
                getattr(db_server, relationship_name).extend(entities)
            else:
                # Use single query for single item (maintains test compatibility)
                entity = db.get(model, entity_ids[0])
                if not entity:
                    raise ServerError(f"{singular} with id {entity_ids[0]} does not exist.")
                getattr(db_server, relationship_name).append(entity)
                entities = [entity]

            if log_links:
                for agent in entities:
                    logger.info(f"A2A agent {agent.name} associated with server {db_server.name}")

        # Commit the new record and refresh.
        db.commit()
        db.refresh(db_server)
        # Force load the relationship attributes.
        _ = db_server.tools, db_server.resources, db_server.prompts, db_server.a2a_agents

        # NOTE(review): everything below runs after the commit but inside the same try;
        # a failure here is reported as ServerError although the row was already persisted.

        # Assemble response data with associated item IDs (debug logging only).
        server_data = {
            "id": db_server.id,
            "name": db_server.name,
            "description": db_server.description,
            "icon": db_server.icon,
            "created_at": db_server.created_at,
            "updated_at": db_server.updated_at,
            "enabled": db_server.enabled,
            "associated_tools": [str(tool.id) for tool in db_server.tools],
            "associated_resources": [str(resource.id) for resource in db_server.resources],
            "associated_prompts": [str(prompt.id) for prompt in db_server.prompts],
            "associated_a2a_agents": [str(agent.id) for agent in db_server.a2a_agents],
        }
        logger.debug(f"Server Data: {server_data}")
        await self._notify_server_added(db_server)
        logger.info(f"Registered server: {server_in.name}")

        # Structured logging: Audit trail for server creation.
        # Record the values actually stored on the row, which can differ from the raw
        # parameters (e.g. a team_id on the schema overrides the team_id argument).
        self._audit_trail.log_action(
            user_id=created_by or "system",
            action="create_server",
            resource_type="server",
            resource_id=db_server.id,
            details={
                "server_name": db_server.name,
                "visibility": db_server.visibility,
                "team_id": db_server.team_id,
                "associated_tools_count": len(db_server.tools),
                "associated_resources_count": len(db_server.resources),
                "associated_prompts_count": len(db_server.prompts),
                "associated_a2a_agents_count": len(db_server.a2a_agents),
            },
            metadata={
                "created_from_ip": created_from_ip,
                "created_via": created_via,
                "created_user_agent": created_user_agent,
            },
        )

        # Structured logging: Log successful server creation
        self._structured_logger.log(
            level="INFO",
            message="Server created successfully",
            event_type="server_created",
            component="server_service",
            server_id=db_server.id,
            server_name=db_server.name,
            visibility=db_server.visibility,
            created_by=created_by,
            user_email=created_by,
        )

        # Team name is loaded via db_server.team property from email_team relationship
        return self.convert_server_to_read(db_server)
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        self._structured_logger.log(
            level="ERROR",
            message="Server creation failed due to database integrity error",
            event_type="server_creation_failed",
            component="server_service",
            server_name=server_in.name,
            error_type="IntegrityError",
            error_message=str(ie),
            created_by=created_by,
            user_email=created_by,
        )
        raise
    except ServerNameConflictError:
        db.rollback()

        # Structured logging: Log name conflict error
        self._structured_logger.log(
            level="WARNING",
            message="Server creation failed due to name conflict",
            event_type="server_name_conflict",
            component="server_service",
            server_name=server_in.name,
            visibility=visibility,
            created_by=created_by,
            user_email=created_by,
        )
        raise
    except Exception as ex:
        db.rollback()

        # Structured logging: Log generic server creation failure
        self._structured_logger.log(
            level="ERROR",
            message="Server creation failed",
            event_type="server_creation_failed",
            component="server_service",
            server_name=server_in.name,
            error_type=type(ex).__name__,
            error_message=str(ex),
            created_by=created_by,
            user_email=created_by,
        )
        # Chain the cause so the original traceback is not lost.
        raise ServerError(f"Failed to register server: {str(ex)}") from ex

712 

async def list_servers(
    self,
    db: Session,
    include_inactive: bool = False,
    tags: Optional[List[str]] = None,
    cursor: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    user_email: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Union[tuple[List[ServerRead], Optional[str]], Dict[str, Any]]:
    """List all registered servers with cursor or page-based pagination and optional team filtering.

    Access control is applied in one of two mutually exclusive ways:
    ``token_teams`` (when not None) takes precedence; otherwise ``user_email``
    (when set) triggers a lookup of the user's teams. With neither, no access
    filtering is applied.

    Args:
        db: Database session.
        include_inactive: Whether to include inactive servers.
        tags: Filter servers by tags. If provided, only servers with at least one matching tag will be returned.
        cursor: Cursor for pagination (encoded last created_at and id).
        limit: Maximum number of servers to return. None for default, 0 for unlimited.
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
        user_email: Email of user for team-based access control. None for no access control.
        team_id: Optional team ID to filter by specific team (requires user_email).
            Only consulted on the user_email path; it is ignored when token_teams is given.
        visibility: Optional visibility filter (private, team, public).
            Applied on both the token_teams path and the user_email path.
        token_teams: Optional list of team IDs from the token (None=unrestricted, []=public-only).

    Returns:
        If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
        If cursor is provided or neither: tuple of (list of ServerRead objects, next_cursor).

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server_read = MagicMock()
        >>> service.convert_server_to_read = MagicMock(return_value=server_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> servers, cursor = asyncio.run(service.list_servers(db))
        >>> isinstance(servers, list) and cursor is None
        True
    """
    # Check cache for first page only
    # SECURITY: Only cache public-only results (token_teams=[])
    # - token_teams=None (admin bypass): Don't cache - admin sees all, should be fresh
    # - token_teams=[] (public-only): Cache - same result for all public-only users
    # - token_teams=[...] (team-scoped): Don't cache - results vary by team
    # - user_email set: Don't cache - results vary by user ownership
    # The cache key below only covers include_inactive and tags, so any request
    # whose result also depends on `limit` or `visibility` must bypass the cache;
    # otherwise e.g. a limit=0 or visibility="team" response would be stored and
    # then served to callers that asked for the default listing.
    cache = _get_registry_cache()
    is_public_only = token_teams is not None and len(token_teams) == 0
    use_cache = cursor is None and user_email is None and page is None and is_public_only and limit is None and not visibility
    if use_cache:
        filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None)
        cached = await cache.get("servers", filters_hash)
        if cached is not None:
            # Reconstruct ServerRead objects from cached dicts
            cached_servers = [ServerRead.model_validate(s) for s in cached["servers"]]
            return (cached_servers, cached.get("next_cursor"))

    # Build base query with ordering and eager load relationships to avoid N+1
    query = (
        select(DbServer)
        .options(
            selectinload(DbServer.tools),
            selectinload(DbServer.resources),
            selectinload(DbServer.prompts),
            selectinload(DbServer.a2a_agents),
            joinedload(DbServer.email_team),
        )
        .order_by(desc(DbServer.created_at), desc(DbServer.id))
    )

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbServer.enabled)

    # SECURITY: Apply token-based access control based on normalized token_teams
    # - token_teams is None: admin bypass (is_admin=true with explicit null teams) - sees all
    # - token_teams is []: public-only access (missing teams or explicit empty)
    # - token_teams is [...]: access to specified teams + public + user's own
    if token_teams is not None:
        if len(token_teams) == 0:
            # Public-only token: only access public servers
            query = query.where(DbServer.visibility == "public")
        else:
            # Team-scoped token: public servers + servers in allowed teams + user's own
            access_conditions = [
                DbServer.visibility == "public",
                and_(DbServer.team_id.in_(token_teams), DbServer.visibility.in_(["team", "public"])),
            ]
            if user_email:
                access_conditions.append(and_(DbServer.owner_email == user_email, DbServer.visibility == "private"))
            query = query.where(or_(*access_conditions))

        if visibility:
            query = query.where(DbServer.visibility == visibility)

    # Apply team-based access control if user_email is provided (and no token_teams filtering)
    elif user_email:
        team_service = TeamManagementService(db)
        user_teams = await team_service.get_user_teams(user_email)
        team_ids = [team.id for team in user_teams]

        if team_id:
            # User requesting specific team - verify access
            if team_id not in team_ids:
                # NOTE(review): this returns the cursor-style tuple even when `page`
                # was requested, although page mode is documented to return a dict.
                # Left unchanged because the expected empty-page dict shape is
                # produced by unified_paginate and is not visible here - confirm.
                return ([], None)
            access_conditions = [
                and_(DbServer.team_id == team_id, DbServer.visibility.in_(["team", "public"])),
                and_(DbServer.team_id == team_id, DbServer.owner_email == user_email),
            ]
            query = query.where(or_(*access_conditions))
        else:
            # General access: user's servers + public servers + team servers
            access_conditions = [
                DbServer.owner_email == user_email,
                DbServer.visibility == "public",
            ]
            if team_ids:
                access_conditions.append(and_(DbServer.team_id.in_(team_ids), DbServer.visibility.in_(["team", "public"])))
            query = query.where(or_(*access_conditions))

        if visibility:
            query = query.where(DbServer.visibility == visibility)

    # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
    if tags:
        query = query.where(json_contains_tag_expr(db, DbServer.tags, tags, match_any=True))

    # Use unified pagination helper - handles both page and cursor pagination
    pag_result = await unified_paginate(
        db=db,
        query=query,
        page=page,
        per_page=per_page,
        cursor=cursor,
        limit=limit,
        base_url="/admin/servers",  # Used for page-based links
        query_params={"include_inactive": include_inactive} if include_inactive else {},
    )

    next_cursor = None
    # Extract servers based on pagination type
    if page is not None:
        # Page-based: pag_result is a dict
        servers_db = pag_result["data"]
    else:
        # Cursor-based: pag_result is a tuple
        servers_db, next_cursor = pag_result

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Convert to ServerRead (common for both pagination types)
    # Team names are loaded via joinedload(DbServer.email_team)
    result = []
    for s in servers_db:
        try:
            result.append(self.convert_server_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining servers instead of failing completely

    # Return appropriate format based on pagination type
    if page is not None:
        # Page-based format
        return {
            "data": result,
            "pagination": pag_result["pagination"],
            "links": pag_result["links"],
        }

    # Cursor-based format

    # Cache first page results - only for public-only queries (no user/team filtering)
    # SECURITY: Only cache public-only results (token_teams=[]), never admin bypass or team-scoped
    # `use_cache` is the same flag that guarded the lookup, so `filters_hash` is
    # guaranteed to be defined whenever this branch runs.
    if use_cache:
        try:
            cache_data = {"servers": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
            await cache.set("servers", cache_data, filters_hash)
        except AttributeError:
            pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

    return (result, next_cursor)

900 

async def list_servers_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[ServerRead]:
    """
    DEPRECATED: Use list_servers() with user_email parameter instead.

    This method is maintained for backward compatibility but is no longer used.
    New code should call list_servers() with user_email, team_id, and visibility parameters.

    List servers user has access to with team filtering. Results are ordered
    newest first (created_at, then id, both descending) so that skip/limit
    pagination yields stable, non-overlapping pages.

    Args:
        db: Database session
        user_email: Email of the user requesting servers
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive servers
        skip: Number of servers to skip for pagination
        limit: Maximum number of servers to return

    Returns:
        List[ServerRead]: Servers the user has access to. Servers that fail
        conversion are logged and omitted rather than failing the whole call.
    """
    # Build query following existing patterns from list_servers()
    team_service = TeamManagementService(db)
    user_teams = await team_service.get_user_teams(user_email)
    team_ids = [team.id for team in user_teams]

    # Eager load relationships to avoid N+1 queries.
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any order,
    # so consecutive pages could repeat or skip servers; use the same ordering
    # as list_servers().
    query = (
        select(DbServer)
        .options(
            selectinload(DbServer.tools),
            selectinload(DbServer.resources),
            selectinload(DbServer.prompts),
            selectinload(DbServer.a2a_agents),
            joinedload(DbServer.email_team),
        )
        .order_by(desc(DbServer.created_at), desc(DbServer.id))
    )

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbServer.enabled)

    if team_id:
        if team_id not in team_ids:
            return []  # No access to team

        # Within the requested team: team/public-visible servers, plus any
        # server in that team owned by the user (regardless of visibility).
        access_conditions = [
            and_(DbServer.team_id == team_id, DbServer.visibility.in_(["team", "public"])),
            and_(DbServer.team_id == team_id, DbServer.owner_email == user_email),
        ]
    else:
        # 1. User's personal resources (owner_email matches)
        access_conditions = [DbServer.owner_email == user_email]

        # 2. Team resources where user is member
        if team_ids:
            access_conditions.append(and_(DbServer.team_id.in_(team_ids), DbServer.visibility.in_(["team", "public"])))

        # 3. Public resources
        access_conditions.append(DbServer.visibility == "public")

    query = query.where(or_(*access_conditions))

    # Apply visibility filter if specified
    if visibility:
        query = query.where(DbServer.visibility == visibility)

    # Apply pagination following existing patterns
    query = query.offset(skip).limit(limit)

    servers = db.execute(query).scalars().all()

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Skip metrics to avoid N+1 queries in list operations
    # Team names are loaded via joinedload(DbServer.email_team)
    result = []
    for s in servers:
        try:
            result.append(self.convert_server_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining servers instead of failing completely
    return result

991 

async def get_server(self, db: Session, server_id: str) -> ServerRead:
    """Fetch a single server by ID and return it as a ServerRead.

    The server is loaded together with its tools, resources, prompts,
    A2A agents and team. A structured log entry ("server_viewed") and an
    audit-trail record ("view_server", attributed to user "system") are
    written for every successful lookup.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.

    Returns:
        The corresponding ServerRead object.

    Raises:
        ServerNotFoundError: If no server with the given ID exists.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = server
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> import asyncio
        >>> asyncio.run(service.get_server(db, 'server_id'))
        'server_read'
    """
    # Eager-load every relationship read below so no lazy loads are triggered.
    eager_options = (
        selectinload(DbServer.tools),
        selectinload(DbServer.resources),
        selectinload(DbServer.prompts),
        selectinload(DbServer.a2a_agents),
        joinedload(DbServer.email_team),
    )
    lookup = select(DbServer).options(*eager_options).where(DbServer.id == server_id)
    server = db.execute(lookup).scalar_one_or_none()
    if not server:
        raise ServerNotFoundError(f"Server not found: {server_id}")

    # Snapshot used only for the debug log line below.
    server_data = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "created_at": server.created_at,
        "updated_at": server.updated_at,
        "enabled": server.enabled,
        "associated_tools": [tool.name for tool in server.tools],
        "associated_resources": [res.id for res in server.resources],
        "associated_prompts": [prompt.id for prompt in server.prompts],
    }
    logger.debug(f"Server Data: {server_data}")

    # Team name is loaded via server.team property from email_team relationship
    server_read = self.convert_server_to_read(server)

    # Relationship sizes reported as custom fields; a missing or None
    # relationship counts as zero.
    relation_counts = {
        "enabled": server.enabled,
        "tool_count": len(getattr(server, "tools", []) or []),
        "resource_count": len(getattr(server, "resources", []) or []),
        "prompt_count": len(getattr(server, "prompts", []) or []),
    }
    self._structured_logger.log(
        level="INFO",
        message="Server retrieved successfully",
        event_type="server_viewed",
        component="server_service",
        server_id=server.id,
        server_name=server.name,
        team_id=getattr(server, "team_id", None),
        resource_type="server",
        resource_id=server.id,
        custom_fields=relation_counts,
        db=db,
    )

    self._audit_trail.log_action(
        action="view_server",
        resource_type="server",
        resource_id=server.id,
        resource_name=server.name,
        user_id="system",
        team_id=getattr(server, "team_id", None),
        context={"enabled": server.enabled},
        db=db,
    )

    return server_read

1077 

async def update_server(
    self,
    db: Session,
    server_id: str,
    server_update: ServerUpdate,
    user_email: str,
    modified_by: Optional[str] = None,
    modified_from_ip: Optional[str] = None,
    modified_via: Optional[str] = None,
    modified_user_agent: Optional[str] = None,
) -> ServerRead:
    """Update an existing server.

    The server row is loaded with a row lock, ownership and name-conflict
    checks are applied, the provided fields are copied onto the row, and the
    change is committed. After a successful commit the server and tag caches
    are invalidated, subscribers are notified, and audit/structured log
    entries are written. Any failure rolls the transaction back.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        server_update: Server update schema with new data.
        user_email: email of the user performing the update (for permission checks).
        modified_by: Username who modified this server.
        modified_from_ip: IP address from which modification was made.
        modified_via: Source of modification (api, ui, etc.).
        modified_user_agent: User agent of the client making the modification.

    Returns:
        The updated ServerRead object.

    Raises:
        ServerNotFoundError: If the server is not found.
        PermissionError: If user doesn't own the server.
        ServerNameConflictError: If a new name conflicts with an existing server.
        ServerError: For other update errors. Note that only IntegrityError and
            ServerNameConflictError propagate unchanged; every other exception
            raised inside this method (including ServerNotFoundError,
            PermissionError and ValueError) is caught by the generic handler
            and surfaces to the caller wrapped in ServerError, with the
            original exception attached as ``__cause__``.
        IntegrityError: If a database integrity error occurs.
        ValueError: If visibility or team constraints are violated.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> from mcpgateway.schemas import ServerRead
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> server.id = 'server_id'
        >>> server.name = 'test_server'
        >>> server.owner_email = 'user_email'  # Set owner to match user performing update
        >>> server.team_id = None
        >>> server.visibility = 'public'
        >>> db.get.return_value = server
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> service.convert_server_to_read = MagicMock(return_value='server_read')
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> ServerRead.model_validate = MagicMock(return_value='server_read')
        >>> server_update = MagicMock()
        >>> server_update.id = None  # No UUID change
        >>> server_update.name = None  # No name change
        >>> server_update.description = None
        >>> server_update.icon = None
        >>> server_update.visibility = None
        >>> server_update.team_id = None
        >>> import asyncio
        >>> with patch('mcpgateway.services.server_service.get_for_update', return_value=server):
        ...     asyncio.run(service.update_server(db, 'server_id', server_update, 'user_email'))
        'server_read'
    """
    try:
        server = get_for_update(
            db,
            DbServer,
            server_id,
            options=[
                selectinload(DbServer.tools),
                selectinload(DbServer.resources),
                selectinload(DbServer.prompts),
                selectinload(DbServer.a2a_agents),
                selectinload(DbServer.email_team),
            ],
        )
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can update this server")

        # Check for name conflict if name is being changed; uniqueness is enforced
        # among public servers, and among team-visible servers of the same team.
        if server_update.name and server_update.name != server.name:
            visibility = server_update.visibility or server.visibility
            team_id = server_update.team_id or server.team_id
            if visibility.lower() == "public":
                # Check for existing public server with the same name
                existing_server = get_for_update(db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "public", DbServer.id != server.id))
                if existing_server:
                    raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)
            elif visibility.lower() == "team" and team_id:
                # Check for existing team server with the same name
                existing_server = get_for_update(
                    db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "team", DbServer.team_id == team_id, DbServer.id != server.id)
                )
                if existing_server:
                    raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility)

        # Update simple fields
        if server_update.id is not None and server_update.id != server.id:
            # Check if the new UUID is already in use
            existing = db.get(DbServer, server_update.id)
            if existing:
                raise ServerError(f"Server with ID {server_update.id} already exists")
            server.id = server_update.id
        if server_update.name is not None:
            server.name = server_update.name
        if server_update.description is not None:
            server.description = server_update.description
        if server_update.icon is not None:
            server.icon = server_update.icon

        if server_update.visibility is not None:
            new_visibility = server_update.visibility

            # Validate visibility transitions
            if new_visibility == "team":
                # Validate the team that will be in effect once this update is
                # applied: an incoming team_id replaces the stored one further
                # down, so it must take precedence here as well (this matches
                # the name-conflict check above). Checking the old team would
                # let a user move the server into a team they do not own.
                team_id = server_update.team_id or server.team_id
                if not team_id:
                    raise ValueError("Cannot set visibility to 'team' without a team_id")

                team = db.query(DbEmailTeam).filter(DbEmailTeam.id == team_id).first()
                if not team:
                    raise ValueError(f"Team {team_id} not found")

                # Verify user is an active member of the team with the "owner" role
                membership = (
                    db.query(DbEmailTeamMember)
                    .filter(DbEmailTeamMember.team_id == team_id, DbEmailTeamMember.user_email == user_email, DbEmailTeamMember.is_active, DbEmailTeamMember.role == "owner")
                    .first()
                )
                if not membership:
                    raise ValueError("User membership in team not sufficient for this update.")

            elif new_visibility == "public":
                # Optional: Check if user has permission to make resources public
                # This could be a platform-level permission
                pass

            server.visibility = new_visibility

        # NOTE(review): a team_id change without a visibility change is applied
        # here without any team existence/membership validation - confirm intended.
        if server_update.team_id is not None:
            server.team_id = server_update.team_id

        if server_update.owner_email is not None:
            server.owner_email = server_update.owner_email

        # Update associated tools if provided using bulk query.
        # An explicit empty list clears the association; None leaves it untouched.
        if server_update.associated_tools is not None:
            server.tools = []
            if server_update.associated_tools:
                tool_ids = [tool_id for tool_id in server_update.associated_tools if tool_id]
                if tool_ids:
                    tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all()
                    server.tools = list(tools)

        # Update associated resources if provided using bulk query
        if server_update.associated_resources is not None:
            server.resources = []
            if server_update.associated_resources:
                resource_ids = [resource_id for resource_id in server_update.associated_resources if resource_id]
                if resource_ids:
                    resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all()
                    server.resources = list(resources)

        # Update associated prompts if provided using bulk query
        if server_update.associated_prompts is not None:
            server.prompts = []
            if server_update.associated_prompts:
                prompt_ids = [prompt_id for prompt_id in server_update.associated_prompts if prompt_id]
                if prompt_ids:
                    prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all()
                    server.prompts = list(prompts)

        # Update tags if provided
        if server_update.tags is not None:
            server.tags = server_update.tags

        # Update OAuth 2.0 configuration if provided
        # Track if OAuth is being explicitly disabled to prevent config re-assignment
        oauth_being_disabled = server_update.oauth_enabled is not None and not server_update.oauth_enabled

        if server_update.oauth_enabled is not None:
            server.oauth_enabled = server_update.oauth_enabled
            # If OAuth is being disabled, clear the config
            if oauth_being_disabled:
                server.oauth_config = None

        # Only update oauth_config if OAuth is not being explicitly disabled
        # This prevents the case where oauth_enabled=False and oauth_config are both provided
        if not oauth_being_disabled:
            if hasattr(server_update, "model_fields_set") and "oauth_config" in server_update.model_fields_set:
                # Field was explicitly sent, so honour it even when it is None (clears the config)
                server.oauth_config = server_update.oauth_config
            elif server_update.oauth_config is not None:
                server.oauth_config = server_update.oauth_config

        # Update metadata fields
        server.updated_at = datetime.now(timezone.utc)
        if modified_by:
            server.modified_by = modified_by
        if modified_from_ip:
            server.modified_from_ip = modified_from_ip
        if modified_via:
            server.modified_via = modified_via
        if modified_user_agent:
            server.modified_user_agent = modified_user_agent
        if hasattr(server, "version") and server.version is not None:
            server.version = server.version + 1
        else:
            server.version = 1

        db.commit()
        db.refresh(server)
        # Force loading relationships
        _ = server.tools, server.resources, server.prompts

        # Invalidate cache after successful update
        cache = _get_registry_cache()
        await cache.invalidate_servers()
        # Also invalidate tags cache since server tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        await self._notify_server_updated(server)
        logger.info(f"Updated server: {server.name}")

        # Structured logging: Audit trail for server update
        changes = []
        if server_update.name:
            changes.append(f"name: {server_update.name}")
        if server_update.visibility:
            changes.append(f"visibility: {server_update.visibility}")
        if server_update.team_id:
            changes.append(f"team_id: {server_update.team_id}")

        self._audit_trail.log_action(
            user_id=user_email or "system",
            action="update_server",
            resource_type="server",
            resource_id=server.id,
            details={
                "server_name": server.name,
                "changes": ", ".join(changes) if changes else "metadata only",
                "version": server.version,
            },
            metadata={
                "modified_from_ip": modified_from_ip,
                "modified_via": modified_via,
                "modified_user_agent": modified_user_agent,
            },
        )

        # Structured logging: Log successful server update
        self._structured_logger.log(
            level="INFO",
            message="Server updated successfully",
            event_type="server_updated",
            component="server_service",
            server_id=server.id,
            server_name=server.name,
            modified_by=user_email,
            user_email=user_email,
        )

        # Build a dictionary with associated IDs (used only for the debug log)
        # Team name is loaded via server.team property from email_team relationship
        server_data = {
            "id": server.id,
            "name": server.name,
            "description": server.description,
            "icon": server.icon,
            "team": server.team,
            "created_at": server.created_at,
            "updated_at": server.updated_at,
            "enabled": server.enabled,
            "associated_tools": [tool.id for tool in server.tools],
            "associated_resources": [res.id for res in server.resources],
            "associated_prompts": [prompt.id for prompt in server.prompts],
        }
        logger.debug(f"Server Data: {server_data}")
        return self.convert_server_to_read(server)
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        self._structured_logger.log(
            level="ERROR",
            message="Server update failed due to database integrity error",
            event_type="server_update_failed",
            component="server_service",
            server_id=server_id,
            error_type="IntegrityError",
            error_message=str(ie),
            modified_by=user_email,
            user_email=user_email,
        )
        raise
    except ServerNameConflictError as snce:
        db.rollback()
        logger.error(f"Server name conflict: {snce}")

        # Structured logging: Log name conflict error
        self._structured_logger.log(
            level="WARNING",
            message="Server update failed due to name conflict",
            event_type="server_name_conflict",
            component="server_service",
            server_id=server_id,
            modified_by=user_email,
            user_email=user_email,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server update failure
        self._structured_logger.log(
            level="ERROR",
            message="Server update failed",
            event_type="server_update_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            modified_by=user_email,
            user_email=user_email,
        )
        # Chain the original exception so the root cause stays in the traceback
        raise ServerError(f"Failed to update server: {str(e)}") from e

1424 

1425 async def set_server_state(self, db: Session, server_id: str, activate: bool, user_email: Optional[str] = None) -> ServerRead: 

1426 """Set the activation status of a server. 

1427 

1428 Args: 

1429 db: Database session. 

1430 server_id: The unique identifier of the server. 

1431 activate: True to activate, False to deactivate. 

1432 user_email: Optional[str] The email of the user to check if the user has permission to modify. 

1433 

1434 Returns: 

1435 The updated ServerRead object. 

1436 

1437 Raises: 

1438 ServerNotFoundError: If the server is not found. 

1439 ServerLockConflictError: If the server row is locked by another transaction. 

1440 ServerError: For other errors. 

1441 PermissionError: If user doesn't own the agent. 

1442 

1443 Examples: 

1444 >>> from mcpgateway.services.server_service import ServerService 

1445 >>> from unittest.mock import MagicMock, AsyncMock, patch 

1446 >>> from mcpgateway.schemas import ServerRead 

1447 >>> service = ServerService() 

1448 >>> db = MagicMock() 

1449 >>> server = MagicMock() 

1450 >>> db.get.return_value = server 

1451 >>> db.commit = MagicMock() 

1452 >>> db.refresh = MagicMock() 

1453 >>> service._notify_server_activated = AsyncMock() 

1454 >>> service._notify_server_deactivated = AsyncMock() 

1455 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

1456 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

1457 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

1458 >>> ServerRead.model_validate = MagicMock(return_value='server_read') 

1459 >>> import asyncio 

1460 >>> asyncio.run(service.set_server_state(db, 'server_id', True)) 

1461 'server_read' 

1462 """ 

1463 try: 

1464 # Use nowait=True to fail fast if row is locked, preventing lock contention under high load 

1465 try: 

1466 server = get_for_update( 

1467 db, 

1468 DbServer, 

1469 server_id, 

1470 nowait=True, 

1471 options=[ 

1472 selectinload(DbServer.tools), 

1473 selectinload(DbServer.resources), 

1474 selectinload(DbServer.prompts), 

1475 selectinload(DbServer.a2a_agents), 

1476 selectinload(DbServer.email_team), 

1477 ], 

1478 ) 

1479 except OperationalError as lock_err: 

1480 # Row is locked by another transaction - fail fast with 409 

1481 db.rollback() 

1482 raise ServerLockConflictError(f"Server {server_id} is currently being modified by another request") from lock_err 

1483 if not server: 

1484 raise ServerNotFoundError(f"Server not found: {server_id}") 

1485 

1486 if user_email: 

1487 # First-Party 

1488 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1489 

1490 permission_service = PermissionService(db) 

1491 if not await permission_service.check_resource_ownership(user_email, server): 

1492 raise PermissionError("Only the owner can activate the Server" if activate else "Only the owner can deactivate the Server") 

1493 

1494 if server.enabled != activate: 

1495 server.enabled = activate 

1496 server.updated_at = datetime.now(timezone.utc) 

1497 db.commit() 

1498 db.refresh(server) 

1499 

1500 # Invalidate cache after status change 

1501 cache = _get_registry_cache() 

1502 await cache.invalidate_servers() 

1503 

1504 if activate: 

1505 await self._notify_server_activated(server) 

1506 else: 

1507 await self._notify_server_deactivated(server) 

1508 logger.info(f"Server {server.name} {'activated' if activate else 'deactivated'}") 

1509 

1510 # Structured logging: Audit trail for server state change 

1511 self._audit_trail.log_action( 

1512 user_id=user_email or "system", 

1513 action="activate_server" if activate else "deactivate_server", 

1514 resource_type="server", 

1515 resource_id=server.id, 

1516 details={ 

1517 "server_name": server.name, 

1518 "new_status": "active" if activate else "inactive", 

1519 }, 

1520 ) 

1521 

1522 # Structured logging: Log server status change 

1523 self._structured_logger.log( 

1524 level="INFO", 

1525 message=f"Server {'activated' if activate else 'deactivated'}", 

1526 event_type="server_status_changed", 

1527 component="server_service", 

1528 server_id=server.id, 

1529 server_name=server.name, 

1530 new_status="active" if activate else "inactive", 

1531 changed_by=user_email, 

1532 user_email=user_email, 

1533 ) 

1534 

1535 # Team name is loaded via server.team property from email_team relationship 

1536 server_data = { 

1537 "id": server.id, 

1538 "name": server.name, 

1539 "description": server.description, 

1540 "icon": server.icon, 

1541 "team": server.team, 

1542 "created_at": server.created_at, 

1543 "updated_at": server.updated_at, 

1544 "enabled": server.enabled, 

1545 "associated_tools": [tool.id for tool in server.tools], 

1546 "associated_resources": [res.id for res in server.resources], 

1547 "associated_prompts": [prompt.id for prompt in server.prompts], 

1548 } 

1549 logger.info(f"Server Data: {server_data}") 

1550 return self.convert_server_to_read(server) 

1551 except PermissionError as e: 

1552 # Structured logging: Log permission error 

1553 self._structured_logger.log( 

1554 level="WARNING", 

1555 message="Server state change failed due to insufficient permissions", 

1556 event_type="server_state_change_permission_denied", 

1557 component="server_service", 

1558 server_id=server_id, 

1559 user_email=user_email, 

1560 ) 

1561 raise e 

1562 except ServerLockConflictError: 

1563 # Re-raise lock conflicts without wrapping - allows 409 response 

1564 raise 

1565 except ServerNotFoundError: 

1566 # Re-raise not found without wrapping - allows 404 response 

1567 raise 

1568 except Exception as e: 

1569 db.rollback() 

1570 

1571 # Structured logging: Log generic server state change failure 

1572 self._structured_logger.log( 

1573 level="ERROR", 

1574 message="Server state change failed", 

1575 event_type="server_state_change_failed", 

1576 component="server_service", 

1577 server_id=server_id, 

1578 error_type=type(e).__name__, 

1579 error_message=str(e), 

1580 user_email=user_email, 

1581 ) 

1582 raise ServerError(f"Failed to set server state: {str(e)}") 

1583 

async def delete_server(self, db: Session, server_id: str, user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
    """Permanently delete a server.

    Looks the server up, optionally verifies ownership, optionally purges its
    raw and hourly rollup metrics, deletes the row and commits. After the
    commit it invalidates the server, tag and metrics caches, notifies event
    subscribers and writes audit-trail and structured log entries.

    Args:
        db: Database session.
        server_id: The unique identifier of the server.
        user_email: Email of user performing deletion (for ownership check).
            When falsy, the ownership check is skipped and the audit entry is
            attributed to "system".
        purge_metrics: If True, delete raw + rollup metrics for this server.

    Raises:
        PermissionError: If user doesn't own the server.
        ServerError: For every other failure. This includes a missing server:
            the ServerNotFoundError raised internally is caught by the generic
            handler and re-raised as ServerError, with the original exception
            attached as ``__cause__``.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock, AsyncMock, patch
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> server = MagicMock()
        >>> db.get.return_value = server
        >>> db.delete = MagicMock()
        >>> db.commit = MagicMock()
        >>> service._notify_server_deleted = AsyncMock()
        >>> service._structured_logger = MagicMock()  # Mock structured logger to prevent database writes
        >>> service._audit_trail = MagicMock()  # Mock audit trail to prevent database writes
        >>> import asyncio
        >>> asyncio.run(service.delete_server(db, 'server_id', 'user@example.com'))
    """
    try:
        server = db.get(DbServer, server_id)
        if not server:
            raise ServerNotFoundError(f"Server not found: {server_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, server):
                raise PermissionError("Only the owner can delete this server")

        # Snapshot the identifying fields now; the ORM object is deleted below
        # and these values are still needed for notifications and logging.
        server_info = {"id": server.id, "name": server.name}
        if purge_metrics:
            with pause_rollup_during_purge(reason=f"purge_server:{server_id}"):
                delete_metrics_in_batches(db, ServerMetric, ServerMetric.server_id, server_id)
                delete_metrics_in_batches(db, ServerMetricsHourly, ServerMetricsHourly.server_id, server_id)
        db.delete(server)
        db.commit()

        # Invalidate cache after successful deletion
        cache = _get_registry_cache()
        await cache.invalidate_servers()
        # Also invalidate tags cache since server tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()
        # First-Party
        from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

        metrics_cache.invalidate_prefix("top_servers:")
        metrics_cache.invalidate("servers")

        await self._notify_server_deleted(server_info)
        logger.info(f"Deleted server: {server_info['name']}")

        # Structured logging: Audit trail for server deletion
        self._audit_trail.log_action(
            user_id=user_email or "system",
            action="delete_server",
            resource_type="server",
            resource_id=server_info["id"],
            details={
                "server_name": server_info["name"],
            },
        )

        # Structured logging: Log successful server deletion
        self._structured_logger.log(
            level="INFO",
            message="Server deleted successfully",
            event_type="server_deleted",
            component="server_service",
            server_id=server_info["id"],
            server_name=server_info["name"],
            deleted_by=user_email,
            user_email=user_email,
            purge_metrics=purge_metrics,
        )
    except PermissionError:
        db.rollback()

        # Structured logging: Log permission error
        self._structured_logger.log(
            level="WARNING",
            message="Server deletion failed due to insufficient permissions",
            event_type="server_deletion_permission_denied",
            component="server_service",
            server_id=server_id,
            user_email=user_email,
        )
        # Bare raise keeps the original traceback intact.
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic server deletion failure
        self._structured_logger.log(
            level="ERROR",
            message="Server deletion failed",
            event_type="server_deletion_failed",
            component="server_service",
            server_id=server_id,
            error_type=type(e).__name__,
            error_message=str(e),
            user_email=user_email,
        )
        # Chain explicitly so the root cause (including a not-found error)
        # remains reachable from the wrapping ServerError.
        raise ServerError(f"Failed to delete server: {str(e)}") from e

1703 

async def _publish_event(self, event: Dict[str, Any]) -> None:
    """
    Deliver an event to every queue currently registered as a subscriber.

    The queues in ``self._event_subscribers`` are served one at a time, in
    list order; each ``put`` is awaited before the next queue is handled.

    Args:
        event: Event to publish
    """
    for subscriber_queue in self._event_subscribers:
        await subscriber_queue.put(event)

1713 

async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
    """Stream server events to the caller as they are published.

    A fresh queue is registered in ``self._event_subscribers`` for the
    lifetime of the generator and removed again when the generator is closed
    or otherwise exits.

    Yields:
        Server event messages.
    """
    inbox: asyncio.Queue = asyncio.Queue()
    self._event_subscribers.append(inbox)
    try:
        while True:
            # Wait for the next published event, then hand it to the consumer.
            next_event = await inbox.get()
            yield next_event
    finally:
        # Always unregister, even if the consumer stops iterating early.
        self._event_subscribers.remove(inbox)

1728 

async def _notify_server_added(self, server: DbServer) -> None:
    """
    Publish a ``server_added`` event describing the given server.

    The event carries the server's id, name, description, icon, enabled flag
    and the ids of its associated tools, resources and prompts, plus a UTC
    ISO-8601 timestamp.

    Args:
        server: Server to add
    """

    def _collect_ids(related):
        # A missing or empty relationship is reported as an empty list.
        return [item.id for item in related] if related else []

    tool_ids = _collect_ids(server.tools)
    resource_ids = _collect_ids(server.resources)
    prompt_ids = _collect_ids(server.prompts)

    payload = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "associated_tools": tool_ids,
        "associated_resources": resource_ids,
        "associated_prompts": prompt_ids,
        "enabled": server.enabled,
    }
    await self._publish_event(
        {
            "type": "server_added",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

1754 

async def _notify_server_updated(self, server: DbServer) -> None:
    """
    Publish a ``server_updated`` event describing the given server.

    The payload has the same shape as the ``server_added`` event: core
    fields, the ids of associated tools/resources/prompts and the enabled
    flag, stamped with the current UTC time in ISO-8601 form.

    Args:
        server: Server to update
    """
    # Falsy relationships (None or empty) collapse to an empty id list.
    tool_ids = [tool.id for tool in (server.tools or ())]
    resource_ids = [res.id for res in (server.resources or ())]
    prompt_ids = [prompt.id for prompt in (server.prompts or ())]

    details = {
        "id": server.id,
        "name": server.name,
        "description": server.description,
        "icon": server.icon,
        "associated_tools": tool_ids,
        "associated_resources": resource_ids,
        "associated_prompts": prompt_ids,
        "enabled": server.enabled,
    }
    notification = {
        "type": "server_updated",
        "data": details,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    await self._publish_event(notification)

1780 

async def _notify_server_activated(self, server: DbServer) -> None:
    """
    Publish a ``server_activated`` event for the given server.

    The event data holds the server's id and name together with a literal
    ``enabled`` value of True; the server's own enabled attribute is not read.

    Args:
        server: Server to activate
    """
    activation_data = {"id": server.id, "name": server.name, "enabled": True}
    issued_at = datetime.now(timezone.utc).isoformat()
    await self._publish_event(
        {
            "type": "server_activated",
            "data": activation_data,
            "timestamp": issued_at,
        }
    )

1798 

async def _notify_server_deactivated(self, server: DbServer) -> None:
    """
    Publish a ``server_deactivated`` event for the given server.

    The event data holds the server's id and name together with a literal
    ``enabled`` value of False; the server's own enabled attribute is not read.

    Args:
        server: Server to deactivate
    """
    deactivation_data = {"id": server.id, "name": server.name, "enabled": False}
    issued_at = datetime.now(timezone.utc).isoformat()
    await self._publish_event(
        {
            "type": "server_deactivated",
            "data": deactivation_data,
            "timestamp": issued_at,
        }
    )

1816 

async def _notify_server_deleted(self, server_info: Dict[str, Any]) -> None:
    """
    Publish a ``server_deleted`` event.

    The supplied dictionary is embedded as the event's ``data`` unchanged
    (same object, not a copy), alongside a UTC ISO-8601 timestamp.

    Args:
        server_info: Dictionary on server to be deleted
    """
    deletion_notice = dict(
        type="server_deleted",
        data=server_info,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    await self._publish_event(deletion_notice)

1830 

1831 # --- Metrics --- 

async def aggregate_metrics(self, db: Session) -> ServerMetrics:
    """
    Return aggregated invocation metrics across all servers.

    When the metrics cache is enabled and holds an entry under "servers",
    that entry is returned directly. Otherwise the aggregate is computed via
    ``aggregate_metrics_combined(db, "server")`` (raw metrics combined with
    hourly rollups) and, if caching is enabled, stored back as a plain dict.

    Args:
        db: Database session

    Returns:
        ServerMetrics: Aggregated metrics from raw + hourly rollup tables.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    # Fast path: serve the cached snapshot when caching is on and populated.
    cached_snapshot = metrics_cache.get("servers") if is_cache_enabled() else None
    if cached_snapshot is not None:
        return ServerMetrics(**cached_snapshot)

    # Cache miss: the query helper is imported lazily, only when it is needed.
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "server")

    # Copy the aggregate's fields one-to-one onto the response schema.
    copied_fields = (
        "total_executions",
        "successful_executions",
        "failed_executions",
        "failure_rate",
        "min_response_time",
        "max_response_time",
        "avg_response_time",
        "last_execution_time",
    )
    metrics = ServerMetrics(**{field: getattr(combined, field) for field in copied_fields})

    # Stored as a dict so the cached value stays serialization-friendly.
    if is_cache_enabled():
        metrics_cache.set("servers", metrics.model_dump())

    return metrics

1884 

async def reset_metrics(self, db: Session) -> None:
    """
    Wipe all stored server metrics, both raw rows and hourly rollups.

    Issues one bulk DELETE per metrics table, commits, and then drops the
    "servers" entry and every "top_servers:"-prefixed entry from the metrics
    cache.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> from unittest.mock import MagicMock
        >>> service = ServerService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    # Raw table first, then the rollup table, in a single transaction.
    for metrics_model in (ServerMetric, ServerMetricsHourly):
        db.execute(delete(metrics_model))
    db.commit()

    # Invalidate metrics cache
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("servers")
    metrics_cache.invalidate_prefix("top_servers:")

1912 

def get_oauth_protected_resource_metadata(self, db: Session, server_id: str, resource_base_url: str) -> Dict[str, Any]:
    """
    Build the RFC 9728 OAuth 2.0 Protected Resource Metadata for a server.

    The server's stored OAuth configuration is read and reduced to the
    discovery fields below; no other keys of the configuration are copied
    into the response.

    Args:
        db: Database session.
        server_id: The ID of the server.
        resource_base_url: The base URL for the resource (e.g., "https://gateway.example.com/servers/abc123").

    Returns:
        Dict containing RFC 9728 Protected Resource Metadata:
            - resource: The protected resource identifier (URL)
            - authorization_servers: List of authorization server issuer URIs
            - bearer_methods_supported: Supported bearer token methods
            - scopes_supported: Optional list of supported scopes

    Raises:
        ServerNotFoundError: If server doesn't exist, is disabled, or is non-public.
        ServerError: If OAuth is not enabled or not properly configured.

    Examples:
        >>> from mcpgateway.services.server_service import ServerService
        >>> service = ServerService()
        >>> # Method exists and is callable
        >>> callable(service.get_oauth_protected_resource_metadata)
        True
    """
    server = db.get(DbServer, server_id)

    # Missing, disabled and non-public servers all produce the same
    # "not found" error, so the response does not reveal that a private or
    # team-scoped server exists.
    if not server or not server.enabled or getattr(server, "visibility", "public") != "public":
        raise ServerNotFoundError(f"Server not found: {server_id}")

    # OAuth must be both switched on and configured.
    if not getattr(server, "oauth_enabled", False):
        raise ServerError(f"OAuth not enabled for server: {server_id}")

    oauth_config = getattr(server, "oauth_config", None)
    if not oauth_config:
        raise ServerError(f"OAuth not configured for server: {server_id}")

    # Prefer the plural key; fall back to wrapping the singular one in a list.
    issuers = oauth_config.get("authorization_servers", [])
    if not issuers:
        single_issuer = oauth_config.get("authorization_server")
        if single_issuer:
            issuers = [single_issuer]

    if not issuers:
        raise ServerError(f"OAuth authorization_server not configured for server: {server_id}")

    metadata: Dict[str, Any] = {
        "resource": resource_base_url,
        "authorization_servers": issuers,
        "bearer_methods_supported": ["header"],
    }

    # Scopes are optional; "scopes_supported" wins over "scopes" when both are set.
    advertised_scopes = oauth_config.get("scopes_supported") or oauth_config.get("scopes")
    if advertised_scopes:
        metadata["scopes_supported"] = advertised_scopes

    logger.debug(f"Returning OAuth protected resource metadata for server {server_id}")
    return metadata

1989 

1990 

# Holder for the lazily created service singleton. It stays None until the
# "server_service" attribute is first requested, so importing this module
# (for example only for its exception classes) does not build a ServerService.
_server_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Resolve module attributes lazily, creating the service singleton on demand.

    Args:
        name: The attribute name being accessed.

    Returns:
        The server_service singleton instance if name is "server_service".

    Raises:
        AttributeError: If the attribute name is not "server_service".
    """
    global _server_service_instance  # pylint: disable=global-statement
    if name != "server_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    # First access builds the instance; later accesses reuse it.
    if _server_service_instance is None:
        _server_service_instance = ServerService()
    return _server_service_instance

2013 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")