Coverage for mcpgateway / services / server_service.py: 100%

591 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/server_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7ContextForge Server Service 

8 

9This module implements server management for the MCP Servers Catalog. 

10It handles server registration, listing, retrieval, updates, activation toggling, and deletion. 

11It also publishes event notifications for server changes. 

12""" 

13 

14# Standard 

15import asyncio 

16import binascii 

17from datetime import datetime, timezone 

18from typing import Any, AsyncGenerator, Dict, List, Optional, Union 

19 

20# Third-Party 

21import httpx 

22from pydantic import ValidationError 

23from sqlalchemy import and_, delete, desc, or_, select 

24from sqlalchemy.exc import IntegrityError, OperationalError 

25from sqlalchemy.orm import joinedload, selectinload, Session 

26 

27# First-Party 

28from mcpgateway.config import settings 

29from mcpgateway.db import A2AAgent as DbA2AAgent 

30from mcpgateway.db import EmailTeam as DbEmailTeam 

31from mcpgateway.db import EmailTeamMember as DbEmailTeamMember 

32from mcpgateway.db import get_for_update 

33from mcpgateway.db import Prompt as DbPrompt 

34from mcpgateway.db import Resource as DbResource 

35from mcpgateway.db import Server as DbServer 

36from mcpgateway.db import ServerMetric, ServerMetricsHourly 

37from mcpgateway.db import Tool as DbTool 

38from mcpgateway.schemas import ServerCreate, ServerMetrics, ServerRead, ServerUpdate, TopPerformer 

39from mcpgateway.services.audit_trail_service import get_audit_trail_service 

40from mcpgateway.services.base_service import BaseService 

41from mcpgateway.services.encryption_service import protect_oauth_config_for_storage 

42from mcpgateway.services.logging_service import LoggingService 

43from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

44from mcpgateway.services.performance_tracker import get_performance_tracker 

45from mcpgateway.services.structured_logger import get_structured_logger 

46from mcpgateway.services.team_management_service import TeamManagementService 

47from mcpgateway.utils.metrics_common import build_top_performers 

48from mcpgateway.utils.pagination import unified_paginate 

49from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

50 

51# Cache import (lazy to avoid circular dependencies) 

52_REGISTRY_CACHE = None 

53 

54 

55def _get_registry_cache(): 

56 """Get registry cache singleton lazily. 

57 

58 Returns: 

59 RegistryCache instance. 

60 """ 

61 global _REGISTRY_CACHE # pylint: disable=global-statement 

62 if _REGISTRY_CACHE is None: 

63 # First-Party 

64 from mcpgateway.cache.registry_cache import registry_cache # pylint: disable=import-outside-toplevel 

65 

66 _REGISTRY_CACHE = registry_cache 

67 return _REGISTRY_CACHE 

68 

69 

70def _validate_server_team_assignment(db: Session, user_email: Optional[str], target_team_id: Optional[str]) -> None: 

71 """Validate team assignment and ownership requirements for server updates. 

72 

73 Args: 

74 db: Database session used for membership checks. 

75 user_email: Requesting user email. When omitted, ownership checks are skipped 

76 for system/internal update paths. 

77 target_team_id: Team identifier to validate. 

78 

79 Raises: 

80 ValueError: If team ID is missing, team does not exist, or caller is not 

81 an active team owner. 

82 """ 

83 if not target_team_id: 

84 raise ValueError("Cannot set visibility to 'team' without a team_id") 

85 

86 team = db.query(DbEmailTeam).filter(DbEmailTeam.id == target_team_id).first() 

87 if not team: 

88 raise ValueError(f"Team {target_team_id} not found") 

89 

90 # Preserve existing behavior for system/internal updates where 

91 # user context may be intentionally omitted. 

92 if not user_email: 

93 return 

94 

95 membership = ( 

96 db.query(DbEmailTeamMember) 

97 .filter(DbEmailTeamMember.team_id == target_team_id, DbEmailTeamMember.user_email == user_email, DbEmailTeamMember.is_active, DbEmailTeamMember.role == "owner") 

98 .first() 

99 ) 

100 if not membership: 

101 raise ValueError("User membership in team not sufficient for this update.") 

102 

103 

104# Initialize logging service first 

105logging_service = LoggingService() 

106logger = logging_service.get_logger(__name__) 

107 

108 

109class ServerError(Exception): 

110 """Base class for server-related errors.""" 

111 

112 

113class ServerNotFoundError(ServerError): 

114 """Raised when a requested server is not found.""" 

115 

116 

117class ServerLockConflictError(ServerError): 

118 """Raised when a server row is locked by another transaction.""" 

119 

120 

121class ServerNameConflictError(ServerError): 

122 """Raised when a server name conflicts with an existing one.""" 

123 

124 def __init__(self, name: str, enabled: bool = True, server_id: Optional[str] = None, visibility: str = "public") -> None: 

125 """ 

126 Initialize a ServerNameConflictError exception. 

127 

128 This exception indicates a server name conflict, with additional context about visibility, 

129 whether the conflicting server is active, and its ID if known. The error message starts 

130 with the visibility information. 

131 

132 Visibility rules: 

133 - public: Restricts server names globally (across all teams). 

134 - team: Restricts server names only within the same team. 

135 

136 Args: 

137 name: The server name that caused the conflict. 

138 enabled: Whether the conflicting server is currently active. Defaults to True. 

139 server_id: The ID of the conflicting server, if known. Only included in message for inactive servers. 

140 visibility: The visibility of the conflicting server (e.g., "public", "private", "team"). 

141 

142 Examples: 

143 >>> error = ServerNameConflictError("My Server") 

144 >>> str(error) 

145 'Public Server already exists with name: My Server' 

146 >>> error = ServerNameConflictError("My Server", enabled=False, server_id=123) 

147 >>> str(error) 

148 'Public Server already exists with name: My Server (currently inactive, ID: 123)' 

149 >>> error.enabled 

150 False 

151 >>> error.server_id 

152 123 

153 >>> error = ServerNameConflictError("My Server", enabled=False, visibility="team") 

154 >>> str(error) 

155 'Team Server already exists with name: My Server (currently inactive, ID: None)' 

156 >>> error.enabled 

157 False 

158 >>> error.server_id is None 

159 True 

160 """ 

161 self.name = name 

162 self.enabled = enabled 

163 self.server_id = server_id 

164 message = f"{visibility.capitalize()} Server already exists with name: {name}" 

165 if not enabled: 

166 message += f" (currently inactive, ID: {server_id})" 

167 super().__init__(message) 

168 

169 

170class ServerService(BaseService): 

171 """Service for managing MCP Servers in the catalog. 

172 

173 Provides methods to create, list, retrieve, update, set state, and delete server records. 

174 Also supports event notifications for changes in server data. 

175 """ 

176 

177 _visibility_model_cls = DbServer 

178 

179 def __init__(self) -> None: 

180 """Initialize a new ServerService instance. 

181 

182 Sets up the service with: 

183 - An empty list for event subscribers that will receive server change notifications 

184 - An HTTP client configured with timeout and SSL verification settings from config 

185 

186 The HTTP client is used for health checks and other server-related HTTP operations. 

187 Event subscribers can register to receive notifications about server additions, 

188 updates, activations, deactivations, and deletions. 

189 

190 Examples: 

191 >>> from mcpgateway.services.server_service import ServerService 

192 >>> service = ServerService() 

193 >>> isinstance(service._event_subscribers, list) 

194 True 

195 >>> len(service._event_subscribers) 

196 0 

197 >>> hasattr(service, '_http_client') 

198 True 

199 """ 

200 self._event_subscribers: List[asyncio.Queue] = [] 

201 self._http_client = httpx.AsyncClient( 

202 timeout=settings.federation_timeout, 

203 verify=not settings.skip_ssl_verify, 

204 limits=httpx.Limits( 

205 max_connections=settings.httpx_max_connections, 

206 max_keepalive_connections=settings.httpx_max_keepalive_connections, 

207 keepalive_expiry=settings.httpx_keepalive_expiry, 

208 ), 

209 ) 

210 self._structured_logger = get_structured_logger("server_service") 

211 self._audit_trail = get_audit_trail_service() 

212 self._performance_tracker = get_performance_tracker() 

213 

214 async def initialize(self) -> None: 

215 """Initialize the server service.""" 

216 logger.info("Initializing server service") 

217 

218 async def shutdown(self) -> None: 

219 """Shutdown the server service.""" 

220 await self._http_client.aclose() 

221 logger.info("Server service shutdown complete") 

222 

223 # get_top_server 

224 async def get_top_servers(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]: 

225 """Retrieve the top-performing servers based on execution count. 

226 

227 Queries the database to get servers with their metrics, ordered by the number of executions 

228 in descending order. Combines recent raw metrics with historical hourly rollups for complete 

229 historical coverage. Returns a list of TopPerformer objects containing server details and 

230 performance metrics. Results are cached for performance. 

231 

232 Args: 

233 db (Session): Database session for querying server metrics. 

234 limit (Optional[int]): Maximum number of servers to return. Defaults to 5. 

235 include_deleted (bool): Whether to include deleted servers from rollups. 

236 

237 Returns: 

238 List[TopPerformer]: A list of TopPerformer objects, each containing: 

239 - id: Server ID. 

240 - name: Server name. 

241 - execution_count: Total number of executions. 

242 - avg_response_time: Average response time in seconds, or None if no metrics. 

243 - success_rate: Success rate percentage, or None if no metrics. 

244 - last_execution: Timestamp of the last execution, or None if no metrics. 

245 """ 

246 # Check cache first (if enabled) 

247 # First-Party 

248 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel 

249 

250 effective_limit = limit or 5 

251 cache_key = f"top_servers:{effective_limit}:include_deleted={include_deleted}" 

252 

253 if is_cache_enabled(): 

254 cached = metrics_cache.get(cache_key) 

255 if cached is not None: 

256 return cached 

257 

258 # Use combined query that includes both raw metrics and rollup data 

259 # First-Party 

260 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel 

261 

262 results = get_top_performers_combined( 

263 db=db, 

264 metric_type="server", 

265 entity_model=DbServer, 

266 limit=effective_limit, 

267 include_deleted=include_deleted, 

268 ) 

269 top_performers = build_top_performers(results) 

270 

271 # Cache the result (if enabled) 

272 if is_cache_enabled(): 

273 metrics_cache.set(cache_key, top_performers) 

274 

275 return top_performers 

276 

277 def convert_server_to_read(self, server: DbServer, include_metrics: bool = False) -> ServerRead: 

278 """ 

279 Converts a DbServer instance into a ServerRead model, optionally including aggregated metrics. 

280 

281 Args: 

282 server (DbServer): The ORM instance of the server. 

283 include_metrics (bool): Whether to include metrics in the result. Defaults to False. 

284 Set to False for list operations to avoid N+1 query issues. 

285 

286 Returns: 

287 ServerRead: The Pydantic model representing the server, optionally including aggregated metrics. 

288 

289 Examples: 

290 >>> from types import SimpleNamespace 

291 >>> from datetime import datetime, timezone 

292 >>> svc = ServerService() 

293 >>> now = datetime.now(timezone.utc) 

294 >>> # Fake metric objects 

295 >>> m1 = SimpleNamespace(is_success=True, response_time=0.2, timestamp=now) 

296 >>> m2 = SimpleNamespace(is_success=False, response_time=0.4, timestamp=now) 

297 >>> server = SimpleNamespace( 

298 ... id='s1', name='S', description=None, icon=None, 

299 ... created_at=now, updated_at=now, enabled=True, 

300 ... associated_tools=[], associated_resources=[], associated_prompts=[], associated_a2a_agents=[], 

301 ... tags=[], metrics=[m1, m2], 

302 ... tools=[], resources=[], prompts=[], a2a_agents=[], 

303 ... team_id=None, owner_email=None, visibility=None, 

304 ... created_by=None, modified_by=None 

305 ... ) 

306 >>> result = svc.convert_server_to_read(server, include_metrics=True) 

307 >>> result.metrics.total_executions 

308 2 

309 >>> result.metrics.successful_executions 

310 1 

311 """ 

312 # Build dict explicitly from attributes to ensure SQLAlchemy populates them 

313 # (using __dict__.copy() can return empty dict with certain query patterns) 

314 server_dict = { 

315 "id": server.id, 

316 "name": server.name, 

317 "description": server.description, 

318 "icon": server.icon, 

319 "enabled": server.enabled, 

320 "created_at": server.created_at, 

321 "updated_at": server.updated_at, 

322 "team_id": server.team_id, 

323 "owner_email": server.owner_email, 

324 "visibility": server.visibility, 

325 "created_by": server.created_by, 

326 "created_from_ip": getattr(server, "created_from_ip", None), 

327 "created_via": getattr(server, "created_via", None), 

328 "created_user_agent": getattr(server, "created_user_agent", None), 

329 "modified_by": server.modified_by, 

330 "modified_from_ip": getattr(server, "modified_from_ip", None), 

331 "modified_via": getattr(server, "modified_via", None), 

332 "modified_user_agent": getattr(server, "modified_user_agent", None), 

333 "import_batch_id": getattr(server, "import_batch_id", None), 

334 "federation_source": getattr(server, "federation_source", None), 

335 "version": getattr(server, "version", None), 

336 "tags": server.tags or [], 

337 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata 

338 "oauth_enabled": getattr(server, "oauth_enabled", False), 

339 "oauth_config": getattr(server, "oauth_config", None), 

340 } 

341 

342 # Compute aggregated metrics only if requested (avoids N+1 queries in list operations) 

343 if include_metrics: 

344 total = 0 

345 successful = 0 

346 failed = 0 

347 min_rt = None 

348 max_rt = None 

349 sum_rt = 0.0 

350 last_time = None 

351 

352 if hasattr(server, "metrics") and server.metrics: 

353 for m in server.metrics: 

354 total += 1 

355 if m.is_success: 

356 successful += 1 

357 else: 

358 failed += 1 

359 

360 # Track min/max response times 

361 if min_rt is None or m.response_time < min_rt: 

362 min_rt = m.response_time 

363 if max_rt is None or m.response_time > max_rt: 

364 max_rt = m.response_time 

365 

366 sum_rt += m.response_time 

367 

368 # Track last execution time 

369 if last_time is None or m.timestamp > last_time: 

370 last_time = m.timestamp 

371 

372 failure_rate = (failed / total) if total > 0 else 0.0 

373 avg_rt = (sum_rt / total) if total > 0 else None 

374 

375 server_dict["metrics"] = { 

376 "total_executions": total, 

377 "successful_executions": successful, 

378 "failed_executions": failed, 

379 "failure_rate": failure_rate, 

380 "min_response_time": min_rt, 

381 "max_response_time": max_rt, 

382 "avg_response_time": avg_rt, 

383 "last_execution_time": last_time, 

384 } 

385 else: 

386 server_dict["metrics"] = None 

387 # Add associated IDs from relationships 

388 server_dict["associated_tools"] = [tool.name for tool in server.tools] if server.tools else [] 

389 server_dict["associated_tool_ids"] = [str(tool.id) for tool in server.tools] if server.tools else [] 

390 server_dict["associated_resources"] = [res.id for res in server.resources] if server.resources else [] 

391 server_dict["associated_prompts"] = [prompt.id for prompt in server.prompts] if server.prompts else [] 

392 server_dict["associated_a2a_agents"] = [agent.id for agent in server.a2a_agents] if server.a2a_agents else [] 

393 

394 # Team name is loaded via server.team property from email_team relationship 

395 server_dict["team"] = getattr(server, "team", None) 

396 

397 return ServerRead.model_validate(server_dict).masked() 

398 

399 def _assemble_associated_items( 

400 self, 

401 tools: Optional[List[str]], 

402 resources: Optional[List[str]], 

403 prompts: Optional[List[str]], 

404 a2a_agents: Optional[List[str]] = None, 

405 gateways: Optional[List[str]] = None, 

406 ) -> Dict[str, Any]: 

407 """ 

408 Assemble the associated items dictionary from the separate fields. 

409 

410 Args: 

411 tools: List of tool IDs. 

412 resources: List of resource IDs. 

413 prompts: List of prompt IDs. 

414 a2a_agents: List of A2A agent IDs. 

415 gateways: List of gateway IDs. 

416 

417 Returns: 

418 A dictionary with keys "tools", "resources", "prompts", "a2a_agents", and "gateways". 

419 

420 Examples: 

421 >>> service = ServerService() 

422 >>> # Test with all None values 

423 >>> result = service._assemble_associated_items(None, None, None) 

424 >>> result 

425 {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []} 

426 

427 >>> # Test with empty lists 

428 >>> result = service._assemble_associated_items([], [], []) 

429 >>> result 

430 {'tools': [], 'resources': [], 'prompts': [], 'a2a_agents': [], 'gateways': []} 

431 

432 >>> # Test with actual values 

433 >>> result = service._assemble_associated_items(['tool1', 'tool2'], ['res1'], ['prompt1']) 

434 >>> result 

435 {'tools': ['tool1', 'tool2'], 'resources': ['res1'], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []} 

436 

437 >>> # Test with mixed None and values 

438 >>> result = service._assemble_associated_items(['tool1'], None, ['prompt1']) 

439 >>> result 

440 {'tools': ['tool1'], 'resources': [], 'prompts': ['prompt1'], 'a2a_agents': [], 'gateways': []} 

441 """ 

442 return { 

443 "tools": tools or [], 

444 "resources": resources or [], 

445 "prompts": prompts or [], 

446 "a2a_agents": a2a_agents or [], 

447 "gateways": gateways or [], 

448 } 

449 

450 async def register_server( 

451 self, 

452 db: Session, 

453 server_in: ServerCreate, 

454 created_by: Optional[str] = None, 

455 created_from_ip: Optional[str] = None, 

456 created_via: Optional[str] = None, 

457 created_user_agent: Optional[str] = None, 

458 team_id: Optional[str] = None, 

459 owner_email: Optional[str] = None, 

460 visibility: Optional[str] = "public", 

461 ) -> ServerRead: 

462 """ 

463 Register a new server in the catalog and validate that all associated items exist. 

464 

465 This function performs the following steps: 

466 1. Checks if a server with the same name already exists. 

467 2. Creates a new server record. 

468 3. For each ID provided in associated_tools, associated_resources, and associated_prompts, 

469 verifies that the corresponding item exists. If an item does not exist, an error is raised. 

470 4. Associates the verified items to the new server. 

471 5. Commits the transaction, refreshes the ORM instance, and forces the loading of relationship data. 

472 6. Constructs a response dictionary that includes lists of associated item IDs. 

473 7. Notifies subscribers of the addition and returns the validated response. 

474 

475 Args: 

476 db (Session): The SQLAlchemy database session. 

477 server_in (ServerCreate): The server creation schema containing server details and lists of 

478 associated tool, resource, and prompt IDs (as strings). 

479 created_by (Optional[str]): Email of the user creating the server, used for ownership tracking. 

480 created_from_ip (Optional[str]): IP address from which the creation request originated. 

481 created_via (Optional[str]): Source of creation (api, ui, import). 

482 created_user_agent (Optional[str]): User agent string from the creation request. 

483 team_id (Optional[str]): Team ID to assign the server to. 

484 owner_email (Optional[str]): Email of the user who owns this server. 

485 visibility (str): Server visibility level (private, team, public). 

486 

487 Returns: 

488 ServerRead: The newly created server, with associated item IDs. 

489 

490 Raises: 

491 IntegrityError: If a database integrity error occurs. 

492 ServerNameConflictError: If a server name conflict occurs (public or team visibility). 

493 ServerError: If any associated tool, resource, or prompt does not exist, or if any other registration error occurs. 

494 

495 Examples: 

496 >>> from mcpgateway.services.server_service import ServerService 

497 >>> from unittest.mock import MagicMock, AsyncMock, patch 

498 >>> from mcpgateway.schemas import ServerRead 

499 >>> service = ServerService() 

500 >>> db = MagicMock() 

501 >>> server_in = MagicMock() 

502 >>> server_in.id = None # No custom UUID for this test 

503 >>> db.execute.return_value.scalar_one_or_none.return_value = None 

504 >>> db.add = MagicMock() 

505 >>> db.commit = MagicMock() 

506 >>> db.refresh = MagicMock() 

507 >>> service._notify_server_added = AsyncMock() 

508 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

509 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

510 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

511 >>> ServerRead.model_validate = MagicMock(return_value='server_read') 

512 >>> import asyncio 

513 >>> asyncio.run(service.register_server(db, server_in)) 

514 'server_read' 

515 """ 

516 try: 

517 logger.info(f"Registering server: {server_in.name}") 

518 oauth_config = await protect_oauth_config_for_storage(getattr(server_in, "oauth_config", None)) 

519 # # Create the new server record. 

520 db_server = DbServer( 

521 name=server_in.name, 

522 description=server_in.description, 

523 icon=server_in.icon, 

524 enabled=True, 

525 tags=server_in.tags or [], 

526 # Team scoping fields - use schema values if provided, otherwise fallback to parameters 

527 team_id=getattr(server_in, "team_id", None) or team_id, 

528 owner_email=getattr(server_in, "owner_email", None) or owner_email or created_by, 

529 # IMPORTANT: Prefer function parameter over schema default 

530 # The API has visibility as a separate Body param that should override schema default 

531 visibility=visibility or getattr(server_in, "visibility", None) or "public", 

532 # OAuth 2.0 configuration for RFC 9728 Protected Resource Metadata 

533 oauth_enabled=getattr(server_in, "oauth_enabled", False) or False, 

534 oauth_config=oauth_config, 

535 # Metadata fields 

536 created_by=created_by, 

537 created_from_ip=created_from_ip, 

538 created_via=created_via, 

539 created_user_agent=created_user_agent, 

540 version=1, 

541 ) 

542 # Check for existing server with the same name (with row locking to prevent race conditions) 

543 # The unique constraint is on (team_id, owner_email, name), so we check based on that 

544 owner_email_to_check = getattr(server_in, "owner_email", None) or owner_email or created_by 

545 team_id_to_check = getattr(server_in, "team_id", None) or team_id 

546 

547 # Build conditions based on the actual unique constraint: (team_id, owner_email, name) 

548 conditions = [ 

549 DbServer.name == server_in.name, 

550 DbServer.team_id == team_id_to_check if team_id_to_check else DbServer.team_id.is_(None), 

551 DbServer.owner_email == owner_email_to_check if owner_email_to_check else DbServer.owner_email.is_(None), 

552 ] 

553 if server_in.id: 

554 conditions.append(DbServer.id != server_in.id) 

555 

556 existing_server = get_for_update(db, DbServer, where=and_(*conditions)) 

557 if existing_server: 

558 raise ServerNameConflictError(server_in.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility) 

559 # Set custom UUID if provided 

560 if server_in.id: 

561 logger.info(f"Setting custom UUID for server: {server_in.id}") 

562 db_server.id = server_in.id 

563 logger.info(f"Adding server to DB session: {db_server.name}") 

564 db.add(db_server) 

565 

566 # Associate tools, verifying each exists using bulk query when multiple items 

567 if server_in.associated_tools: 

568 tool_ids = [tool_id.strip() for tool_id in server_in.associated_tools if tool_id.strip()] 

569 if len(tool_ids) > 1: 

570 # Use bulk query for multiple items 

571 tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all() 

572 found_tool_ids = {tool.id for tool in tools} 

573 missing_tool_ids = set(tool_ids) - found_tool_ids 

574 if missing_tool_ids: 

575 raise ServerError(f"Tools with ids {missing_tool_ids} do not exist.") 

576 db_server.tools.extend(tools) 

577 elif tool_ids: 

578 # Use single query for single item (maintains test compatibility) 

579 tool_obj = db.get(DbTool, tool_ids[0]) 

580 if not tool_obj: 

581 raise ServerError(f"Tool with id {tool_ids[0]} does not exist.") 

582 db_server.tools.append(tool_obj) 

583 

584 # Associate resources, verifying each exists using bulk query when multiple items 

585 if server_in.associated_resources: 

586 resource_ids = [resource_id.strip() for resource_id in server_in.associated_resources if resource_id.strip()] 

587 if len(resource_ids) > 1: 

588 # Use bulk query for multiple items 

589 resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all() 

590 found_resource_ids = {resource.id for resource in resources} 

591 missing_resource_ids = set(resource_ids) - found_resource_ids 

592 if missing_resource_ids: 

593 raise ServerError(f"Resources with ids {missing_resource_ids} do not exist.") 

594 db_server.resources.extend(resources) 

595 elif resource_ids: 

596 # Use single query for single item (maintains test compatibility) 

597 resource_obj = db.get(DbResource, resource_ids[0]) 

598 if not resource_obj: 

599 raise ServerError(f"Resource with id {resource_ids[0]} does not exist.") 

600 db_server.resources.append(resource_obj) 

601 

602 # Associate prompts, verifying each exists using bulk query when multiple items 

603 if server_in.associated_prompts: 

604 prompt_ids = [prompt_id.strip() for prompt_id in server_in.associated_prompts if prompt_id.strip()] 

605 if len(prompt_ids) > 1: 

606 # Use bulk query for multiple items 

607 prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all() 

608 found_prompt_ids = {prompt.id for prompt in prompts} 

609 missing_prompt_ids = set(prompt_ids) - found_prompt_ids 

610 if missing_prompt_ids: 

611 raise ServerError(f"Prompts with ids {missing_prompt_ids} do not exist.") 

612 db_server.prompts.extend(prompts) 

613 elif prompt_ids: 

614 # Use single query for single item (maintains test compatibility) 

615 prompt_obj = db.get(DbPrompt, prompt_ids[0]) 

616 if not prompt_obj: 

617 raise ServerError(f"Prompt with id {prompt_ids[0]} does not exist.") 

618 db_server.prompts.append(prompt_obj) 

619 

620 # Associate A2A agents, verifying each exists using bulk query when multiple items 

621 if server_in.associated_a2a_agents: 

622 agent_ids = [agent_id.strip() for agent_id in server_in.associated_a2a_agents if agent_id.strip()] 

623 if len(agent_ids) > 1: 

624 # Use bulk query for multiple items 

625 agents = db.execute(select(DbA2AAgent).where(DbA2AAgent.id.in_(agent_ids))).scalars().all() 

626 found_agent_ids = {agent.id for agent in agents} 

627 missing_agent_ids = set(agent_ids) - found_agent_ids 

628 if missing_agent_ids: 

629 raise ServerError(f"A2A Agents with ids {missing_agent_ids} do not exist.") 

630 db_server.a2a_agents.extend(agents) 

631 

632 # Note: Auto-tool creation for A2A agents should be handled 

633 # by a separate service or background task to avoid circular imports 

634 for agent in agents: 

635 logger.info(f"A2A agent {agent.name} associated with server {db_server.name}") 

636 elif agent_ids: 

637 # Use single query for single item (maintains test compatibility) 

638 agent_obj = db.get(DbA2AAgent, agent_ids[0]) 

639 if not agent_obj: 

640 raise ServerError(f"A2A Agent with id {agent_ids[0]} does not exist.") 

641 db_server.a2a_agents.append(agent_obj) 

642 logger.info(f"A2A agent {agent_obj.name} associated with server {db_server.name}") 

643 

644 # Commit the new record and refresh. 

645 db.commit() 

646 db.refresh(db_server) 

647 # Force load the relationship attributes. 

648 _ = db_server.tools, db_server.resources, db_server.prompts, db_server.a2a_agents 

649 

650 # Assemble response data with associated item IDs. 

651 server_data = { 

652 "id": db_server.id, 

653 "name": db_server.name, 

654 "description": db_server.description, 

655 "icon": db_server.icon, 

656 "created_at": db_server.created_at, 

657 "updated_at": db_server.updated_at, 

658 "enabled": db_server.enabled, 

659 "associated_tools": [str(tool.id) for tool in db_server.tools], 

660 "associated_resources": [str(resource.id) for resource in db_server.resources], 

661 "associated_prompts": [str(prompt.id) for prompt in db_server.prompts], 

662 } 

663 logger.debug(f"Server Data: {server_data}") 

664 await self._notify_server_added(db_server) 

665 logger.info(f"Registered server: {server_in.name}") 

666 

667 # Structured logging: Audit trail for server creation 

668 self._audit_trail.log_action( 

669 user_id=created_by or "system", 

670 action="create_server", 

671 resource_type="server", 

672 resource_id=db_server.id, 

673 details={ 

674 "server_name": db_server.name, 

675 "visibility": visibility, 

676 "team_id": team_id, 

677 "associated_tools_count": len(db_server.tools), 

678 "associated_resources_count": len(db_server.resources), 

679 "associated_prompts_count": len(db_server.prompts), 

680 "associated_a2a_agents_count": len(db_server.a2a_agents), 

681 }, 

682 metadata={ 

683 "created_from_ip": created_from_ip, 

684 "created_via": created_via, 

685 "created_user_agent": created_user_agent, 

686 }, 

687 ) 

688 

689 # Structured logging: Log successful server creation 

690 self._structured_logger.log( 

691 level="INFO", 

692 message="Server created successfully", 

693 event_type="server_created", 

694 component="server_service", 

695 server_id=db_server.id, 

696 server_name=db_server.name, 

697 visibility=visibility, 

698 created_by=created_by, 

699 user_email=created_by, 

700 ) 

701 

702 # Team name is loaded via db_server.team property from email_team relationship 

703 return self.convert_server_to_read(db_server) 

704 except IntegrityError as ie: 

705 db.rollback() 

706 logger.error(f"IntegrityErrors in group: {ie}") 

707 

708 # Structured logging: Log database integrity error 

709 self._structured_logger.log( 

710 level="ERROR", 

711 message="Server creation failed due to database integrity error", 

712 event_type="server_creation_failed", 

713 component="server_service", 

714 server_name=server_in.name, 

715 error_type="IntegrityError", 

716 error_message=str(ie), 

717 created_by=created_by, 

718 user_email=created_by, 

719 ) 

720 raise ie 

721 except ServerNameConflictError as se: 

722 db.rollback() 

723 

724 # Structured logging: Log name conflict error 

725 self._structured_logger.log( 

726 level="WARNING", 

727 message="Server creation failed due to name conflict", 

728 event_type="server_name_conflict", 

729 component="server_service", 

730 server_name=server_in.name, 

731 visibility=visibility, 

732 created_by=created_by, 

733 user_email=created_by, 

734 ) 

735 raise se 

736 except Exception as ex: 

737 db.rollback() 

738 

739 # Structured logging: Log generic server creation failure 

740 self._structured_logger.log( 

741 level="ERROR", 

742 message="Server creation failed", 

743 event_type="server_creation_failed", 

744 component="server_service", 

745 server_name=server_in.name, 

746 error_type=type(ex).__name__, 

747 error_message=str(ex), 

748 created_by=created_by, 

749 user_email=created_by, 

750 ) 

751 raise ServerError(f"Failed to register server: {str(ex)}") 

752 

753 async def list_servers( 

754 self, 

755 db: Session, 

756 include_inactive: bool = False, 

757 tags: Optional[List[str]] = None, 

758 cursor: Optional[str] = None, 

759 limit: Optional[int] = None, 

760 page: Optional[int] = None, 

761 per_page: Optional[int] = None, 

762 user_email: Optional[str] = None, 

763 team_id: Optional[str] = None, 

764 visibility: Optional[str] = None, 

765 token_teams: Optional[List[str]] = None, 

766 ) -> Union[tuple[List[ServerRead], Optional[str]], Dict[str, Any]]: 

767 """List all registered servers with cursor or page-based pagination and optional team filtering. 

768 

769 Args: 

770 db: Database session. 

771 include_inactive: Whether to include inactive servers. 

772 tags: Filter servers by tags. If provided, only servers with at least one matching tag will be returned. 

773 cursor: Cursor for pagination (encoded last created_at and id). 

774 limit: Maximum number of servers to return. None for default, 0 for unlimited. 

775 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor. 

776 per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size. 

777 user_email: Email of user for team-based access control. None for no access control. 

778 team_id: Optional team ID to filter by specific team (requires user_email). 

779 visibility: Optional visibility filter (private, team, public) (requires user_email). 

780 token_teams: Optional list of team IDs from the token (None=unrestricted, []=public-only). 

781 

782 Returns: 

783 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}} 

784 If cursor is provided or neither: tuple of (list of ServerRead objects, next_cursor). 

785 

786 Examples: 

787 >>> from mcpgateway.services.server_service import ServerService 

788 >>> from unittest.mock import MagicMock 

789 >>> service = ServerService() 

790 >>> db = MagicMock() 

791 >>> server_read = MagicMock() 

792 >>> service.convert_server_to_read = MagicMock(return_value=server_read) 

793 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()] 

794 >>> import asyncio 

795 >>> servers, cursor = asyncio.run(service.list_servers(db)) 

796 >>> isinstance(servers, list) and cursor is None 

797 True 

798 """ 

799 # Check cache for first page only 

800 # SECURITY: Only cache public-only results (token_teams=[]) 

801 # - token_teams=None (admin bypass): Don't cache - admin sees all, should be fresh 

802 # - token_teams=[] (public-only): Cache - same result for all public-only users 

803 # - token_teams=[...] (team-scoped): Don't cache - results vary by team 

804 # - user_email set: Don't cache - results vary by user ownership 

805 cache = _get_registry_cache() 

806 is_public_only = token_teams is not None and len(token_teams) == 0 

807 use_cache = cursor is None and user_email is None and page is None and is_public_only 

808 if use_cache: 

809 filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None) 

810 cached = await cache.get("servers", filters_hash) 

811 if cached is not None: 

812 # Reconstruct ServerRead objects from cached dicts 

813 cached_servers = [ServerRead.model_validate(s).masked() for s in cached["servers"]] 

814 return (cached_servers, cached.get("next_cursor")) 

815 

816 # Build base query with ordering and eager load relationships to avoid N+1 

817 query = ( 

818 select(DbServer) 

819 .options( 

820 selectinload(DbServer.tools), 

821 selectinload(DbServer.resources), 

822 selectinload(DbServer.prompts), 

823 selectinload(DbServer.a2a_agents), 

824 joinedload(DbServer.email_team), 

825 ) 

826 .order_by(desc(DbServer.created_at), desc(DbServer.id)) 

827 ) 

828 

829 # Apply active/inactive filter 

830 if not include_inactive: 

831 query = query.where(DbServer.enabled) 

832 

833 query = await self._apply_access_control(query, db, user_email, token_teams, team_id) 

834 

835 if visibility: 

836 query = query.where(DbServer.visibility == visibility) 

837 

838 # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats) 

839 if tags: 

840 query = query.where(json_contains_tag_expr(db, DbServer.tags, tags, match_any=True)) 

841 

842 # Use unified pagination helper - handles both page and cursor pagination 

843 pag_result = await unified_paginate( 

844 db=db, 

845 query=query, 

846 page=page, 

847 per_page=per_page, 

848 cursor=cursor, 

849 limit=limit, 

850 base_url="/admin/servers", # Used for page-based links 

851 query_params={"include_inactive": include_inactive} if include_inactive else {}, 

852 ) 

853 

854 next_cursor = None 

855 # Extract servers based on pagination type 

856 if page is not None: 

857 # Page-based: pag_result is a dict 

858 servers_db = pag_result["data"] 

859 else: 

860 # Cursor-based: pag_result is a tuple 

861 servers_db, next_cursor = pag_result 

862 

863 db.commit() # Release transaction to avoid idle-in-transaction 

864 

865 # Convert to ServerRead (common for both pagination types) 

866 # Team names are loaded via joinedload(DbServer.email_team) 

867 result = [] 

868 for s in servers_db: 

869 try: 

870 result.append(self.convert_server_to_read(s, include_metrics=False)) 

871 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

872 logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}") 

873 # Continue with remaining servers instead of failing completely 

874 

875 # Return appropriate format based on pagination type 

876 if page is not None: 

877 # Page-based format 

878 return { 

879 "data": result, 

880 "pagination": pag_result["pagination"], 

881 "links": pag_result["links"], 

882 } 

883 

884 # Cursor-based format 

885 

886 # Cache first page results - only for public-only queries (no user/team filtering) 

887 # SECURITY: Only cache public-only results (token_teams=[]), never admin bypass or team-scoped 

888 if cursor is None and user_email is None and is_public_only: 

889 try: 

890 cache_data = {"servers": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor} 

891 await cache.set("servers", cache_data, filters_hash) 

892 except AttributeError: 

893 pass # Skip caching if result objects don't support model_dump (e.g., in doctests) 

894 

895 return (result, next_cursor) 

896 

897 async def list_servers_for_user( 

898 self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100 

899 ) -> List[ServerRead]: 

900 """ 

901 DEPRECATED: Use list_servers() with user_email parameter instead. 

902 

903 This method is maintained for backward compatibility but is no longer used. 

904 New code should call list_servers() with user_email, team_id, and visibility parameters. 

905 

906 List servers user has access to with team filtering. 

907 

908 Args: 

909 db: Database session 

910 user_email: Email of the user requesting servers 

911 team_id: Optional team ID to filter by specific team 

912 visibility: Optional visibility filter (private, team, public) 

913 include_inactive: Whether to include inactive servers 

914 skip: Number of servers to skip for pagination 

915 limit: Maximum number of servers to return 

916 

917 Returns: 

918 List[ServerRead]: Servers the user has access to 

919 """ 

920 # Build query following existing patterns from list_servers() 

921 team_service = TeamManagementService(db) 

922 user_teams = await team_service.get_user_teams(user_email) 

923 team_ids = [team.id for team in user_teams] 

924 

925 # Eager load relationships to avoid N+1 queries 

926 query = select(DbServer).options( 

927 selectinload(DbServer.tools), 

928 selectinload(DbServer.resources), 

929 selectinload(DbServer.prompts), 

930 selectinload(DbServer.a2a_agents), 

931 joinedload(DbServer.email_team), 

932 ) 

933 

934 # Apply active/inactive filter 

935 if not include_inactive: 

936 query = query.where(DbServer.enabled) 

937 

938 if team_id: 

939 if team_id not in team_ids: 

940 return [] # No access to team 

941 

942 access_conditions = [] 

943 # Filter by specific team 

944 access_conditions.append(and_(DbServer.team_id == team_id, DbServer.visibility.in_(["team", "public"]))) 

945 

946 access_conditions.append(and_(DbServer.team_id == team_id, DbServer.owner_email == user_email)) 

947 

948 query = query.where(or_(*access_conditions)) 

949 else: 

950 # Get user's accessible teams 

951 # Build access conditions following existing patterns 

952 access_conditions = [] 

953 

954 # 1. User's personal resources (owner_email matches) 

955 access_conditions.append(DbServer.owner_email == user_email) 

956 

957 # 2. Team resources where user is member 

958 if team_ids: 

959 access_conditions.append(and_(DbServer.team_id.in_(team_ids), DbServer.visibility.in_(["team", "public"]))) 

960 

961 # 3. Public resources (if visibility allows) 

962 access_conditions.append(DbServer.visibility == "public") 

963 

964 query = query.where(or_(*access_conditions)) 

965 

966 # Apply visibility filter if specified 

967 if visibility: 

968 query = query.where(DbServer.visibility == visibility) 

969 

970 # Apply pagination following existing patterns 

971 query = query.offset(skip).limit(limit) 

972 

973 servers = db.execute(query).scalars().all() 

974 

975 db.commit() # Release transaction to avoid idle-in-transaction 

976 

977 # Skip metrics to avoid N+1 queries in list operations 

978 # Team names are loaded via joinedload(DbServer.email_team) 

979 result = [] 

980 for s in servers: 

981 try: 

982 result.append(self.convert_server_to_read(s, include_metrics=False)) 

983 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

984 logger.exception(f"Failed to convert server {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}") 

985 # Continue with remaining servers instead of failing completely 

986 return result 

987 

988 async def get_server(self, db: Session, server_id: str) -> ServerRead: 

989 """Retrieve server details by ID. 

990 

991 Args: 

992 db: Database session. 

993 server_id: The unique identifier of the server. 

994 

995 Returns: 

996 The corresponding ServerRead object. 

997 

998 Raises: 

999 ServerNotFoundError: If no server with the given ID exists. 

1000 

1001 Examples: 

1002 >>> from mcpgateway.services.server_service import ServerService 

1003 >>> from unittest.mock import MagicMock 

1004 >>> service = ServerService() 

1005 >>> db = MagicMock() 

1006 >>> server = MagicMock() 

1007 >>> db.get.return_value = server 

1008 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

1009 >>> import asyncio 

1010 >>> asyncio.run(service.get_server(db, 'server_id')) 

1011 'server_read' 

1012 """ 

1013 server = db.execute( 

1014 select(DbServer) 

1015 .options( 

1016 selectinload(DbServer.tools), 

1017 selectinload(DbServer.resources), 

1018 selectinload(DbServer.prompts), 

1019 selectinload(DbServer.a2a_agents), 

1020 joinedload(DbServer.email_team), 

1021 ) 

1022 .where(DbServer.id == server_id) 

1023 ).scalar_one_or_none() 

1024 if not server: 

1025 raise ServerNotFoundError(f"Server not found: {server_id}") 

1026 server_data = { 

1027 "id": server.id, 

1028 "name": server.name, 

1029 "description": server.description, 

1030 "icon": server.icon, 

1031 "created_at": server.created_at, 

1032 "updated_at": server.updated_at, 

1033 "enabled": server.enabled, 

1034 "associated_tools": [tool.name for tool in server.tools], 

1035 "associated_resources": [res.id for res in server.resources], 

1036 "associated_prompts": [prompt.id for prompt in server.prompts], 

1037 } 

1038 logger.debug(f"Server Data: {server_data}") 

1039 # Team name is loaded via server.team property from email_team relationship 

1040 server_read = self.convert_server_to_read(server) 

1041 

1042 self._structured_logger.log( 

1043 level="INFO", 

1044 message="Server retrieved successfully", 

1045 event_type="server_viewed", 

1046 component="server_service", 

1047 server_id=server.id, 

1048 server_name=server.name, 

1049 team_id=getattr(server, "team_id", None), 

1050 resource_type="server", 

1051 resource_id=server.id, 

1052 custom_fields={ 

1053 "enabled": server.enabled, 

1054 "tool_count": len(getattr(server, "tools", []) or []), 

1055 "resource_count": len(getattr(server, "resources", []) or []), 

1056 "prompt_count": len(getattr(server, "prompts", []) or []), 

1057 }, 

1058 ) 

1059 

1060 self._audit_trail.log_action( 

1061 action="view_server", 

1062 resource_type="server", 

1063 resource_id=server.id, 

1064 resource_name=server.name, 

1065 user_id="system", 

1066 team_id=getattr(server, "team_id", None), 

1067 context={"enabled": server.enabled}, 

1068 db=db, 

1069 ) 

1070 

1071 return server_read 

1072 

1073 async def update_server( 

1074 self, 

1075 db: Session, 

1076 server_id: str, 

1077 server_update: ServerUpdate, 

1078 user_email: str, 

1079 modified_by: Optional[str] = None, 

1080 modified_from_ip: Optional[str] = None, 

1081 modified_via: Optional[str] = None, 

1082 modified_user_agent: Optional[str] = None, 

1083 ) -> ServerRead: 

1084 """Update an existing server. 

1085 

1086 Args: 

1087 db: Database session. 

1088 server_id: The unique identifier of the server. 

1089 server_update: Server update schema with new data. 

1090 user_email: email of the user performing the update (for permission checks). 

1091 modified_by: Username who modified this server. 

1092 modified_from_ip: IP address from which modification was made. 

1093 modified_via: Source of modification (api, ui, etc.). 

1094 modified_user_agent: User agent of the client making the modification. 

1095 

1096 Returns: 

1097 The updated ServerRead object. 

1098 

1099 Raises: 

1100 ServerNotFoundError: If the server is not found. 

1101 PermissionError: If user doesn't own the server. 

1102 ServerNameConflictError: If a new name conflicts with an existing server. 

1103 ServerError: For other update errors. 

1104 IntegrityError: If a database integrity error occurs. 

1105 ValueError: If visibility or team constraints are violated. 

1106 

1107 Examples: 

1108 >>> from mcpgateway.services.server_service import ServerService 

1109 >>> from unittest.mock import MagicMock, AsyncMock, patch 

1110 >>> from mcpgateway.schemas import ServerRead 

1111 >>> service = ServerService() 

1112 >>> db = MagicMock() 

1113 >>> server = MagicMock() 

1114 >>> server.id = 'server_id' 

1115 >>> server.name = 'test_server' 

1116 >>> server.owner_email = 'user_email' # Set owner to match user performing update 

1117 >>> server.team_id = None 

1118 >>> server.visibility = 'public' 

1119 >>> db.get.return_value = server 

1120 >>> db.commit = MagicMock() 

1121 >>> db.refresh = MagicMock() 

1122 >>> db.execute.return_value.scalar_one_or_none.return_value = None 

1123 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

1124 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

1125 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

1126 >>> ServerRead.model_validate = MagicMock(return_value='server_read') 

1127 >>> server_update = MagicMock() 

1128 >>> server_update.id = None # No UUID change 

1129 >>> server_update.name = None # No name change 

1130 >>> server_update.description = None 

1131 >>> server_update.icon = None 

1132 >>> server_update.visibility = None 

1133 >>> server_update.team_id = None 

1134 >>> import asyncio 

1135 >>> with patch('mcpgateway.services.server_service.get_for_update', return_value=server): 

1136 ... asyncio.run(service.update_server(db, 'server_id', server_update, 'user_email')) 

1137 'server_read' 

1138 """ 

1139 try: 

1140 server = get_for_update( 

1141 db, 

1142 DbServer, 

1143 server_id, 

1144 options=[ 

1145 selectinload(DbServer.tools), 

1146 selectinload(DbServer.resources), 

1147 selectinload(DbServer.prompts), 

1148 selectinload(DbServer.a2a_agents), 

1149 selectinload(DbServer.email_team), 

1150 ], 

1151 ) 

1152 if not server: 

1153 raise ServerNotFoundError(f"Server not found: {server_id}") 

1154 

1155 # Check ownership if user_email provided 

1156 if user_email: 

1157 # First-Party 

1158 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1159 

1160 permission_service = PermissionService(db) 

1161 if not await permission_service.check_resource_ownership(user_email, server): 

1162 raise PermissionError("Only the owner can update this server") 

1163 

1164 # Check for name conflict if name is being changed and visibility is public 

1165 if server_update.name and server_update.name != server.name: 

1166 visibility = server_update.visibility or server.visibility 

1167 team_id = server_update.team_id or server.team_id 

1168 if visibility.lower() == "public": 

1169 # Check for existing public server with the same name 

1170 existing_server = get_for_update(db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "public", DbServer.id != server.id)) 

1171 if existing_server: 

1172 raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility) 

1173 elif visibility.lower() == "team" and team_id: 

1174 # Check for existing team server with the same name 

1175 existing_server = get_for_update( 

1176 db, DbServer, where=and_(DbServer.name == server_update.name, DbServer.visibility == "team", DbServer.team_id == team_id, DbServer.id != server.id) 

1177 ) 

1178 if existing_server: 

1179 raise ServerNameConflictError(server_update.name, enabled=existing_server.enabled, server_id=existing_server.id, visibility=existing_server.visibility) 

1180 

1181 # Update simple fields 

1182 if server_update.id is not None and server_update.id != server.id: 

1183 # Check if the new UUID is already in use 

1184 existing = db.get(DbServer, server_update.id) 

1185 if existing: 

1186 raise ServerError(f"Server with ID {server_update.id} already exists") 

1187 server.id = server_update.id 

1188 if server_update.name is not None: 

1189 server.name = server_update.name 

1190 if server_update.description is not None: 

1191 server.description = server_update.description 

1192 if server_update.icon is not None: 

1193 server.icon = server_update.icon 

1194 

1195 if server_update.visibility is not None: 

1196 new_visibility = server_update.visibility 

1197 

1198 # Validate visibility transitions 

1199 if new_visibility == "team": 

1200 target_team_id = server_update.team_id if server_update.team_id is not None else server.team_id 

1201 _validate_server_team_assignment(db, user_email, target_team_id) 

1202 

1203 elif new_visibility == "public": 

1204 # Optional: Check if user has permission to make resources public 

1205 # This could be a platform-level permission 

1206 pass 

1207 

1208 server.visibility = new_visibility 

1209 

1210 if server_update.team_id is not None: 

1211 if server_update.team_id != server.team_id: 

1212 _validate_server_team_assignment(db, user_email, server_update.team_id) 

1213 server.team_id = server_update.team_id 

1214 

1215 # Update associated tools if provided using bulk query 

1216 if server_update.associated_tools is not None: 

1217 server.tools = [] 

1218 if server_update.associated_tools: 

1219 tool_ids = [tool_id for tool_id in server_update.associated_tools if tool_id] 

1220 if tool_ids: 

1221 tools = db.execute(select(DbTool).where(DbTool.id.in_(tool_ids))).scalars().all() 

1222 server.tools = list(tools) 

1223 

1224 # Update associated resources if provided using bulk query 

1225 if server_update.associated_resources is not None: 

1226 server.resources = [] 

1227 if server_update.associated_resources: 

1228 resource_ids = [resource_id for resource_id in server_update.associated_resources if resource_id] 

1229 if resource_ids: 

1230 resources = db.execute(select(DbResource).where(DbResource.id.in_(resource_ids))).scalars().all() 

1231 server.resources = list(resources) 

1232 

1233 # Update associated prompts if provided using bulk query 

1234 if server_update.associated_prompts is not None: 

1235 server.prompts = [] 

1236 if server_update.associated_prompts: 

1237 prompt_ids = [prompt_id for prompt_id in server_update.associated_prompts if prompt_id] 

1238 if prompt_ids: 

1239 prompts = db.execute(select(DbPrompt).where(DbPrompt.id.in_(prompt_ids))).scalars().all() 

1240 server.prompts = list(prompts) 

1241 

1242 # Update tags if provided 

1243 if server_update.tags is not None: 

1244 server.tags = server_update.tags 

1245 

1246 # Update OAuth 2.0 configuration if provided 

1247 # Track if OAuth is being explicitly disabled to prevent config re-assignment 

1248 oauth_being_disabled = server_update.oauth_enabled is not None and not server_update.oauth_enabled 

1249 

1250 if server_update.oauth_enabled is not None: 

1251 server.oauth_enabled = server_update.oauth_enabled 

1252 # If OAuth is being disabled, clear the config 

1253 if oauth_being_disabled: 

1254 server.oauth_config = None 

1255 

1256 # Only update oauth_config if OAuth is not being explicitly disabled 

1257 # This prevents the case where oauth_enabled=False and oauth_config are both provided 

1258 if not oauth_being_disabled: 

1259 if hasattr(server_update, "model_fields_set") and "oauth_config" in server_update.model_fields_set: 

1260 server.oauth_config = await protect_oauth_config_for_storage(server_update.oauth_config, existing_oauth_config=server.oauth_config) 

1261 elif server_update.oauth_config is not None: 

1262 server.oauth_config = await protect_oauth_config_for_storage(server_update.oauth_config, existing_oauth_config=server.oauth_config) 

1263 

1264 # Update metadata fields 

1265 server.updated_at = datetime.now(timezone.utc) 

1266 if modified_by: 

1267 server.modified_by = modified_by 

1268 if modified_from_ip: 

1269 server.modified_from_ip = modified_from_ip 

1270 if modified_via: 

1271 server.modified_via = modified_via 

1272 if modified_user_agent: 

1273 server.modified_user_agent = modified_user_agent 

1274 if hasattr(server, "version") and server.version is not None: 

1275 server.version = server.version + 1 

1276 else: 

1277 server.version = 1 

1278 

1279 db.commit() 

1280 db.refresh(server) 

1281 # Force loading relationships 

1282 _ = server.tools, server.resources, server.prompts 

1283 

1284 # Invalidate cache after successful update 

1285 cache = _get_registry_cache() 

1286 await cache.invalidate_servers() 

1287 # Also invalidate tags cache since server tags may have changed 

1288 # First-Party 

1289 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

1290 

1291 await admin_stats_cache.invalidate_tags() 

1292 

1293 await self._notify_server_updated(server) 

1294 logger.info(f"Updated server: {server.name}") 

1295 

1296 # Structured logging: Audit trail for server update 

1297 changes = [] 

1298 if server_update.name: 

1299 changes.append(f"name: {server_update.name}") 

1300 if server_update.visibility: 

1301 changes.append(f"visibility: {server_update.visibility}") 

1302 if server_update.team_id: 

1303 changes.append(f"team_id: {server_update.team_id}") 

1304 

1305 self._audit_trail.log_action( 

1306 user_id=user_email or "system", 

1307 action="update_server", 

1308 resource_type="server", 

1309 resource_id=server.id, 

1310 details={ 

1311 "server_name": server.name, 

1312 "changes": ", ".join(changes) if changes else "metadata only", 

1313 "version": server.version, 

1314 }, 

1315 metadata={ 

1316 "modified_from_ip": modified_from_ip, 

1317 "modified_via": modified_via, 

1318 "modified_user_agent": modified_user_agent, 

1319 }, 

1320 ) 

1321 

1322 # Structured logging: Log successful server update 

1323 self._structured_logger.log( 

1324 level="INFO", 

1325 message="Server updated successfully", 

1326 event_type="server_updated", 

1327 component="server_service", 

1328 server_id=server.id, 

1329 server_name=server.name, 

1330 modified_by=user_email, 

1331 user_email=user_email, 

1332 ) 

1333 

1334 # Build a dictionary with associated IDs 

1335 # Team name is loaded via server.team property from email_team relationship 

1336 server_data = { 

1337 "id": server.id, 

1338 "name": server.name, 

1339 "description": server.description, 

1340 "icon": server.icon, 

1341 "team": server.team, 

1342 "created_at": server.created_at, 

1343 "updated_at": server.updated_at, 

1344 "enabled": server.enabled, 

1345 "associated_tools": [tool.id for tool in server.tools], 

1346 "associated_resources": [res.id for res in server.resources], 

1347 "associated_prompts": [prompt.id for prompt in server.prompts], 

1348 } 

1349 logger.debug(f"Server Data: {server_data}") 

1350 return self.convert_server_to_read(server) 

1351 except IntegrityError as ie: 

1352 db.rollback() 

1353 logger.error(f"IntegrityErrors in group: {ie}") 

1354 

1355 # Structured logging: Log database integrity error 

1356 self._structured_logger.log( 

1357 level="ERROR", 

1358 message="Server update failed due to database integrity error", 

1359 event_type="server_update_failed", 

1360 component="server_service", 

1361 server_id=server_id, 

1362 error_type="IntegrityError", 

1363 error_message=str(ie), 

1364 modified_by=user_email, 

1365 user_email=user_email, 

1366 ) 

1367 raise ie 

1368 except ServerNameConflictError as snce: 

1369 db.rollback() 

1370 logger.error(f"Server name conflict: {snce}") 

1371 

1372 # Structured logging: Log name conflict error 

1373 self._structured_logger.log( 

1374 level="WARNING", 

1375 message="Server update failed due to name conflict", 

1376 event_type="server_name_conflict", 

1377 component="server_service", 

1378 server_id=server_id, 

1379 modified_by=user_email, 

1380 user_email=user_email, 

1381 ) 

1382 raise snce 

1383 except Exception as e: 

1384 db.rollback() 

1385 

1386 # Structured logging: Log generic server update failure 

1387 self._structured_logger.log( 

1388 level="ERROR", 

1389 message="Server update failed", 

1390 event_type="server_update_failed", 

1391 component="server_service", 

1392 server_id=server_id, 

1393 error_type=type(e).__name__, 

1394 error_message=str(e), 

1395 modified_by=user_email, 

1396 user_email=user_email, 

1397 ) 

1398 raise ServerError(f"Failed to update server: {str(e)}") 

1399 

1400 async def set_server_state(self, db: Session, server_id: str, activate: bool, user_email: Optional[str] = None) -> ServerRead: 

1401 """Set the activation status of a server. 

1402 

1403 Args: 

1404 db: Database session. 

1405 server_id: The unique identifier of the server. 

1406 activate: True to activate, False to deactivate. 

1407 user_email: Optional[str] The email of the user to check if the user has permission to modify. 

1408 

1409 Returns: 

1410 The updated ServerRead object. 

1411 

1412 Raises: 

1413 ServerNotFoundError: If the server is not found. 

1414 ServerLockConflictError: If the server row is locked by another transaction. 

1415 ServerError: For other errors. 

1416 PermissionError: If user doesn't own the agent. 

1417 

1418 Examples: 

1419 >>> from mcpgateway.services.server_service import ServerService 

1420 >>> from unittest.mock import MagicMock, AsyncMock, patch 

1421 >>> from mcpgateway.schemas import ServerRead 

1422 >>> service = ServerService() 

1423 >>> db = MagicMock() 

1424 >>> server = MagicMock() 

1425 >>> db.get.return_value = server 

1426 >>> db.commit = MagicMock() 

1427 >>> db.refresh = MagicMock() 

1428 >>> service._notify_server_activated = AsyncMock() 

1429 >>> service._notify_server_deactivated = AsyncMock() 

1430 >>> service.convert_server_to_read = MagicMock(return_value='server_read') 

1431 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

1432 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

1433 >>> ServerRead.model_validate = MagicMock(return_value='server_read') 

1434 >>> import asyncio 

1435 >>> asyncio.run(service.set_server_state(db, 'server_id', True)) 

1436 'server_read' 

1437 """ 

1438 try: 

1439 # Use nowait=True to fail fast if row is locked, preventing lock contention under high load 

1440 try: 

1441 server = get_for_update( 

1442 db, 

1443 DbServer, 

1444 server_id, 

1445 nowait=True, 

1446 options=[ 

1447 selectinload(DbServer.tools), 

1448 selectinload(DbServer.resources), 

1449 selectinload(DbServer.prompts), 

1450 selectinload(DbServer.a2a_agents), 

1451 selectinload(DbServer.email_team), 

1452 ], 

1453 ) 

1454 except OperationalError as lock_err: 

1455 # Row is locked by another transaction - fail fast with 409 

1456 db.rollback() 

1457 raise ServerLockConflictError(f"Server {server_id} is currently being modified by another request") from lock_err 

1458 if not server: 

1459 raise ServerNotFoundError(f"Server not found: {server_id}") 

1460 

1461 if user_email: 

1462 # First-Party 

1463 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1464 

1465 permission_service = PermissionService(db) 

1466 if not await permission_service.check_resource_ownership(user_email, server): 

1467 raise PermissionError("Only the owner can activate the Server" if activate else "Only the owner can deactivate the Server") 

1468 

1469 if server.enabled != activate: 

1470 server.enabled = activate 

1471 server.updated_at = datetime.now(timezone.utc) 

1472 db.commit() 

1473 db.refresh(server) 

1474 

1475 # Invalidate cache after status change 

1476 cache = _get_registry_cache() 

1477 await cache.invalidate_servers() 

1478 

1479 if activate: 

1480 await self._notify_server_activated(server) 

1481 else: 

1482 await self._notify_server_deactivated(server) 

1483 logger.info(f"Server {server.name} {'activated' if activate else 'deactivated'}") 

1484 

1485 # Structured logging: Audit trail for server state change 

1486 self._audit_trail.log_action( 

1487 user_id=user_email or "system", 

1488 action="activate_server" if activate else "deactivate_server", 

1489 resource_type="server", 

1490 resource_id=server.id, 

1491 details={ 

1492 "server_name": server.name, 

1493 "new_status": "active" if activate else "inactive", 

1494 }, 

1495 ) 

1496 

1497 # Structured logging: Log server status change 

1498 self._structured_logger.log( 

1499 level="INFO", 

1500 message=f"Server {'activated' if activate else 'deactivated'}", 

1501 event_type="server_status_changed", 

1502 component="server_service", 

1503 server_id=server.id, 

1504 server_name=server.name, 

1505 new_status="active" if activate else "inactive", 

1506 changed_by=user_email, 

1507 user_email=user_email, 

1508 ) 

1509 

1510 # Team name is loaded via server.team property from email_team relationship 

1511 server_data = { 

1512 "id": server.id, 

1513 "name": server.name, 

1514 "description": server.description, 

1515 "icon": server.icon, 

1516 "team": server.team, 

1517 "created_at": server.created_at, 

1518 "updated_at": server.updated_at, 

1519 "enabled": server.enabled, 

1520 "associated_tools": [tool.id for tool in server.tools], 

1521 "associated_resources": [res.id for res in server.resources], 

1522 "associated_prompts": [prompt.id for prompt in server.prompts], 

1523 } 

1524 logger.info(f"Server Data: {server_data}") 

1525 return self.convert_server_to_read(server) 

1526 except PermissionError as e: 

1527 # Structured logging: Log permission error 

1528 self._structured_logger.log( 

1529 level="WARNING", 

1530 message="Server state change failed due to insufficient permissions", 

1531 event_type="server_state_change_permission_denied", 

1532 component="server_service", 

1533 server_id=server_id, 

1534 user_email=user_email, 

1535 ) 

1536 raise e 

1537 except ServerLockConflictError: 

1538 # Re-raise lock conflicts without wrapping - allows 409 response 

1539 raise 

1540 except ServerNotFoundError: 

1541 # Re-raise not found without wrapping - allows 404 response 

1542 raise 

1543 except Exception as e: 

1544 db.rollback() 

1545 

1546 # Structured logging: Log generic server state change failure 

1547 self._structured_logger.log( 

1548 level="ERROR", 

1549 message="Server state change failed", 

1550 event_type="server_state_change_failed", 

1551 component="server_service", 

1552 server_id=server_id, 

1553 error_type=type(e).__name__, 

1554 error_message=str(e), 

1555 user_email=user_email, 

1556 ) 

1557 raise ServerError(f"Failed to set server state: {str(e)}") 

1558 

1559 async def delete_server(self, db: Session, server_id: str, user_email: Optional[str] = None, purge_metrics: bool = False) -> None: 

1560 """Permanently delete a server. 

1561 

1562 Args: 

1563 db: Database session. 

1564 server_id: The unique identifier of the server. 

1565 user_email: Email of user performing deletion (for ownership check). 

1566 purge_metrics: If True, delete raw + rollup metrics for this server. 

1567 

1568 Raises: 

1569 ServerNotFoundError: If the server is not found. 

1570 PermissionError: If user doesn't own the server. 

1571 ServerError: For other deletion errors. 

1572 

1573 Examples: 

1574 >>> from mcpgateway.services.server_service import ServerService 

1575 >>> from unittest.mock import MagicMock, AsyncMock, patch 

1576 >>> service = ServerService() 

1577 >>> db = MagicMock() 

1578 >>> server = MagicMock() 

1579 >>> db.get.return_value = server 

1580 >>> db.delete = MagicMock() 

1581 >>> db.commit = MagicMock() 

1582 >>> service._notify_server_deleted = AsyncMock() 

1583 >>> service._structured_logger = MagicMock() # Mock structured logger to prevent database writes 

1584 >>> service._audit_trail = MagicMock() # Mock audit trail to prevent database writes 

1585 >>> import asyncio 

1586 >>> asyncio.run(service.delete_server(db, 'server_id', 'user@example.com')) 

1587 """ 

1588 try: 

1589 server = db.get(DbServer, server_id) 

1590 if not server: 

1591 raise ServerNotFoundError(f"Server not found: {server_id}") 

1592 

1593 # Check ownership if user_email provided 

1594 if user_email: 

1595 # First-Party 

1596 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1597 

1598 permission_service = PermissionService(db) 

1599 if not await permission_service.check_resource_ownership(user_email, server): 

1600 raise PermissionError("Only the owner can delete this server") 

1601 

1602 server_info = {"id": server.id, "name": server.name} 

1603 if purge_metrics: 

1604 with pause_rollup_during_purge(reason=f"purge_server:{server_id}"): 

1605 delete_metrics_in_batches(db, ServerMetric, ServerMetric.server_id, server_id) 

1606 delete_metrics_in_batches(db, ServerMetricsHourly, ServerMetricsHourly.server_id, server_id) 

1607 db.delete(server) 

1608 db.commit() 

1609 

1610 # Invalidate cache after successful deletion 

1611 cache = _get_registry_cache() 

1612 await cache.invalidate_servers() 

1613 # Also invalidate tags cache since server tags may have changed 

1614 # First-Party 

1615 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

1616 

1617 await admin_stats_cache.invalidate_tags() 

1618 # First-Party 

1619 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

1620 

1621 metrics_cache.invalidate_prefix("top_servers:") 

1622 metrics_cache.invalidate("servers") 

1623 

1624 await self._notify_server_deleted(server_info) 

1625 logger.info(f"Deleted server: {server_info['name']}") 

1626 

1627 # Structured logging: Audit trail for server deletion 

1628 self._audit_trail.log_action( 

1629 user_id=user_email or "system", 

1630 action="delete_server", 

1631 resource_type="server", 

1632 resource_id=server_info["id"], 

1633 details={ 

1634 "server_name": server_info["name"], 

1635 }, 

1636 ) 

1637 

1638 # Structured logging: Log successful server deletion 

1639 self._structured_logger.log( 

1640 level="INFO", 

1641 message="Server deleted successfully", 

1642 event_type="server_deleted", 

1643 component="server_service", 

1644 server_id=server_info["id"], 

1645 server_name=server_info["name"], 

1646 deleted_by=user_email, 

1647 user_email=user_email, 

1648 purge_metrics=purge_metrics, 

1649 ) 

1650 except PermissionError as pe: 

1651 db.rollback() 

1652 

1653 # Structured logging: Log permission error 

1654 self._structured_logger.log( 

1655 level="WARNING", 

1656 message="Server deletion failed due to insufficient permissions", 

1657 event_type="server_deletion_permission_denied", 

1658 component="server_service", 

1659 server_id=server_id, 

1660 user_email=user_email, 

1661 ) 

1662 raise pe 

1663 except Exception as e: 

1664 db.rollback() 

1665 

1666 # Structured logging: Log generic server deletion failure 

1667 self._structured_logger.log( 

1668 level="ERROR", 

1669 message="Server deletion failed", 

1670 event_type="server_deletion_failed", 

1671 component="server_service", 

1672 server_id=server_id, 

1673 error_type=type(e).__name__, 

1674 error_message=str(e), 

1675 user_email=user_email, 

1676 ) 

1677 raise ServerError(f"Failed to delete server: {str(e)}") 

1678 

1679 async def _publish_event(self, event: Dict[str, Any]) -> None: 

1680 """ 

1681 Publish an event to all subscribed queues. 

1682 

1683 Args: 

1684 event: Event to publish 

1685 """ 

1686 for queue in self._event_subscribers: 

1687 await queue.put(event) 

1688 

1689 async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]: 

1690 """Subscribe to server events. 

1691 

1692 Yields: 

1693 Server event messages. 

1694 """ 

1695 queue: asyncio.Queue = asyncio.Queue() 

1696 self._event_subscribers.append(queue) 

1697 try: 

1698 while True: 

1699 event = await queue.get() 

1700 yield event 

1701 finally: 

1702 self._event_subscribers.remove(queue) 

1703 

1704 async def _notify_server_added(self, server: DbServer) -> None: 

1705 """ 

1706 Notify subscribers that a new server has been added. 

1707 

1708 Args: 

1709 server: Server to add 

1710 """ 

1711 associated_tools = [tool.id for tool in server.tools] if server.tools else [] 

1712 associated_resources = [res.id for res in server.resources] if server.resources else [] 

1713 associated_prompts = [prompt.id for prompt in server.prompts] if server.prompts else [] 

1714 event = { 

1715 "type": "server_added", 

1716 "data": { 

1717 "id": server.id, 

1718 "name": server.name, 

1719 "description": server.description, 

1720 "icon": server.icon, 

1721 "associated_tools": associated_tools, 

1722 "associated_resources": associated_resources, 

1723 "associated_prompts": associated_prompts, 

1724 "enabled": server.enabled, 

1725 }, 

1726 "timestamp": datetime.now(timezone.utc).isoformat(), 

1727 } 

1728 await self._publish_event(event) 

1729 

1730 async def _notify_server_updated(self, server: DbServer) -> None: 

1731 """ 

1732 Notify subscribers that a server has been updated. 

1733 

1734 Args: 

1735 server: Server to update 

1736 """ 

1737 associated_tools = [tool.id for tool in server.tools] if server.tools else [] 

1738 associated_resources = [res.id for res in server.resources] if server.resources else [] 

1739 associated_prompts = [prompt.id for prompt in server.prompts] if server.prompts else [] 

1740 event = { 

1741 "type": "server_updated", 

1742 "data": { 

1743 "id": server.id, 

1744 "name": server.name, 

1745 "description": server.description, 

1746 "icon": server.icon, 

1747 "associated_tools": associated_tools, 

1748 "associated_resources": associated_resources, 

1749 "associated_prompts": associated_prompts, 

1750 "enabled": server.enabled, 

1751 }, 

1752 "timestamp": datetime.now(timezone.utc).isoformat(), 

1753 } 

1754 await self._publish_event(event) 

1755 

1756 async def _notify_server_activated(self, server: DbServer) -> None: 

1757 """ 

1758 Notify subscribers that a server has been activated. 

1759 

1760 Args: 

1761 server: Server to activate 

1762 """ 

1763 event = { 

1764 "type": "server_activated", 

1765 "data": { 

1766 "id": server.id, 

1767 "name": server.name, 

1768 "enabled": True, 

1769 }, 

1770 "timestamp": datetime.now(timezone.utc).isoformat(), 

1771 } 

1772 await self._publish_event(event) 

1773 

1774 async def _notify_server_deactivated(self, server: DbServer) -> None: 

1775 """ 

1776 Notify subscribers that a server has been deactivated. 

1777 

1778 Args: 

1779 server: Server to deactivate 

1780 """ 

1781 event = { 

1782 "type": "server_deactivated", 

1783 "data": { 

1784 "id": server.id, 

1785 "name": server.name, 

1786 "enabled": False, 

1787 }, 

1788 "timestamp": datetime.now(timezone.utc).isoformat(), 

1789 } 

1790 await self._publish_event(event) 

1791 

1792 async def _notify_server_deleted(self, server_info: Dict[str, Any]) -> None: 

1793 """ 

1794 Notify subscribers that a server has been deleted. 

1795 

1796 Args: 

1797 server_info: Dictionary on server to be deleted 

1798 """ 

1799 event = { 

1800 "type": "server_deleted", 

1801 "data": server_info, 

1802 "timestamp": datetime.now(timezone.utc).isoformat(), 

1803 } 

1804 await self._publish_event(event) 

1805 

1806 # --- Metrics --- 

1807 async def aggregate_metrics(self, db: Session) -> ServerMetrics: 

1808 """ 

1809 Aggregate metrics for all server invocations across all servers. 

1810 

1811 Combines recent raw metrics (within retention period) with historical 

1812 hourly rollups for complete historical coverage. Uses in-memory caching 

1813 (10s TTL) to reduce database load under high request rates. 

1814 

1815 Args: 

1816 db: Database session 

1817 

1818 Returns: 

1819 ServerMetrics: Aggregated metrics from raw + hourly rollup tables. 

1820 

1821 Examples: 

1822 >>> from mcpgateway.services.server_service import ServerService 

1823 >>> service = ServerService() 

1824 >>> # Method exists and is callable 

1825 >>> callable(service.aggregate_metrics) 

1826 True 

1827 """ 

1828 # Check cache first (if enabled) 

1829 # First-Party 

1830 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel 

1831 

1832 if is_cache_enabled(): 

1833 cached = metrics_cache.get("servers") 

1834 if cached is not None: 

1835 return ServerMetrics(**cached) 

1836 

1837 # Use combined raw + rollup query for full historical coverage 

1838 # First-Party 

1839 from mcpgateway.services.metrics_query_service import aggregate_metrics_combined # pylint: disable=import-outside-toplevel 

1840 

1841 result = aggregate_metrics_combined(db, "server") 

1842 

1843 metrics = ServerMetrics( 

1844 total_executions=result.total_executions, 

1845 successful_executions=result.successful_executions, 

1846 failed_executions=result.failed_executions, 

1847 failure_rate=result.failure_rate, 

1848 min_response_time=result.min_response_time, 

1849 max_response_time=result.max_response_time, 

1850 avg_response_time=result.avg_response_time, 

1851 last_execution_time=result.last_execution_time, 

1852 ) 

1853 

1854 # Cache the result as dict for serialization compatibility (if enabled) 

1855 if is_cache_enabled(): 

1856 metrics_cache.set("servers", metrics.model_dump()) 

1857 

1858 return metrics 

1859 

1860 async def reset_metrics(self, db: Session) -> None: 

1861 """ 

1862 Reset all server metrics by deleting raw and hourly rollup records. 

1863 

1864 Args: 

1865 db: Database session 

1866 

1867 Examples: 

1868 >>> from mcpgateway.services.server_service import ServerService 

1869 >>> from unittest.mock import MagicMock 

1870 >>> service = ServerService() 

1871 >>> db = MagicMock() 

1872 >>> db.execute = MagicMock() 

1873 >>> db.commit = MagicMock() 

1874 >>> import asyncio 

1875 >>> asyncio.run(service.reset_metrics(db)) 

1876 """ 

1877 db.execute(delete(ServerMetric)) 

1878 db.execute(delete(ServerMetricsHourly)) 

1879 db.commit() 

1880 

1881 # Invalidate metrics cache 

1882 # First-Party 

1883 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

1884 

1885 metrics_cache.invalidate("servers") 

1886 metrics_cache.invalidate_prefix("top_servers:") 

1887 

1888 def get_oauth_protected_resource_metadata(self, db: Session, server_id: str, resource_base_url: str) -> Dict[str, Any]: 

1889 """ 

1890 Get RFC 9728 OAuth 2.0 Protected Resource Metadata for a server. 

1891 

1892 This method retrieves the OAuth configuration for a server and formats it 

1893 according to RFC 9728 Protected Resource Metadata specification, enabling 

1894 MCP clients to discover OAuth authorization servers for browser-based SSO. 

1895 

1896 Args: 

1897 db: Database session. 

1898 server_id: The ID of the server. 

1899 resource_base_url: The base URL for the resource (e.g., "https://gateway.example.com/servers/abc123/mcp"). 

1900 

1901 Returns: 

1902 Dict containing RFC 9728 Protected Resource Metadata: 

1903 - resource: The protected resource identifier (URL with /mcp suffix) 

1904 - authorization_servers: JSON array of authorization server issuer URIs (RFC 9728 Section 2) 

1905 - bearer_methods_supported: Supported bearer token methods (always ["header"]) 

1906 - scopes_supported: Optional list of supported scopes 

1907 

1908 Raises: 

1909 ServerNotFoundError: If server doesn't exist, is disabled, or is non-public. 

1910 ServerError: If OAuth is not enabled or not properly configured. 

1911 

1912 Examples: 

1913 >>> from mcpgateway.services.server_service import ServerService 

1914 >>> service = ServerService() 

1915 >>> # Method exists and is callable 

1916 >>> callable(service.get_oauth_protected_resource_metadata) 

1917 True 

1918 """ 

1919 server = db.get(DbServer, server_id) 

1920 

1921 # Return not found for non-existent, disabled, or non-public servers 

1922 # (avoids leaking information about private/team servers) 

1923 if not server: 

1924 raise ServerNotFoundError(f"Server not found: {server_id}") 

1925 

1926 if not server.enabled: 

1927 raise ServerNotFoundError(f"Server not found: {server_id}") 

1928 

1929 if getattr(server, "visibility", "public") != "public": 

1930 raise ServerNotFoundError(f"Server not found: {server_id}") 

1931 

1932 # Check OAuth configuration 

1933 if not getattr(server, "oauth_enabled", False): 

1934 raise ServerError(f"OAuth not enabled for server: {server_id}") 

1935 

1936 oauth_config = getattr(server, "oauth_config", None) 

1937 if not oauth_config: 

1938 raise ServerError(f"OAuth not configured for server: {server_id}") 

1939 

1940 # Extract authorization server(s) - support both list and single value in config 

1941 authorization_servers = oauth_config.get("authorization_servers", []) 

1942 if not authorization_servers: 

1943 auth_server = oauth_config.get("authorization_server") 

1944 if auth_server: 

1945 authorization_servers = [auth_server] if isinstance(auth_server, str) else auth_server 

1946 

1947 if not authorization_servers: 

1948 raise ServerError(f"OAuth authorization_server not configured for server: {server_id}") 

1949 

1950 # Build RFC 9728 Protected Resource Metadata response 

1951 response_data: Dict[str, Any] = { 

1952 "resource": resource_base_url, 

1953 "authorization_servers": authorization_servers, 

1954 "bearer_methods_supported": ["header"], 

1955 } 

1956 

1957 # Add optional scopes if configured (never include secrets from oauth_config) 

1958 scopes = oauth_config.get("scopes_supported") or oauth_config.get("scopes") 

1959 if scopes: 

1960 response_data["scopes_supported"] = scopes 

1961 

1962 logger.debug(f"Returning OAuth protected resource metadata for server {server_id}") 

1963 return response_data 

1964 

1965 

1966# Lazy singleton - created on first access, not at module import time. 

1967# This avoids instantiation when only exception classes are imported. 

1968_server_service_instance = None # pylint: disable=invalid-name 

1969 

1970 

1971def __getattr__(name: str): 

1972 """Module-level __getattr__ for lazy singleton creation. 

1973 

1974 Args: 

1975 name: The attribute name being accessed. 

1976 

1977 Returns: 

1978 The server_service singleton instance if name is "server_service". 

1979 

1980 Raises: 

1981 AttributeError: If the attribute name is not "server_service". 

1982 """ 

1983 global _server_service_instance # pylint: disable=global-statement 

1984 if name == "server_service": 

1985 if _server_service_instance is None: 

1986 _server_service_instance = ServerService() 

1987 return _server_service_instance 

1988 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")