Coverage for mcpgateway / services / resource_service.py: 99%

1190 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/resource_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Resource Service Implementation. 

8This module implements resource management according to the MCP specification. 

9It handles: 

10- Resource registration and retrieval 

11- Resource templates and URI handling 

12- Resource subscriptions and updates 

13- Content type management 

14- Active/inactive resource management 

15 

16Examples: 

17 >>> from mcpgateway.services.resource_service import ResourceService, ResourceError 

18 >>> service = ResourceService() 

19 >>> isinstance(service._event_service, EventService) 

20 True 

21""" 

22 

23# Standard 

24import binascii 

25from datetime import datetime, timezone 

26from functools import lru_cache 

27import mimetypes 

28import os 

29import re 

30import ssl 

31import time 

32from types import SimpleNamespace 

33from typing import Any, AsyncGenerator, Dict, List, Optional, Union 

34import uuid 

35 

36# Third-Party 

37import httpx 

38from mcp import ClientSession 

39from mcp.client.sse import sse_client 

40from mcp.client.streamable_http import streamablehttp_client 

41import parse 

42from pydantic import ValidationError 

43from sqlalchemy import and_, delete, desc, not_, or_, select 

44from sqlalchemy.exc import IntegrityError, OperationalError 

45from sqlalchemy.orm import joinedload, Session 

46 

47# First-Party 

48from mcpgateway.common.models import ResourceContent, ResourceContents, ResourceTemplate, TextContent 

49from mcpgateway.common.validators import SecurityValidator 

50from mcpgateway.config import settings 

51from mcpgateway.db import EmailTeam, fresh_db_session 

52from mcpgateway.db import Gateway as DbGateway 

53from mcpgateway.db import get_for_update 

54from mcpgateway.db import Resource as DbResource 

55from mcpgateway.db import ResourceMetric, ResourceMetricsHourly 

56from mcpgateway.db import ResourceSubscription as DbSubscription 

57from mcpgateway.db import server_resource_association 

58from mcpgateway.observability import create_span 

59from mcpgateway.schemas import ResourceCreate, ResourceMetrics, ResourceRead, ResourceSubscription, ResourceUpdate, TopPerformer 

60from mcpgateway.services.audit_trail_service import get_audit_trail_service 

61from mcpgateway.services.base_service import BaseService 

62from mcpgateway.services.event_service import EventService 

63from mcpgateway.services.logging_service import LoggingService 

64from mcpgateway.services.mcp_session_pool import get_mcp_session_pool, TransportType 

65from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service 

66from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

67from mcpgateway.services.oauth_manager import OAuthManager 

68from mcpgateway.services.observability_service import current_trace_id, ObservabilityService 

69from mcpgateway.services.structured_logger import get_structured_logger 

70from mcpgateway.utils.gateway_access import build_gateway_auth_headers, check_gateway_access 

71from mcpgateway.utils.metrics_common import build_top_performers 

72from mcpgateway.utils.pagination import unified_paginate 

73from mcpgateway.utils.services_auth import decode_auth 

74from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

75from mcpgateway.utils.ssl_context_cache import get_cached_ssl_context 

76from mcpgateway.utils.url_auth import apply_query_param_auth, sanitize_exception_message 

77from mcpgateway.utils.validate_signature import validate_signature 

78 

# Plugin support imports (conditional).
# The plugin framework is treated as optional: when the import below fails,
# PLUGINS_AVAILABLE is set to False and ResourceService.__init__ skips
# plugin-manager setup instead of raising at import time.
try:
    # First-Party
    from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, PluginContextTable, ResourceHookType, ResourcePostFetchPayload, ResourcePreFetchPayload

    PLUGINS_AVAILABLE = True
except ImportError:
    PLUGINS_AVAILABLE = False

87 

# Registry cache handle, filled in on first use (imported lazily to avoid
# circular dependencies at module import time).
_REGISTRY_CACHE = None


def _get_registry_cache():
    """Return the shared registry cache, importing it on first access.

    The import of ``mcpgateway.cache.registry_cache`` is deferred until the
    first call; the imported object is then stored in the module-level
    ``_REGISTRY_CACHE`` and returned directly on later calls.

    Returns:
        RegistryCache instance.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement

    # Fast path: the cache object was already resolved by an earlier call.
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = registry_cache
    return _REGISTRY_CACHE

105 

106 

# Initialize logging service first; `logger` is the module-level logger used
# by the exception classes and service methods defined below.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Initialize structured logger, audit trail, and metrics buffer for resource operations.
# The structured logger is created for the "resource_service" component name.
structured_logger = get_structured_logger("resource_service")
audit_trail = get_audit_trail_service()
metrics_buffer = get_metrics_buffer_service()

115 

116 

class ResourceError(Exception):
    """Root of the resource exception hierarchy.

    The other resource-specific exceptions in this module derive from this
    class, so callers can catch it to handle any of them.
    """

119 

120 

class ResourceNotFoundError(ResourceError):
    """Signals that a requested resource could not be found."""

123 

124 

class ResourceURIConflictError(ResourceError):
    """Signals that a resource URI is already taken by an existing resource.

    The conflicting resource may be active or inactive; when it is inactive
    the error message also carries its ID.
    """

    def __init__(self, uri: str, enabled: bool = True, resource_id: Optional[int] = None, visibility: str = "public") -> None:
        """Build the conflict error and record details of the existing resource.

        Args:
            uri: The conflicting resource URI
            enabled: Whether the existing resource is active
            resource_id: ID of the existing resource if available
            visibility: Visibility status of the resource
        """
        base_message = f"{visibility.capitalize()} Resource already exists with URI: {uri}"
        # The log entry deliberately carries only the base message; the
        # inactive-resource suffix is added to the exception text afterwards.
        logger.info(f"ResourceURIConflictError: {base_message}")

        inactive_suffix = "" if enabled else f" (currently inactive, ID: {resource_id})"
        super().__init__(base_message + inactive_suffix)

        # Keep the details available to handlers that inspect the exception.
        self.uri = uri
        self.enabled = enabled
        self.resource_id = resource_id

145 

146 

class ResourceValidationError(ResourceError):
    """Signals that a resource did not pass validation."""

149 

150 

class ResourceLockConflictError(ResourceError):
    """Signals that a resource row is locked by another transaction.

    Raises:
        ResourceLockConflictError: When an attempt is made to modify a
            resource that another concurrent request currently holds locked.
    """

158 

159 

160class ResourceService(BaseService): 

161 """Service for managing resources. 

162 

163 Handles: 

164 - Resource registration and retrieval 

165 - Resource templates and URIs 

166 - Resource subscriptions 

167 - Content type detection 

168 - Active/inactive status management 

169 """ 

170 

171 _visibility_model_cls = DbResource 

172 

def __init__(self) -> None:
    """Set up the event service, template cache, OAuth manager and plugin manager.

    OAuth timeout and retry settings are read from the ``OAUTH_REQUEST_TIMEOUT``
    and ``OAUTH_MAX_RETRIES`` environment variables (defaults ``30`` and ``3``).
    A plugin manager is only requested when the plugin framework could be
    imported; any failure while obtaining it is logged and leaves
    ``_plugin_manager`` set to ``None``.
    """
    self._event_service = EventService(channel_name="mcpgateway:resource_events")
    self._template_cache: Dict[str, ResourceTemplate] = {}

    oauth_timeout = int(os.getenv("OAUTH_REQUEST_TIMEOUT", "30"))
    oauth_retries = int(os.getenv("OAUTH_MAX_RETRIES", "3"))
    self.oauth_manager = OAuthManager(request_timeout=oauth_timeout, max_retries=oauth_retries)

    # Plugin manager stays None unless the framework is importable and
    # get_plugin_manager() succeeds.
    self._plugin_manager = None
    if PLUGINS_AVAILABLE:
        try:
            self._plugin_manager = get_plugin_manager()
            if self._plugin_manager:
                logger.info("Plugin manager initialized for ResourceService with config: %s", settings.plugins.config_file)
        except Exception as exc:
            logger.warning("Plugin manager initialization failed in ResourceService: %s", exc)
            self._plugin_manager = None

    # Initialize mime types
    mimetypes.init()

192 

async def initialize(self) -> None:
    """Start the service by initializing its underlying event service."""
    logger.info("Initializing resource service")
    event_service = self._event_service
    await event_service.initialize()

197 

async def shutdown(self) -> None:
    """Stop the service by shutting down its underlying event service."""
    # Shutting down the event service is what clears subscriptions.
    event_service = self._event_service
    await event_service.shutdown()
    logger.info("Resource service shutdown complete")

203 

async def get_top_resources(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
    """Return the top-performing resources, using the metrics cache when enabled.

    The ranking itself is delegated to ``get_top_performers_combined``, which is
    asked for resource metrics (raw metrics combined with rollup data) and told
    to use the resource URI as the display name. When the metrics cache is
    enabled, a previously stored result for the same limit and
    ``include_deleted`` flag is returned without querying, and fresh results
    are stored after being built.

    Args:
        db (Session): Database session for querying resource metrics.
        limit (Optional[int]): Maximum number of resources to return. A falsy
            value (``None`` or ``0``) is treated as 5.
        include_deleted (bool): Whether to include deleted resources from rollups.

    Returns:
        List[TopPerformer]: A list of TopPerformer objects, each containing:
            - id: Resource ID.
            - name: Resource URI (used as the name field).
            - execution_count: Total number of executions.
            - avg_response_time: Average response time in seconds, or None if no metrics.
            - success_rate: Success rate percentage, or None if no metrics.
            - last_execution: Timestamp of the last execution, or None if no metrics.
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    row_limit = limit if limit else 5
    key = f"top_resources:{row_limit}:include_deleted={include_deleted}"

    # Serve from cache when possible.
    if is_cache_enabled():
        hit = metrics_cache.get(key)
        if hit is not None:
            return hit

    # Cache miss: only now import and run the combined raw + rollup query.
    # First-Party
    from mcpgateway.services.metrics_query_service import get_top_performers_combined  # pylint: disable=import-outside-toplevel

    rows = get_top_performers_combined(
        db=db,
        metric_type="resource",
        entity_model=DbResource,
        limit=row_limit,
        name_column="uri",  # Resources use URI as display name (backward compatibility)
        include_deleted=include_deleted,
    )
    performers = build_top_performers(rows)

    if is_cache_enabled():
        metrics_cache.set(key, performers)

    return performers

259 

def convert_resource_to_read(self, resource: DbResource, include_metrics: bool = False) -> ResourceRead:
    """
    Build a ResourceRead model from a DbResource, optionally with aggregated metrics.

    Args:
        resource (DbResource): The ORM instance of the resource.
        include_metrics (bool): Whether to include metrics in the result. Defaults to False.
            Set to False for list operations to avoid N+1 query issues.

    Returns:
        ResourceRead: The Pydantic model representing the resource, optionally including aggregated metrics.

    Examples:
        >>> from types import SimpleNamespace
        >>> from datetime import datetime, timezone
        >>> svc = ResourceService()
        >>> now = datetime.now(timezone.utc)
        >>> # Fake metrics
        >>> m1 = SimpleNamespace(is_success=True, response_time=0.1, timestamp=now)
        >>> m2 = SimpleNamespace(is_success=False, response_time=0.3, timestamp=now)
        >>> r = SimpleNamespace(
        ...     id="ca627760127d409080fdefc309147e08", uri='res://x', name='R', description=None, mime_type='text/plain', size=123,
        ...     created_at=now, updated_at=now, enabled=True, tags=[{"id": "t", "label": "T"}], metrics=[m1, m2]
        ... )
        >>> out = svc.convert_resource_to_read(r, include_metrics=True)
        >>> out.metrics.total_executions
        2
        >>> out.metrics.successful_executions
        1
    """
    payload = resource.__dict__.copy()
    # Drop SQLAlchemy bookkeeping and any raw 'metrics' attribute; metrics are rebuilt below.
    for unwanted in ("_sa_instance_state", "metrics"):
        payload.pop(unwanted, None)

    # Read base fields through getattr so values not yet present in __dict__ are still picked up.
    base_fields = ("id", "uri", "name", "description", "mime_type", "size", "created_at", "updated_at", "is_active", "enabled")
    for field_name in base_fields:
        payload[field_name] = getattr(resource, field_name, payload.get(field_name))

    if not include_metrics:
        payload["metrics"] = None
    else:
        has_metrics = hasattr(resource, "metrics") and resource.metrics is not None
        total = len(resource.metrics) if has_metrics else 0
        if total > 0:
            samples = resource.metrics
            successful = sum(1 for sample in samples if sample.is_success)
            failed = sum(1 for sample in samples if not sample.is_success)
            failure_rate = failed / total
            min_rt = min((sample.response_time for sample in samples), default=None)
            max_rt = max((sample.response_time for sample in samples), default=None)
            avg_rt = sum(sample.response_time for sample in samples) / total
            last_time = max((sample.timestamp for sample in samples), default=None)
        else:
            # No recorded executions: counts are zero and timing values are absent.
            successful = 0
            failed = 0
            failure_rate = 0.0
            min_rt = max_rt = avg_rt = last_time = None

        payload["metrics"] = {
            "total_executions": total,
            "successful_executions": successful,
            "failed_executions": failed,
            "failure_rate": failure_rate,
            "min_response_time": min_rt,
            "max_response_time": max_rt,
            "avg_response_time": avg_rt,
            "last_execution_time": last_time,
        }

    # Tags may be plain strings, dicts, or objects; reduce each to its label (or name).
    tag_labels = []
    for tag in resource.tags or []:
        if isinstance(tag, str):
            # Strings are kept as-is, including empty ones.
            tag_labels.append(tag)
            continue
        if isinstance(tag, dict):
            candidate = tag.get("label") or tag.get("name")
        else:
            candidate = getattr(tag, "label", None) or getattr(tag, "name", None)
        if candidate:
            tag_labels.append(candidate)
    payload["tags"] = tag_labels
    payload["team"] = getattr(resource, "team", None)

    # Metadata fields for the API response; missing attributes become None.
    for field_name in ("created_by", "modified_by", "created_at", "updated_at", "version"):
        payload[field_name] = getattr(resource, field_name, None)

    return ResourceRead.model_validate(payload)

355 

def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]:
    """Look up the name of an active team by its ID.

    Args:
        db (Session): Database session for querying teams.
        team_id (Optional[str]): The ID of the team.

    Returns:
        Optional[str]: The team's name, or None when no ID was given or no
        active team with that ID exists.
    """
    if not team_id:
        return None

    active_team_query = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True))
    matched_team = active_team_query.first()
    db.commit()  # Release transaction to avoid idle-in-transaction

    if not matched_team:
        return None
    return matched_team.name

371 

async def register_resource(
    self,
    db: Session,
    resource: ResourceCreate,
    created_by: Optional[str] = None,
    created_from_ip: Optional[str] = None,
    created_via: Optional[str] = None,
    created_user_agent: Optional[str] = None,
    import_batch_id: Optional[str] = None,
    federation_source: Optional[str] = None,
    team_id: Optional[str] = None,
    owner_email: Optional[str] = None,
    visibility: Optional[str] = "public",
) -> ResourceRead:
    """Register a new resource.

    Checks for a URI conflict within the same visibility scope and gateway
    (public resources, or team resources when ``team_id`` is given), stores
    the resource, notifies subscribers, and writes audit and structured log
    entries.

    Args:
        db: Database session
        resource: Resource creation schema
        created_by: User who created the resource
        created_from_ip: IP address of the creator
        created_via: Method used to create the resource (e.g., API, UI)
        created_user_agent: User agent of the creator
        import_batch_id: Optional batch ID for bulk imports
        federation_source: Optional source of the resource if federated
        team_id (Optional[str]): Team ID to assign the resource to.
        owner_email (Optional[str]): Email of the user who owns this resource.
        visibility (str): Resource visibility level (private, team, public).
            When None, the visibility on the ``resource`` schema is used,
            defaulting to "public" if the schema has none.

    Returns:
        Created resource information

    Raises:
        IntegrityError: If a database integrity error occurs.
        ResourceURIConflictError: If a resource with the same URI already exists.
        ResourceError: For other resource registration errors

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> from mcpgateway.schemas import ResourceRead
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> db.add = MagicMock()
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_resource_added = AsyncMock()
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.register_resource(db, resource))
        'resource_read'
    """
    # Resolve the visibility once, up front. The endpoint parameter takes
    # precedence over the schema value; a None parameter falls back to the
    # schema. Doing this before the conflict check avoids calling .lower()
    # on None, which previously surfaced as a misleading ResourceError.
    effective_visibility = visibility if visibility is not None else getattr(resource, "visibility", "public")
    visibility_scope = effective_visibility.lower() if isinstance(effective_visibility, str) else ""

    try:
        logger.info(f"Registering resource: {resource.uri}")

        # Extract gateway_id from resource if present
        gateway_id = getattr(resource, "gateway_id", None)

        # Check for an existing resource with the same uri in the same scope
        if visibility_scope == "public":
            logger.info(f"visibility:: {effective_visibility}")
            # Check for existing public resource with the same uri and gateway_id
            existing_resource = db.execute(select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "public", DbResource.gateway_id == gateway_id)).scalar_one_or_none()
            if existing_resource:
                raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
        elif visibility_scope == "team" and team_id:
            # Check for existing team resource with the same uri and gateway_id
            existing_resource = db.execute(
                select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.gateway_id == gateway_id)
            ).scalar_one_or_none()
            if existing_resource:
                raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)

        # Detect mime type if not provided
        mime_type = resource.mime_type
        if not mime_type:
            mime_type = self._detect_mime_type(resource.uri, resource.content)

        # Determine content storage: text when the mime type says so or the content is a str
        is_text = bool(mime_type and mime_type.startswith("text/")) or isinstance(resource.content, str)

        if is_text and isinstance(resource.content, str):
            binary_content = resource.content.encode()
        elif isinstance(resource.content, bytes):
            binary_content = resource.content
        else:
            binary_content = None

        # Create DB model
        db_resource = DbResource(
            uri=resource.uri,
            name=resource.name,
            description=resource.description,
            mime_type=mime_type,
            uri_template=resource.uri_template,
            text_content=resource.content if is_text else None,
            binary_content=binary_content,
            size=len(resource.content) if resource.content else 0,
            tags=resource.tags or [],
            created_by=created_by,
            created_from_ip=created_from_ip,
            created_via=created_via,
            created_user_agent=created_user_agent,
            import_batch_id=import_batch_id,
            federation_source=federation_source,
            version=1,
            # Team scoping fields - use schema values if provided, otherwise fallback to parameters
            team_id=getattr(resource, "team_id", None) or team_id,
            owner_email=getattr(resource, "owner_email", None) or owner_email or created_by,
            # Endpoint visibility parameter takes precedence over schema default
            visibility=effective_visibility,
            gateway_id=gateway_id,
        )

        # Add to DB
        db.add(db_resource)
        db.commit()
        db.refresh(db_resource)

        # Notify subscribers
        await self._notify_resource_added(db_resource)

        logger.info(f"Registered resource: {resource.uri}")

        # Structured logging: Audit trail for resource creation
        audit_trail.log_action(
            user_id=created_by or "system",
            action="create_resource",
            resource_type="resource",
            resource_id=str(db_resource.id),
            resource_name=db_resource.name,
            user_email=owner_email,
            team_id=team_id,
            client_ip=created_from_ip,
            user_agent=created_user_agent,
            new_values={
                "uri": db_resource.uri,
                "name": db_resource.name,
                "visibility": effective_visibility,
                "mime_type": db_resource.mime_type,
            },
            context={
                "created_via": created_via,
                "import_batch_id": import_batch_id,
                "federation_source": federation_source,
            },
            db=db,
        )

        # Structured logging: Log successful resource creation
        structured_logger.log(
            level="INFO",
            message="Resource created successfully",
            event_type="resource_created",
            component="resource_service",
            user_id=created_by,
            user_email=owner_email,
            team_id=team_id,
            resource_type="resource",
            resource_id=str(db_resource.id),
            custom_fields={
                "resource_uri": db_resource.uri,
                "resource_name": db_resource.name,
                "visibility": effective_visibility,
            },
        )

        db_resource.team = self._get_team_name(db, db_resource.team_id)
        return self.convert_resource_to_read(db_resource)
    except IntegrityError as ie:
        # A failed flush/commit leaves the session unusable until rolled back;
        # the generic handler below already rolls back, so do the same here.
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        structured_logger.log(
            level="ERROR",
            message="Resource creation failed due to database integrity error",
            event_type="resource_creation_failed",
            component="resource_service",
            user_id=created_by,
            user_email=owner_email,
            error=ie,
            custom_fields={
                "resource_uri": resource.uri,
            },
        )
        # Bare raise keeps the original traceback intact.
        raise
    except ResourceURIConflictError:
        logger.error(f"ResourceURIConflictError in group: {resource.uri}")

        # Structured logging: Log URI conflict error
        structured_logger.log(
            level="WARNING",
            message="Resource creation failed due to URI conflict",
            event_type="resource_uri_conflict",
            component="resource_service",
            user_id=created_by,
            user_email=owner_email,
            custom_fields={
                "resource_uri": resource.uri,
                "visibility": effective_visibility,
            },
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource creation failure
        structured_logger.log(
            level="ERROR",
            message="Resource creation failed",
            event_type="resource_creation_failed",
            component="resource_service",
            user_id=created_by,
            user_email=owner_email,
            error=e,
            custom_fields={
                "resource_uri": resource.uri,
            },
        )
        # Chain the cause so the underlying failure is visible in tracebacks.
        raise ResourceError(f"Failed to register resource: {str(e)}") from e

588 

589 async def register_resources_bulk( 

590 self, 

591 db: Session, 

592 resources: List[ResourceCreate], 

593 created_by: Optional[str] = None, 

594 created_from_ip: Optional[str] = None, 

595 created_via: Optional[str] = None, 

596 created_user_agent: Optional[str] = None, 

597 import_batch_id: Optional[str] = None, 

598 federation_source: Optional[str] = None, 

599 team_id: Optional[str] = None, 

600 owner_email: Optional[str] = None, 

601 visibility: Optional[str] = "public", 

602 conflict_strategy: str = "skip", 

603 ) -> Dict[str, Any]: 

604 """Register multiple resources in bulk with a single commit. 

605 

606 This method provides significant performance improvements over individual 

607 resource registration by: 

608 - Using db.add_all() instead of individual db.add() calls 

609 - Performing a single commit for all resources 

610 - Batch conflict detection 

611 - Chunking for very large imports (>500 items) 

612 

613 Args: 

614 db: Database session 

615 resources: List of resource creation schemas 

616 created_by: Username who created these resources 

617 created_from_ip: IP address of creator 

618 created_via: Creation method (ui, api, import, federation) 

619 created_user_agent: User agent of creation request 

620 import_batch_id: UUID for bulk import operations 

621 federation_source: Source gateway for federated resources 

622 team_id: Team ID to assign the resources to 

623 owner_email: Email of the user who owns these resources 

624 visibility: Resource visibility level (private, team, public) 

625 conflict_strategy: How to handle conflicts (skip, update, rename, fail) 

626 

627 Returns: 

628 Dict with statistics: 

629 - created: Number of resources created 

630 - updated: Number of resources updated 

631 - skipped: Number of resources skipped 

632 - failed: Number of resources that failed 

633 - errors: List of error messages 

634 

635 Raises: 

636 ResourceError: If bulk registration fails critically 

637 

638 Examples: 

639 >>> from mcpgateway.services.resource_service import ResourceService 

640 >>> from unittest.mock import MagicMock 

641 >>> service = ResourceService() 

642 >>> db = MagicMock() 

643 >>> resources = [MagicMock(), MagicMock()] 

644 >>> import asyncio 

645 >>> try: 

646 ... result = asyncio.run(service.register_resources_bulk(db, resources)) 

647 ... except Exception: 

648 ... pass 

649 """ 

650 if not resources: 

651 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

652 

653 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

654 

655 # Process in chunks to avoid memory issues and SQLite parameter limits 

656 chunk_size = 500 

657 

658 for chunk_start in range(0, len(resources), chunk_size): 

659 chunk = resources[chunk_start : chunk_start + chunk_size] 

660 

661 try: 

662 # Batch check for existing resources to detect conflicts 

663 resource_uris = [resource.uri for resource in chunk] 

664 

665 # Build base query conditions 

666 if visibility.lower() == "public": 

667 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "public"] 

668 elif visibility.lower() == "team" and team_id: 

669 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "team", DbResource.team_id == team_id] 

670 else: 

671 # Private resources - check by owner 

672 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "private", DbResource.owner_email == (owner_email or created_by)] 

673 

674 existing_resources_query = select(DbResource).where(*base_conditions) 

675 existing_resources = db.execute(existing_resources_query).scalars().all() 

676 # Use (uri, gateway_id) tuple as key for proper conflict detection with gateway_id scoping 

677 existing_resources_map = {(r.uri, r.gateway_id): r for r in existing_resources} 

678 

679 resources_to_add = [] 

680 resources_to_update = [] 

681 

682 for resource in chunk: 

683 try: 

684 # Use provided parameters or schema values 

685 resource_team_id = team_id if team_id is not None else getattr(resource, "team_id", None) 

686 resource_owner_email = owner_email or getattr(resource, "owner_email", None) or created_by 

687 resource_visibility = visibility if visibility is not None else getattr(resource, "visibility", "public") 

688 resource_gateway_id = getattr(resource, "gateway_id", None) 

689 

690 # Look up existing resource by (uri, gateway_id) tuple 

691 existing_resource = existing_resources_map.get((resource.uri, resource_gateway_id)) 

692 

693 if existing_resource: 

694 # Handle conflict based on strategy 

695 if conflict_strategy == "skip": 

696 stats["skipped"] += 1 

697 continue 

698 if conflict_strategy == "update": 

699 # Update existing resource 

700 existing_resource.name = resource.name 

701 existing_resource.description = resource.description 

702 existing_resource.mime_type = resource.mime_type 

703 existing_resource.size = getattr(resource, "size", None) 

704 existing_resource.uri_template = resource.uri_template 

705 existing_resource.tags = resource.tags or [] 

706 existing_resource.modified_by = created_by 

707 existing_resource.modified_from_ip = created_from_ip 

708 existing_resource.modified_via = created_via 

709 existing_resource.modified_user_agent = created_user_agent 

710 existing_resource.updated_at = datetime.now(timezone.utc) 

711 existing_resource.version = (existing_resource.version or 1) + 1 

712 

713 resources_to_update.append(existing_resource) 

714 stats["updated"] += 1 

715 elif conflict_strategy == "rename": 

716 # Create with renamed resource 

717 new_uri = f"{resource.uri}_imported_{int(datetime.now().timestamp())}" 

718 db_resource = DbResource( 

719 uri=new_uri, 

720 name=resource.name, 

721 description=resource.description, 

722 mime_type=resource.mime_type, 

723 size=getattr(resource, "size", None), 

724 uri_template=resource.uri_template, 

725 gateway_id=getattr(resource, "gateway_id", None), 

726 tags=resource.tags or [], 

727 created_by=created_by, 

728 created_from_ip=created_from_ip, 

729 created_via=created_via, 

730 created_user_agent=created_user_agent, 

731 import_batch_id=import_batch_id, 

732 federation_source=federation_source, 

733 version=1, 

734 team_id=resource_team_id, 

735 owner_email=resource_owner_email, 

736 visibility=resource_visibility, 

737 ) 

738 resources_to_add.append(db_resource) 

739 stats["created"] += 1 

740 elif conflict_strategy == "fail": 

741 stats["failed"] += 1 

742 stats["errors"].append(f"Resource URI conflict: {resource.uri}") 

743 continue 

744 else: 

745 # Create new resource 

746 db_resource = DbResource( 

747 uri=resource.uri, 

748 name=resource.name, 

749 description=resource.description, 

750 mime_type=resource.mime_type, 

751 size=getattr(resource, "size", None), 

752 uri_template=resource.uri_template, 

753 gateway_id=getattr(resource, "gateway_id", None), 

754 tags=resource.tags or [], 

755 created_by=created_by, 

756 created_from_ip=created_from_ip, 

757 created_via=created_via, 

758 created_user_agent=created_user_agent, 

759 import_batch_id=import_batch_id, 

760 federation_source=federation_source, 

761 version=1, 

762 team_id=resource_team_id, 

763 owner_email=resource_owner_email, 

764 visibility=resource_visibility, 

765 ) 

766 resources_to_add.append(db_resource) 

767 stats["created"] += 1 

768 

769 except Exception as e: 

770 stats["failed"] += 1 

771 stats["errors"].append(f"Failed to process resource {resource.uri}: {str(e)}") 

772 logger.warning(f"Failed to process resource {resource.uri} in bulk operation: {str(e)}") 

773 continue 

774 

775 # Bulk add new resources 

776 if resources_to_add: 

777 db.add_all(resources_to_add) 

778 

779 # Commit the chunk 

780 db.commit() 

781 

782 # Refresh resources for notifications and audit trail 

783 for db_resource in resources_to_add: 

784 db.refresh(db_resource) 

785 # Notify subscribers 

786 await self._notify_resource_added(db_resource) 

787 

788 # Log bulk audit trail entry 

789 if resources_to_add or resources_to_update: 

790 audit_trail.log_action( 

791 user_id=created_by or "system", 

792 action="bulk_create_resources" if resources_to_add else "bulk_update_resources", 

793 resource_type="resource", 

794 resource_id=import_batch_id or "bulk_operation", 

795 resource_name=f"Bulk operation: {len(resources_to_add)} created, {len(resources_to_update)} updated", 

796 user_email=owner_email, 

797 team_id=team_id, 

798 client_ip=created_from_ip, 

799 user_agent=created_user_agent, 

800 new_values={ 

801 "resources_created": len(resources_to_add), 

802 "resources_updated": len(resources_to_update), 

803 "visibility": visibility, 

804 }, 

805 context={ 

806 "created_via": created_via, 

807 "import_batch_id": import_batch_id, 

808 "federation_source": federation_source, 

809 "conflict_strategy": conflict_strategy, 

810 }, 

811 db=db, 

812 ) 

813 

814 logger.info(f"Bulk registered {len(resources_to_add)} resources, updated {len(resources_to_update)} resources in chunk") 

815 

816 except Exception as e: 

817 db.rollback() 

818 logger.error(f"Failed to process chunk in bulk resource registration: {str(e)}") 

819 stats["failed"] += len(chunk) 

820 stats["errors"].append(f"Chunk processing failed: {str(e)}") 

821 continue 

822 

823 # Final structured logging 

824 structured_logger.log( 

825 level="INFO", 

826 message="Bulk resource registration completed", 

827 event_type="resources_bulk_created", 

828 component="resource_service", 

829 user_id=created_by, 

830 user_email=owner_email, 

831 team_id=team_id, 

832 resource_type="resource", 

833 custom_fields={ 

834 "resources_created": stats["created"], 

835 "resources_updated": stats["updated"], 

836 "resources_skipped": stats["skipped"], 

837 "resources_failed": stats["failed"], 

838 "total_resources": len(resources), 

839 "visibility": visibility, 

840 "conflict_strategy": conflict_strategy, 

841 }, 

842 ) 

843 

844 return stats 

845 

async def _check_resource_access(
    self,
    db: Session,
    resource: DbResource,
    user_email: Optional[str],
    token_teams: Optional[List[str]],
) -> bool:
    """Decide whether the caller may access ``resource`` under the visibility rules.

    The checks run in a fixed order and the first one that matches wins:

    1. ``public`` resources are always accessible.
    2. ``token_teams is None`` together with ``user_email is None`` is treated
       as an unrestricted admin caller.
    3. Any other caller without an email is denied.
    4. A public-only token (``token_teams == []``) is denied, since the public
       case was already handled.
    5. The owner may access their own ``private`` resource.
    6. For resources that carry a ``team_id``, access is granted when the
       visibility is ``team`` (or ``public``) and the team is among the
       caller's teams.

    Args:
        db: Database session, used only to look up team memberships when
            ``token_teams`` is None.
        resource: Resource object exposing ``visibility``, ``team_id`` and
            ``owner_email`` (missing attributes fall back to "public"/None).
        user_email: Email of the requesting user (None = unauthenticated).
        token_teams: Team IDs taken from the token.
            - None = no token scoping (teams are looked up from the DB)
            - [] = public-only token
            - [...] = team-scoped token

    Returns:
        True if access is allowed, False otherwise.
    """
    scope = getattr(resource, "visibility", "public")
    owning_team = getattr(resource, "team_id", None)
    owner = getattr(resource, "owner_email", None)

    # Rule 1: public resources need no further checks.
    if scope == "public":
        return True

    # Rule 2: no token scoping and no identity means unrestricted admin.
    if token_teams is None and user_email is None:
        return True

    # Rule 3: everything below requires a caller identity.
    if not user_email:
        return False

    # Rule 4: an explicit empty team list marks a public-only token.
    if token_teams is not None and len(token_teams) == 0:
        return False

    # Rule 5: owners can always reach their own private resources.
    if scope == "private" and owner and owner == user_email:
        return True

    # Rule 6 only applies to resources that belong to a team.
    if not owning_team:
        return False

    if token_teams is not None:
        # The token's scope takes precedence over the user's full memberships.
        caller_teams = token_teams
    elif db is None:
        logger.warning("Missing database session for team-scoped resource access check")
        return False
    else:
        # First-Party
        from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

        memberships = await TeamManagementService(db).get_user_teams(user_email)
        caller_teams = [team.id for team in memberships]

    return scope in ["team", "public"] and owning_team in caller_teams

916 

async def list_resources(
    self,
    db: Session,
    include_inactive: bool = False,
    cursor: Optional[str] = None,
    tags: Optional[List[str]] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    user_email: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Union[tuple[List[ResourceRead], Optional[str]], Dict[str, Any]]:
    """
    Retrieve a list of registered resources from the database with pagination support.

    This method retrieves resources from the database and converts them into a list
    of ResourceRead objects. It supports filtering out inactive resources based on the
    include_inactive parameter and cursor-based pagination.

    The first page of an unscoped, unfiltered cursor-style listing is served from and
    stored in the registry cache. Requests that carry user_email, token_teams, page,
    team_id or visibility always bypass the cache, because the cache key only covers
    include_inactive, tags and limit.

    Args:
        db (Session): The SQLAlchemy database session.
        include_inactive (bool): If True, include inactive resources in the result.
            Defaults to False.
        cursor (Optional[str], optional): An opaque cursor token for pagination.
            Opaque base64-encoded string containing last item's ID and created_at.
        tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
        limit (Optional[int]): Maximum number of resources to return. Use 0 for all resources (no limit).
            If not specified, uses pagination_default_page_size.
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
        user_email (Optional[str]): User email for team-based access control. If None, no access control is applied.
        team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access
            where the token scope should be respected instead of the user's full team memberships.

    Returns:
        If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
        If cursor is provided or neither: tuple of (list of ResourceRead objects, next_cursor).

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource_read = MagicMock()
        >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> resources, next_cursor = asyncio.run(service.list_resources(db))
        >>> isinstance(resources, list)
        True

        With tags filter:
        >>> db2 = MagicMock()
        >>> bind = MagicMock()
        >>> bind.dialect = MagicMock()
        >>> bind.dialect.name = "sqlite"      # or "postgresql" / "mysql"
        >>> db2.get_bind.return_value = bind
        >>> db2.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> result2, _ = asyncio.run(service.list_resources(db2, tags=['api']))
        >>> isinstance(result2, list)
        True
    """
    # Check cache for first page only (cursor=None)
    # Skip caching when:
    # - user_email is provided (team-filtered results are user-specific)
    # - token_teams is set (scoped access, e.g., public-only or team-scoped tokens)
    # - page-based pagination is used
    # - team_id or visibility is set (both narrow the query but are not part of the cache key,
    #   so caching them would let a filtered result be served for an unfiltered request)
    # This prevents cache poisoning where admin results could leak to public-only requests
    cache = _get_registry_cache()
    use_cache = cursor is None and user_email is None and token_teams is None and page is None and team_id is None and visibility is None
    filters_hash = None
    if use_cache:
        filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, limit=limit)
        cached = await cache.get("resources", filters_hash)
        if cached is not None:
            # Reconstruct ResourceRead objects from cached dicts
            cached_resources = [ResourceRead.model_validate(r) for r in cached["resources"]]
            return (cached_resources, cached.get("next_cursor"))

    # Build base query with ordering (templates are excluded from this listing)
    query = select(DbResource).where(DbResource.uri_template.is_(None)).order_by(desc(DbResource.created_at), desc(DbResource.id))

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbResource.enabled)

    query = await self._apply_access_control(query, db, user_email, token_teams, team_id)

    if visibility:
        query = query.where(DbResource.visibility == visibility)

    # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
    if tags:
        query = query.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))

    # Use unified pagination helper - handles both page and cursor pagination
    pag_result = await unified_paginate(
        db=db,
        query=query,
        page=page,
        per_page=per_page,
        cursor=cursor,
        limit=limit,
        base_url="/admin/resources",  # Used for page-based links
        query_params={"include_inactive": include_inactive} if include_inactive else {},
    )

    next_cursor = None
    # Extract resources based on pagination type
    if page is not None:
        # Page-based: pag_result is a dict
        resources_db = pag_result["data"]
    else:
        # Cursor-based: pag_result is a tuple
        resources_db, next_cursor = pag_result

    # Fetch team names for the resources (common for both pagination types)
    team_ids_set = {s.team_id for s in resources_db if s.team_id}
    team_map = {}
    if team_ids_set:
        teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
        team_map = {team.id: team.name for team in teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Convert to ResourceRead (common for both pagination types)
    result = []
    for s in resources_db:
        try:
            s.team = team_map.get(s.team_id) if s.team_id else None
            result.append(self.convert_resource_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert resource {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining resources instead of failing completely

    # Return appropriate format based on pagination type
    if page is not None:
        # Page-based format
        return {
            "data": result,
            "pagination": pag_result["pagination"],
            "links": pag_result["links"],
        }

    # Cursor-based format

    # Cache first page results - only for non-user-specific/non-scoped/unfiltered queries.
    # Reuses the exact flag computed for the lookup so both sides can never drift apart.
    if use_cache:
        try:
            cache_data = {"resources": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
            await cache.set("resources", cache_data, filters_hash)
        except AttributeError:
            pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

    return (result, next_cursor)

1074 

async def list_resources_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[ResourceRead]:
    """
    DEPRECATED: Use list_resources() with user_email parameter instead.

    Return the resources a user can see, optionally narrowed to one team.

    Kept for backward compatibility only. New code should call list_resources()
    with the user_email, team_id and visibility parameters.

    When team_id is given, the user must be a member of that team (otherwise an
    empty list is returned) and only that team's resources are considered:
    those with team/public visibility plus the ones the user owns. Without
    team_id, the result covers resources the user owns, team/public resources
    of the user's teams, and all public resources.

    Args:
        db: Database session
        user_email: Email of the user requesting resources
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive resources
        skip: Number of resources to skip for pagination
        limit: Maximum number of resources to return

    Returns:
        List[ResourceRead]: Resources the user has access to

    Examples:
        >>> from unittest.mock import MagicMock
        >>> import asyncio
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> # Patch out TeamManagementService so it doesn't run real logic
        >>> import mcpgateway.services.resource_service as _rs
        >>> class FakeTeamService:
        ...     def __init__(self, db): pass
        ...     async def get_user_teams(self, email): return []
        >>> _rs.TeamManagementService = FakeTeamService
        >>> # Force DB to return one fake row with a 'team' attribute
        >>> class FakeResource:
        ...     team_id = None
        >>> fake_resource = FakeResource()
        >>> db.execute.return_value.scalars.return_value.all.return_value = [fake_resource]
        >>> service.convert_resource_to_read = MagicMock(return_value="converted")
        >>> asyncio.run(service.list_resources_for_user(db, "user@example.com"))
        ['converted']

        Without team_id (default/public access):
        >>> db2 = MagicMock()
        >>> class FakeResource2:
        ...     team_id = None
        >>> fake_resource2 = FakeResource2()
        >>> db2.execute.return_value.scalars.return_value.all.return_value = [fake_resource2]
        >>> service.convert_resource_to_read = MagicMock(return_value="converted2")
        >>> out2 = asyncio.run(service.list_resources_for_user(db2, "user@example.com"))
        >>> out2
        ['converted2']
    """
    # First-Party
    from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

    # Resolve the caller's team memberships up front; both branches below need them.
    memberships = await TeamManagementService(db).get_user_teams(user_email)
    member_team_ids = [team.id for team in memberships]

    stmt = select(DbResource)
    if not include_inactive:
        stmt = stmt.where(DbResource.enabled)

    if team_id:
        # A specific team was requested: the caller must belong to it.
        if team_id not in member_team_ids:
            return []  # No access to team

        grants = [
            and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"])),
            and_(DbResource.team_id == team_id, DbResource.owner_email == user_email),
        ]
    else:
        # 1. Resources the user owns
        grants = [DbResource.owner_email == user_email]
        # 2. Team/public resources of teams the user belongs to
        if member_team_ids:
            grants.append(and_(DbResource.team_id.in_(member_team_ids), DbResource.visibility.in_(["team", "public"])))
        # 3. Every public resource
        grants.append(DbResource.visibility == "public")

    stmt = stmt.where(or_(*grants))

    # Optional explicit visibility filter on top of the access rules
    if visibility:
        stmt = stmt.where(DbResource.visibility == visibility)

    rows = db.execute(stmt.offset(skip).limit(limit)).scalars().all()

    # Resolve team names with a single query instead of one per resource
    wanted_team_ids = {row.team_id for row in rows if row.team_id}
    team_names = {}
    if wanted_team_ids:
        team_rows = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(wanted_team_ids), EmailTeam.is_active.is_(True))).all()
        team_names = {str(team.id): team.name for team in team_rows}

    db.commit()  # Release transaction to avoid idle-in-transaction

    converted = []
    for row in rows:
        try:
            row.team = team_names.get(str(row.team_id)) if row.team_id else None
            converted.append(self.convert_resource_to_read(row, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # A single bad row is logged and skipped so the rest of the list is still returned.
            logger.exception(f"Failed to convert resource {getattr(row, 'id', 'unknown')} ({getattr(row, 'name', 'unknown')}): {e}")
    return converted

1196 

async def list_server_resources(
    self,
    db: Session,
    server_id: str,
    include_inactive: bool = False,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> List[ResourceRead]:
    """
    List the non-template resources associated with a server.

    Resources are selected through the server/resource association table and
    converted to ResourceRead objects. Inactive resources are excluded unless
    include_inactive is True. Pagination is not implemented; all matching rows
    are returned.

    Visibility filtering is applied whenever user_email or token_teams is
    supplied: public resources are always included, resources owned by
    user_email are included unless the token is public-only (token_teams == []),
    and team/public resources of the caller's teams are included.

    Args:
        db (Session): The SQLAlchemy database session.
        server_id (str): Server ID
        include_inactive (bool): If True, include inactive resources in the result.
            Defaults to False.
        user_email (Optional[str]): User email for visibility filtering. If None
            (and token_teams is also None), no filtering is applied.
        token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API
            token access where the token scope should be respected.

    Returns:
        List[ResourceRead]: A list of resources represented as ResourceRead objects.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource_read = MagicMock()
        >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> result = asyncio.run(service.list_server_resources(db, 'server1'))
        >>> isinstance(result, list)
        True
        >>> # Include inactive branch
        >>> result = asyncio.run(service.list_server_resources(db, 'server1', include_inactive=True))
        >>> isinstance(result, list)
        True
    """
    logger.debug(f"Listing resources for server_id: {server_id}, include_inactive: {include_inactive}")

    stmt = select(DbResource).join(server_resource_association, DbResource.id == server_resource_association.c.resource_id)
    stmt = stmt.where(DbResource.uri_template.is_(None))
    stmt = stmt.where(server_resource_association.c.server_id == server_id)
    if not include_inactive:
        stmt = stmt.where(DbResource.enabled)

    # Filter by visibility as soon as either a user or a token scope is known.
    # An empty-string user_email with no token teams ends up public-only (secure default).
    if user_email is not None or token_teams is not None:
        if token_teams is not None:
            # Token scope wins over the user's full memberships
            scoped_team_ids = token_teams
        elif user_email:
            # First-Party
            from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

            memberships = await TeamManagementService(db).get_user_teams(user_email)
            scoped_team_ids = [team.id for team in memberships]
        else:
            scoped_team_ids = []

        # Public-only tokens (empty teams array) get no owner-based access
        public_only = token_teams is not None and len(token_teams) == 0

        grants = [DbResource.visibility == "public"]
        if user_email and not public_only:
            grants.append(DbResource.owner_email == user_email)
        if scoped_team_ids:
            grants.append(and_(DbResource.team_id.in_(scoped_team_ids), DbResource.visibility.in_(["team", "public"])))
        stmt = stmt.where(or_(*grants))

    rows = db.execute(stmt).scalars().all()

    # Resolve team names with a single query instead of one per resource
    wanted_team_ids = {row.team_id for row in rows if row.team_id}
    team_names = {}
    if wanted_team_ids:
        team_rows = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(wanted_team_ids), EmailTeam.is_active.is_(True))).all()
        team_names = {str(team.id): team.name for team in team_rows}

    db.commit()  # Release transaction to avoid idle-in-transaction

    converted = []
    for row in rows:
        try:
            row.team = team_names.get(str(row.team_id)) if row.team_id else None
            converted.append(self.convert_resource_to_read(row, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # A single bad row is logged and skipped so the rest of the list is still returned.
            logger.exception(f"Failed to convert resource {getattr(row, 'id', 'unknown')} ({getattr(row, 'name', 'unknown')}): {e}")
    return converted

1303 

async def _record_resource_metric(self, db: Session, resource: DbResource, start_time: float, success: bool, error_message: Optional[str]) -> None:
    """
    Store one access metric row for a resource and commit it.

    The response time is the difference between the current monotonic clock
    reading and ``start_time``.

    Args:
        db: Database session
        resource: The resource that was accessed; its ``id`` is stored on the metric
        start_time: Monotonic start time of the access
        success: True if successful, False otherwise
        error_message: Error message if failed, None otherwise
    """
    elapsed = time.monotonic() - start_time

    db.add(
        ResourceMetric(
            resource_id=resource.id,
            response_time=elapsed,
            is_success=success,
            error_message=error_message,
        )
    )
    db.commit()

1326 

async def _record_invoke_resource_metric(self, db: Session, resource_id: str, start_time: float, success: bool, error_message: Optional[str]) -> None:
    """
    Store one invocation metric row for a resource ID and commit it.

    The response time is the difference between the current monotonic clock
    reading and ``start_time``.

    Args:
        db: Database Session
        resource_id: unique identifier of the invoked resource, stored on the metric
        start_time: Monotonic start time of the access
        success: True if successful, False otherwise
        error_message: Error message if failed, None otherwise
    """
    elapsed = time.monotonic() - start_time

    db.add(
        ResourceMetric(
            resource_id=resource_id,
            response_time=elapsed,
            is_success=success,
            error_message=error_message,
        )
    )
    db.commit()

1349 

def create_ssl_context(self, ca_certificate: str) -> ssl.SSLContext:
    """Return an SSL context for the given CA certificate.

    Delegates to ``get_cached_ssl_context`` so the context for a given
    certificate does not have to be rebuilt on every call.

    Args:
        ca_certificate: CA certificate in PEM format

    Returns:
        ssl.SSLContext: Configured SSL context
    """
    ssl_context = get_cached_ssl_context(ca_certificate)
    return ssl_context

1362 

1363 async def invoke_resource( # pylint: disable=unused-argument 

1364 self, 

1365 db: Session, 

1366 resource_id: str, 

1367 resource_uri: str, 

1368 resource_template_uri: Optional[str] = None, 

1369 user_identity: Optional[Union[str, Dict[str, Any]]] = None, 

1370 meta_data: Optional[Dict[str, Any]] = None, # Reserved for future MCP SDK support 

1371 resource_obj: Optional[Any] = None, 

1372 gateway_obj: Optional[Any] = None, 

1373 server_id: Optional[str] = None, 

1374 ) -> Any: 

1375 """ 

1376 Invoke a resource via its configured gateway using SSE or StreamableHTTP transport. 

1377 

1378 This method determines the correct URI to invoke, loads the associated resource 

1379 and gateway from the database, validates certificates if applicable, prepares 

1380 authentication headers (OAuth, header-based, or none), and then connects to 

1381 the gateway to read the resource using the appropriate transport. 

1382 

1383 The function supports: 

1384 - CA certificate validation / SSL context creation 

1385 - OAuth client-credentials and authorization-code flow 

1386 - Header-based auth 

1387 - SSE transport gateways 

1388 - StreamableHTTP transport gateways 

1389 

1390 Args: 

1391 db (Session): 

1392 SQLAlchemy session for retrieving resource and gateway information. 

1393 resource_id (str): 

1394 ID of the resource to invoke. 

1395 resource_uri (str): 

1396 Direct resource URI configured for the resource. 

1397 resource_template_uri (Optional[str]): 

1398 URI from the template. Overrides `resource_uri` when provided. 

1399 user_identity (Optional[Union[str, Dict[str, Any]]]): 

1400 Identity of the user making the request, used for session pool isolation. 

1401 Can be a string (email) or a dict with an 'email' key. 

1402 Defaults to "anonymous" for pool isolation if not provided. 

1403 OAuth Authorization Code token lookup uses this user identity. 

1404 meta_data (Optional[Dict[str, Any]]): 

1405 Additional metadata to pass to the gateway during invocation. 

1406 resource_obj (Optional[Any]): 

1407 Pre-fetched DbResource object to skip the internal resource lookup query. 

1408 gateway_obj (Optional[Any]): 

1409 Pre-fetched DbGateway object to skip the internal gateway lookup query. 

1410 server_id (Optional[str]): 

1411 Virtual server ID for server metrics recording. When provided, indicates 

1412 the resource was invoked through a specific virtual server endpoint. 

1413 Direct resource calls (e.g., from admin UI) should pass None. 

1414 

1415 Returns: 

1416 Any: The text content returned by the remote resource, or ``None`` if the 

1417 gateway could not be contacted or an error occurred. 

1418 

1419 Raises: 

1420 Exception: Any unhandled internal errors (e.g., DB issues). 

1421 

1422 --- 

1423 Doctest Examples 

1424 ---------------- 

1425 

1426 >>> class FakeDB: 

1427 ... "Simple DB stub returning fake resource and gateway rows." 

1428 ... def execute(self, query): 

1429 ... class Result: 

1430 ... def scalar_one_or_none(self): 

1431 ... # Return fake objects with the needed attributes 

1432 ... class FakeResource: 

1433 ... id = "res123" 

1434 ... name = "Demo Resource" 

1435 ... gateway_id = "gw1" 

1436 ... return FakeResource() 

1437 ... return Result() 

1438 

1439 >>> class FakeGateway: 

1440 ... id = "gw1" 

1441 ... name = "Fake Gateway" 

1442 ... url = "https://fake.gateway" 

1443 ... ca_certificate = None 

1444 ... ca_certificate_sig = None 

1445 ... transport = "sse" 

1446 ... auth_type = None 

1447 ... auth_value = {} 

1448 

1449 >>> # Monkeypatch the DB lookup for gateway 

1450 >>> def fake_execute_gateway(self, query): 

1451 ... class Result: 

1452 ... def scalar_one_or_none(self_inner): 

1453 ... return FakeGateway() 

1454 ... return Result() 

1455 

1456 >>> FakeDB.execute_gateway = fake_execute_gateway 

1457 

1458 >>> class FakeService: 

1459 ... "Service stub replacing network calls with predictable outputs." 

1460 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None): 

1461 ... # Represent the behavior of a successful SSE response. 

1462 ... return "hello from gateway" 

1463 

1464 >>> svc = FakeService() 

1465 >>> import asyncio 

1466 >>> asyncio.run(svc.invoke_resource(FakeDB(), "res123", "/test")) 

1467 'hello from gateway' 

1468 

1469 --- 

1470 Example: Template URI overrides resource URI 

1471 -------------------------------------------- 

1472 

1473 >>> class FakeService2(FakeService): 

1474 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None): 

1475 ... if resource_template_uri: 

1476 ... return f"using template: {resource_template_uri}" 

1477 ... return f"using direct: {resource_uri}" 

1478 

1479 >>> svc2 = FakeService2() 

1480 >>> asyncio.run(svc2.invoke_resource(FakeDB(), "res123", "/direct", "/template")) 

1481 'using template: /template' 

1482 

1483 """ 

1484 uri = None 

1485 if resource_uri and resource_template_uri: 

1486 uri = resource_template_uri 

1487 elif resource_uri: 

1488 uri = resource_uri 

1489 

1490 logger.info(f"Invoking the resource: {uri}") 

1491 gateway_id = None 

1492 # Use pre-fetched resource if provided (avoids Q5 re-fetch) 

1493 resource_info = resource_obj 

1494 if resource_info is None: 

1495 resource_info = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none() 

1496 

1497 # Release transaction immediately after resource lookup to prevent idle-in-transaction 

1498 # This is especially important when resource isn't found - we don't want to hold the transaction 

1499 db.commit() 

1500 

1501 # Normalize user_identity to string for session pool isolation. 

1502 if isinstance(user_identity, dict): 

1503 pool_user_identity = user_identity.get("email") or "anonymous" 

1504 elif isinstance(user_identity, str): 

1505 pool_user_identity = user_identity 

1506 else: 

1507 pool_user_identity = "anonymous" 

1508 

1509 oauth_user_email: Optional[str] = None 

1510 if isinstance(user_identity, dict): 

1511 user_email_value = user_identity.get("email") 

1512 if isinstance(user_email_value, str): 

1513 oauth_user_email = user_email_value.strip().lower() or None 

1514 elif isinstance(user_identity, str): 

1515 oauth_user_email = user_identity.strip().lower() or None 

1516 

1517 if resource_info: 

1518 gateway_id = getattr(resource_info, "gateway_id", None) 

1519 resource_name = getattr(resource_info, "name", None) 

1520 # Use pre-fetched gateway if provided (avoids Q6 re-fetch) 

1521 gateway = gateway_obj 

1522 if gateway is None and gateway_id: 

1523 gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none() 

1524 

1525 # ═══════════════════════════════════════════════════════════════════════════ 

1526 # CRITICAL: Release DB transaction immediately after fetching resource/gateway 

1527 # This ensures the transaction doesn't stay open if there's no gateway 

1528 # or if the function returns early for any reason. 

1529 # ═══════════════════════════════════════════════════════════════════════════ 

1530 db.commit() 

1531 

1532 if gateway: 

1533 

1534 start_time = time.monotonic() 

1535 success = False 

1536 error_message = None 

1537 

1538 # Create database span for observability dashboard 

1539 trace_id = current_trace_id.get() 

1540 db_span_id = None 

1541 db_span_ended = False 

1542 observability_service = ObservabilityService() if trace_id else None 

1543 

1544 if trace_id and observability_service: 

1545 try: 

1546 db_span_id = observability_service.start_span( 

1547 db=db, 

1548 trace_id=trace_id, 

1549 name="invoke.resource", 

1550 attributes={ 

1551 "resource.name": resource_name if resource_name else "unknown", 

1552 "resource.id": str(resource_id) if resource_id else "unknown", 

1553 "resource.uri": str(uri) or "unknown", 

1554 "gateway.transport": getattr(gateway, "transport") or "uknown", 

1555 "gateway.url": getattr(gateway, "url") or "unknown", 

1556 }, 

1557 ) 

1558 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {resource_id} & {uri}") 

1559 except Exception as e: 

1560 logger.warning(f"Failed to start the observability span for invoking resource: {e}") 

1561 db_span_id = None 

1562 

1563 with create_span( 

1564 "invoke.resource", 

1565 { 

1566 "resource.name": resource_name if resource_name else "unknown", 

1567 "resource.id": str(resource_id) if resource_id else "unknown", 

1568 "resource.uri": str(uri) or "unknown", 

1569 "gateway.transport": getattr(gateway, "transport") or "uknown", 

1570 "gateway.url": getattr(gateway, "url") or "unknown", 

1571 }, 

1572 ) as span: 

1573 valid = False 

1574 if gateway.ca_certificate: 

1575 if settings.enable_ed25519_signing: 

1576 public_key_pem = settings.ed25519_public_key 

1577 valid = validate_signature(gateway.ca_certificate.encode(), gateway.ca_certificate_sig, public_key_pem) 

1578 else: 

1579 valid = True 

1580 

1581 if valid: 

1582 ssl_context = self.create_ssl_context(gateway.ca_certificate) 

1583 else: 

1584 ssl_context = None 

1585 

def _get_httpx_client_factory(
    headers: dict[str, str] | None = None,
    timeout: httpx.Timeout | None = None,
    auth: httpx.Auth | None = None,
) -> httpx.AsyncClient:
    """Build the httpx.AsyncClient used for connections to the gateway.

    TLS verification uses the enclosing scope's ``ssl_context`` when one was
    created for the gateway; otherwise it falls back to the value returned by
    ``get_default_verify()``. Connection-pool limits are read from ``settings``.

    Args:
        headers: Optional headers for the client.
        timeout: Optional timeout; ``get_http_timeout()`` is used when falsy.
        auth: Optional auth handler for the client.

    Returns:
        httpx.AsyncClient: Configured HTTPX async client with redirects enabled.
    """
    # First-Party
    from mcpgateway.services.http_client_service import get_default_verify, get_http_timeout  # pylint: disable=import-outside-toplevel

    # Prefer the gateway-specific SSL context captured from the enclosing scope.
    verify_option = ssl_context if ssl_context else get_default_verify()  # pylint: disable=cell-var-from-loop
    effective_timeout = timeout if timeout else get_http_timeout()
    connection_limits = httpx.Limits(
        max_connections=settings.httpx_max_connections,
        max_keepalive_connections=settings.httpx_max_keepalive_connections,
        keepalive_expiry=settings.httpx_keepalive_expiry,
    )

    return httpx.AsyncClient(
        verify=verify_option,
        follow_redirects=True,
        headers=headers,
        timeout=effective_timeout,
        auth=auth,
        limits=connection_limits,
    )

1616 

1617 try: 

1618 # ═══════════════════════════════════════════════════════════════════════════ 

1619 # Extract gateway data to local variables BEFORE OAuth handling 

1620 # OAuth client_credentials flow makes network calls, so we must release 

1621 # the DB transaction first to prevent idle-in-transaction during network I/O 

1622 # ═══════════════════════════════════════════════════════════════════════════ 

1623 gateway_url = gateway.url 

1624 gateway_transport = gateway.transport 

1625 gateway_auth_type = gateway.auth_type 

1626 gateway_auth_value = gateway.auth_value 

1627 gateway_oauth_config = gateway.oauth_config 

1628 gateway_name = gateway.name 

1629 gateway_auth_query_params = getattr(gateway, "auth_query_params", None) 

1630 

1631 # Apply query param auth to URL if applicable 

1632 auth_query_params_decrypted: Optional[Dict[str, str]] = None 

1633 if gateway_auth_type == "query_param" and gateway_auth_query_params: 

1634 auth_query_params_decrypted = {} 

1635 for param_key, encrypted_value in gateway_auth_query_params.items(): 

1636 if encrypted_value: 

1637 try: 

1638 decrypted = decode_auth(encrypted_value) 

1639 auth_query_params_decrypted[param_key] = decrypted.get(param_key, "") 

1640 except Exception: # noqa: S110 - intentionally skip failed decryptions 

1641 # Silently skip params that fail decryption (corrupted or old key) 

1642 logger.debug(f"Failed to decrypt query param '{param_key}' for resource") 

1643 if auth_query_params_decrypted: 

1644 gateway_url = apply_query_param_auth(gateway_url, auth_query_params_decrypted) 

1645 

1646 # ═══════════════════════════════════════════════════════════════════════════ 

1647 # CRITICAL: Release DB connection back to pool BEFORE making HTTP/OAuth calls 

1648 # This prevents connection pool exhaustion during slow upstream requests. 

1649 # OAuth client_credentials flow makes network calls to token endpoints. 

1650 # All needed data has been extracted to local variables above. 

1651 # ═══════════════════════════════════════════════════════════════════════════ 

1652 db.commit() # End read-only transaction cleanly 

1653 db.close() 

1654 

1655 # Handle different authentication types (AFTER DB release) 

1656 headers = {} 

1657 if gateway_auth_type == "oauth" and gateway_oauth_config: 

1658 grant_type = gateway_oauth_config.get("grant_type", "client_credentials") 

1659 

1660 if grant_type == "authorization_code": 

1661 # For Authorization Code flow, try to get stored tokens 

1662 try: 

1663 # First-Party 

1664 from mcpgateway.services.token_storage_service import TokenStorageService # pylint: disable=import-outside-toplevel 

1665 

1666 # Use fresh DB session for token lookup (original db was closed) 

1667 access_token = None 

1668 if oauth_user_email: 

1669 with fresh_db_session() as token_db: 

1670 token_storage = TokenStorageService(token_db) 

1671 access_token = await token_storage.get_user_token(gateway_id, oauth_user_email) 

1672 

1673 if access_token: 

1674 headers["Authorization"] = f"Bearer {access_token}" 

1675 else: 

1676 if span: 

1677 span.set_attribute("health.status", "unhealthy") 

1678 span.set_attribute("error.message", "No valid OAuth token for user") 

1679 

1680 except Exception as e: 

1681 logger.error(f"Failed to obtain stored OAuth token for gateway {gateway_name}: {e}") 

1682 if span: 

1683 span.set_attribute("health.status", "unhealthy") 

1684 span.set_attribute("error.message", "Failed to obtain stored OAuth token") 

1685 else: 

1686 # For Client Credentials flow, get token directly (makes network calls) 

1687 try: 

1688 access_token: str = await self.oauth_manager.get_access_token(gateway_oauth_config) 

1689 headers["Authorization"] = f"Bearer {access_token}" 

1690 except Exception as e: 

1691 if span: 

1692 span.set_attribute("health.status", "unhealthy") 

1693 span.set_attribute("error.message", str(e)) 

1694 else: 

1695 # Handle non-OAuth authentication (existing logic) 

1696 auth_data = gateway_auth_value or {} 

1697 if isinstance(auth_data, str): 

1698 headers = decode_auth(auth_data) 

1699 elif isinstance(auth_data, dict): 

1700 headers = {str(k): str(v) for k, v in auth_data.items()} 

1701 else: 

1702 headers = {} 

1703 

async def connect_to_sse_session(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
    """Read a resource from a gateway over SSE and return its text.

    When ``settings.mcp_session_pool_enabled`` is set and the session pool can
    be obtained, a pooled session is used. Otherwise a one-off SSE connection
    is opened, a ``ClientSession`` is initialized on it, and the resource is
    read through that session. In both paths the ``text`` attribute of the
    first entry in the response's ``contents`` is returned.

    Any exception raised along the way (connection failure, empty
    ``contents``, a first entry without ``text``, ...) is logged at debug
    level with a sanitized message, and ``None`` is returned instead of
    raising.

    Note:
        MCP SDK 1.25.0 read_resource() does not support meta parameter.
        When the SDK adds support, meta_data can be added back here.

    Args:
        server_url (str): URL of the SSE gateway to connect to.
        uri (str): Resource URI to request from the gateway.
        authentication (Optional[Dict[str, str]]): Headers to send with the
            connection (e.g. a Bearer token). ``None`` is treated as an
            empty dictionary.

    Returns:
        str | None: Text of the first content entry, or ``None`` on any failure.

    Notes:
        - The per-call SSE client context manager is unpacked as
          ``(read_stream, write_stream)``.
    """
    request_headers = {} if authentication is None else authentication
    try:
        # Resolve the shared session pool only when the feature is enabled.
        session_pool = None
        if settings.mcp_session_pool_enabled:
            try:
                session_pool = get_mcp_session_pool()
            except RuntimeError:
                # Pool not initialized (e.g., in tests), fall back to per-call sessions
                session_pool = None

        if session_pool is not None:
            async with session_pool.session(
                url=server_url,
                headers=request_headers,
                transport_type=TransportType.SSE,
                httpx_client_factory=_get_httpx_client_factory,
                user_identity=pool_user_identity,
                gateway_id=gateway_id,
            ) as pooled:
                # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
                response = await pooled.session.read_resource(uri=uri)
                return response.contents[0].text
        else:
            # Per-call session: used when the pool is disabled or unavailable.
            sse_connection = sse_client(
                url=server_url,
                headers=request_headers,
                timeout=settings.health_check_timeout,
                httpx_client_factory=_get_httpx_client_factory,
            )
            async with sse_connection as (reader, writer), ClientSession(reader, writer) as mcp_session:
                await mcp_session.initialize()
                # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
                response = await mcp_session.read_resource(uri=uri)
                return response.contents[0].text
    except Exception as exc:
        # Sanitize error message to prevent URL secrets from leaking in logs
        sanitized_error = sanitize_exception_message(str(exc), auth_query_params_decrypted)
        logger.debug(f"Exception while connecting to sse gateway: {sanitized_error}")
        return None

1785 

async def connect_to_streamablehttp_server(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
    """Read a resource from a gateway over StreamableHTTP and return its text.

    When ``settings.mcp_session_pool_enabled`` is set and the session pool can
    be obtained, a pooled session is used. Otherwise a one-off StreamableHTTP
    connection is opened, a ``ClientSession`` is initialized on it, and the
    resource is read through that session. In both paths the ``text``
    attribute of the first entry in the response's ``contents`` is returned.

    Any exception raised during connection, initialization, or reading is
    logged at debug level with a sanitized message, and ``None`` is returned
    instead of propagating the exception.

    Note:
        MCP SDK 1.25.0 read_resource() does not support meta parameter.
        When the SDK adds support, meta_data can be added back here.

    Args:
        server_url (str): Endpoint URL of the StreamableHTTP gateway.
        uri (str): Resource URI to request from the gateway.
        authentication (Optional[Dict[str, str]]): Authentication headers
            (e.g. API keys or Bearer tokens). ``None`` is treated as an
            empty dictionary.

    Returns:
        str | None: Text of the first content entry, or ``None`` on any failure.

    Notes:
        - The per-call ``streamablehttp_client`` context manager is unpacked
          as ``(read_stream, write_stream, get_session_id)``; the third item
          is not used.
    """
    request_headers = {} if authentication is None else authentication
    try:
        # Resolve the shared session pool only when the feature is enabled.
        session_pool = None
        if settings.mcp_session_pool_enabled:
            try:
                session_pool = get_mcp_session_pool()
            except RuntimeError:
                # Pool not initialized (e.g., in tests), fall back to per-call sessions
                session_pool = None

        if session_pool is not None:
            async with session_pool.session(
                url=server_url,
                headers=request_headers,
                transport_type=TransportType.STREAMABLE_HTTP,
                httpx_client_factory=_get_httpx_client_factory,
                user_identity=pool_user_identity,
                gateway_id=gateway_id,
            ) as pooled:
                # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
                response = await pooled.session.read_resource(uri=uri)
                return response.contents[0].text
        else:
            # Per-call session: used when the pool is disabled or unavailable.
            http_connection = streamablehttp_client(
                url=server_url,
                headers=request_headers,
                timeout=settings.health_check_timeout,
                httpx_client_factory=_get_httpx_client_factory,
            )
            async with http_connection as (reader, writer, _get_session_id), ClientSession(reader, writer) as mcp_session:
                await mcp_session.initialize()
                # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
                response = await mcp_session.read_resource(uri=uri)
                return response.contents[0].text
    except Exception as exc:
        # Sanitize error message to prevent URL secrets from leaking in logs
        sanitized_error = sanitize_exception_message(str(exc), auth_query_params_decrypted)
        logger.debug(f"Exception while connecting to streamablehttp gateway: {sanitized_error}")
        return None

1867 

1868 if span: 

1869 span.set_attribute("success", True) 

1870 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000) 

1871 

1872 resource_text = "" 

1873 if (gateway_transport).lower() == "sse": 

1874 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it 

1875 resource_text = await connect_to_sse_session(server_url=gateway_url, authentication=headers, uri=uri) 

1876 else: 

1877 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it 

1878 resource_text = await connect_to_streamablehttp_server(server_url=gateway_url, authentication=headers, uri=uri) 

1879 success = True # Mark as successful before returning 

1880 return resource_text 

1881 except Exception as e: 

1882 success = False 

1883 error_message = str(e) 

1884 raise 

1885 finally: 

1886 # Metrics are now recorded only in read_resource finally block 

1887 # This eliminates duplicate metrics and provides a single source of truth 

1888 # End Invoke resource span for Observability dashboard 

1889 # NOTE: Use fresh_db_session() since the original db was released 

1890 # before making HTTP calls to prevent connection pool exhaustion 

1891 if db_span_id and observability_service and not db_span_ended: 

1892 try: 

1893 with fresh_db_session() as fresh_db: 

1894 observability_service.end_span( 

1895 db=fresh_db, 

1896 span_id=db_span_id, 

1897 status="ok" if success else "error", 

1898 status_message=error_message if error_message else None, 

1899 ) 

1900 db_span_ended = True 

1901 logger.debug(f"✓ Ended invoke.resource span: {db_span_id}") 

1902 except Exception as e: 

1903 logger.warning(f"Failed to end observability span for invoking resource: {e}") 

1904 

1905 async def read_resource( 

1906 self, 

1907 db: Session, 

1908 resource_id: Optional[Union[int, str]] = None, 

1909 resource_uri: Optional[str] = None, 

1910 request_id: Optional[str] = None, 

1911 user: Optional[str] = None, 

1912 server_id: Optional[str] = None, 

1913 include_inactive: bool = False, 

1914 token_teams: Optional[List[str]] = None, 

1915 plugin_context_table: Optional[PluginContextTable] = None, 

1916 plugin_global_context: Optional[GlobalContext] = None, 

1917 meta_data: Optional[Dict[str, Any]] = None, 

1918 ) -> Union[ResourceContent, ResourceContents]: 

1919 """Read a resource's content with plugin hook support. 

1920 

1921 Args: 

1922 db: Database session. 

1923 resource_id: Optional ID of the resource to read. 

1924 resource_uri: Optional URI of the resource to read. 

1925 request_id: Optional request ID for tracing. 

1926 user: Optional user email for authorization checks. 

1927 server_id: Optional server ID for server scoping enforcement. 

1928 include_inactive: Whether to include inactive resources. Defaults to False. 

1929 token_teams: Optional list of team IDs from token for authorization. 

1930 None = unrestricted admin, [] = public-only, [...] = team-scoped. 

1931 plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing. 

1932 plugin_global_context: Optional global context from middleware for consistency across hooks. 

1933 meta_data: Optional metadata dictionary to pass to the gateway during resource reading. 

1934 

1935 Returns: 

1936 Resource content object 

1937 

1938 Raises: 

1939 ResourceNotFoundError: If resource not found or access denied 

1940 ResourceError: If blocked by plugin 

1941 PluginError: If encounters issue with plugin 

1942 PluginViolationError: If plugin violated the request. Example - In case of OPA plugin, if the request is denied by policy. 

1943 ValueError: If neither resource_id nor resource_uri is provided 

1944 

1945 Examples: 

1946 >>> from mcpgateway.common.models import ResourceContent 

1947 >>> from mcpgateway.services.resource_service import ResourceService 

1948 >>> from unittest.mock import MagicMock, PropertyMock 

1949 >>> service = ResourceService() 

1950 >>> db = MagicMock() 

1951 >>> uri = 'http://example.com/resource.txt' 

1952 >>> mock_resource = MagicMock() 

1953 >>> mock_resource.id = 123 

1954 >>> mock_resource.uri = uri 

1955 >>> type(mock_resource).content = PropertyMock(return_value='test') 

1956 >>> db.execute.return_value.scalar_one_or_none.return_value = mock_resource 

1957 >>> db.get.return_value = mock_resource 

1958 >>> import asyncio 

1959 >>> result = asyncio.run(service.read_resource(db, resource_uri=uri)) 

1960 >>> result.__class__.__name__ == 'ResourceContent' 

1961 True 

1962 

1963 Not found case returns ResourceNotFoundError: 

1964 

1965 >>> db2 = MagicMock() 

1966 >>> db2.execute.return_value.scalar_one_or_none.return_value = None 

1967 >>> db2.get.return_value = None 

1968 >>> import asyncio 

1969 >>> # Disable path validation for doctest 

1970 >>> import mcpgateway.config 

1971 >>> old_val = getattr(mcpgateway.config.settings, 'experimental_validate_io', False) 

1972 >>> mcpgateway.config.settings.experimental_validate_io = False 

1973 >>> def _nf(): 

1974 ... try: 

1975 ... asyncio.run(service.read_resource(db2, resource_uri='abc')) 

1976 ... except ResourceNotFoundError: 

1977 ... return True 

1978 >>> result = _nf() 

1979 >>> mcpgateway.config.settings.experimental_validate_io = old_val 

1980 >>> result 

1981 True 

1982 """ 

1983 start_time = time.monotonic() 

1984 success = False 

1985 error_message = None 

1986 resource_db = None 

1987 server_scoped = False 

1988 resource_db_gateway = None # Only set when eager-loaded via Q2's joinedload 

1989 content = None 

1990 uri = resource_uri or "unknown" 

1991 if resource_id: 

1992 resource_db = db.get(DbResource, resource_id, options=[joinedload(DbResource.gateway)]) 

1993 if resource_db: 

1994 uri = resource_db.uri 

1995 resource_db_gateway = resource_db.gateway # Eager-loaded, safe to access 

1996 # Check enabled status in Python (avoids redundant Q3/Q4 re-fetches) 

1997 if not include_inactive and not resource_db.enabled: 

1998 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive") 

1999 content = resource_db.content 

2000 else: 

2001 uri = None 

2002 

2003 # Create database span for observability dashboard 

2004 trace_id = current_trace_id.get() 

2005 db_span_id = None 

2006 db_span_ended = False 

2007 observability_service = ObservabilityService() if trace_id else None 

2008 

2009 if trace_id and observability_service: 

2010 try: 

2011 db_span_id = observability_service.start_span( 

2012 db=db, 

2013 trace_id=trace_id, 

2014 name="resource.read", 

2015 attributes={ 

2016 "resource.uri": str(resource_uri) if resource_uri else "unknown", 

2017 "user": user or "anonymous", 

2018 "server_id": server_id, 

2019 "request_id": request_id, 

2020 "http.url": uri if uri is not None and uri.startswith("http") else None, 

2021 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static", 

2022 }, 

2023 ) 

2024 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {uri}") 

2025 except Exception as e: 

2026 logger.warning(f"Failed to start observability span for resource reading: {e}") 

2027 db_span_id = None 

2028 

2029 with create_span( 

2030 "resource.read", 

2031 { 

2032 "resource.uri": resource_uri or "unknown", 

2033 "user": user or "anonymous", 

2034 "server_id": server_id, 

2035 "request_id": request_id, 

2036 "http.url": uri if uri is not None and uri.startswith("http") else None, 

2037 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static", 

2038 }, 

2039 ) as span: 

2040 try: 

2041 # Generate request ID if not provided 

2042 if not request_id: 

2043 request_id = str(uuid.uuid4()) 

2044 

2045 original_uri = uri 

2046 contexts = None 

2047 

2048 # Check if plugin manager is available and eligible for this request 

2049 plugin_eligible = bool(self._plugin_manager and PLUGINS_AVAILABLE and uri and ("://" in uri)) 

2050 

2051 # Initialize plugin manager if needed (lazy init must happen before has_hooks_for check) 

2052 # pylint: disable=protected-access 

2053 if plugin_eligible and not self._plugin_manager._initialized: 

2054 await self._plugin_manager.initialize() 

2055 # pylint: enable=protected-access 

2056 

2057 # Check if any resource hooks are registered to avoid unnecessary context creation 

2058 has_pre_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_PRE_FETCH) 

2059 has_post_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_POST_FETCH) 

2060 

2061 # Initialize plugin context variables only if hooks are registered 

2062 global_context = None 

2063 if has_pre_fetch or has_post_fetch: 

2064 # Create plugin context 

2065 # Normalize user to an identifier string if provided 

2066 user_id = None 

2067 if user is not None: 

2068 if isinstance(user, dict) and "email" in user: 

2069 user_id = user.get("email") 

2070 elif isinstance(user, str): 

2071 user_id = user 

2072 else: 

2073 # Attempt to fallback to attribute access 

2074 user_id = getattr(user, "email", None) 

2075 

2076 # Use existing global_context from middleware or create new one 

2077 if plugin_global_context: 

2078 global_context = plugin_global_context 

2079 # Update fields with resource-specific information 

2080 if user_id: 

2081 global_context.user = user_id 

2082 if server_id: 

2083 global_context.server_id = server_id 

2084 else: 

2085 # Create new context (fallback when middleware didn't run) 

2086 global_context = GlobalContext(request_id=request_id, user=user_id, server_id=server_id) 

2087 

2088 # Call pre-fetch hooks if registered 

2089 if has_pre_fetch: 

2090 # Create pre-fetch payload 

2091 pre_payload = ResourcePreFetchPayload(uri=uri, metadata={}) 

2092 

2093 # Execute pre-fetch hooks with context from previous hooks 

2094 pre_result, contexts = await self._plugin_manager.invoke_hook( 

2095 ResourceHookType.RESOURCE_PRE_FETCH, 

2096 pre_payload, 

2097 global_context, 

2098 local_contexts=plugin_context_table, # Pass context from previous hooks 

2099 violations_as_exceptions=True, 

2100 ) 

2101 # Use modified URI if plugin changed it 

2102 if pre_result.modified_payload: 

2103 uri = pre_result.modified_payload.uri 

2104 logger.debug(f"Resource URI modified by plugin: {original_uri} -> {uri}") 

2105 

2106 # Validate resource path if experimental validation is enabled 

2107 if getattr(settings, "experimental_validate_io", False) and uri and isinstance(uri, str): 

2108 try: 

2109 SecurityValidator.validate_path(uri, getattr(settings, "allowed_roots", None)) 

2110 except ValueError as e: 

2111 raise ResourceError(f"Path validation failed: {e}") 

2112 

2113 # Original resource fetching logic 

2114 logger.info(f"Fetching resource: {resource_id} (URI: {uri})") 

2115 

2116 # Check if resource's gateway is in direct_proxy mode 

2117 # First, try to find the resource to get its gateway 

2118 # Check for template 

2119 

2120 if resource_db is None and uri is not None: # and "{" in uri and "}" in uri: 

2121 # Matches uri (modified value from plugins if applicable) 

2122 # with uri from resource DB 

2123 # if uri is a resource template, the resource is retrieved from the DB 

2124 query = select(DbResource).where(DbResource.uri == str(uri)).where(DbResource.enabled) 

2125 if include_inactive: 

2126 query = select(DbResource).where(DbResource.uri == str(uri)) 

2127 resource_db = db.execute(query).scalar_one_or_none() 

2128 

2129 # Check for direct_proxy mode 

2130 if resource_db and resource_db.gateway and getattr(resource_db.gateway, "gateway_mode", "cache") == "direct_proxy" and settings.mcpgateway_direct_proxy_enabled: 

2131 # SECURITY: Check gateway access before allowing direct proxy 

2132 if not await check_gateway_access(db, resource_db.gateway, user, token_teams): 

2133 raise ResourceNotFoundError(f"Resource not found: {uri}") 

2134 

2135 logger.info(f"Using direct_proxy mode for resource '{uri}' via gateway {resource_db.gateway.id}") 

2136 

2137 try: # First-Party 

2138 # First-Party 

2139 from mcpgateway.common.models import BlobResourceContents, TextResourceContents # pylint: disable=import-outside-toplevel 

2140 

2141 gateway = resource_db.gateway 

2142 

2143 # Prepare headers with gateway auth 

2144 headers = build_gateway_auth_headers(gateway) 

2145 

2146 # Use MCP SDK to connect and read resource 

2147 async with streamablehttp_client(url=gateway.url, headers=headers, timeout=settings.mcpgateway_direct_proxy_timeout) as (read_stream, write_stream, _get_session_id): 

2148 async with ClientSession(read_stream, write_stream) as session: 

2149 await session.initialize() 

2150 

2151 # Note: MCP SDK read_resource() only accepts uri; _meta is not supported 

2152 result = await session.read_resource(uri=uri) 

2153 

2154 # Convert MCP result to MCP-compliant content models 

2155 # result.contents is a list of TextResourceContents or BlobResourceContents 

2156 if result.contents: 

2157 first_content = result.contents[0] 

2158 if hasattr(first_content, "text"): 

2159 content = TextResourceContents(uri=uri, mimeType=first_content.mimeType if hasattr(first_content, "mimeType") else "text/plain", text=first_content.text) 

2160 elif hasattr(first_content, "blob"): 

2161 content = BlobResourceContents( 

2162 uri=uri, mimeType=first_content.mimeType if hasattr(first_content, "mimeType") else "application/octet-stream", blob=first_content.blob 

2163 ) 

2164 else: 

2165 content = TextResourceContents(uri=uri, text="") 

2166 else: 

2167 content = TextResourceContents(uri=uri, text="") 

2168 

2169 success = True 

2170 logger.info(f"[READ RESOURCE] Using direct_proxy mode for gateway {gateway.id} (from X-Context-Forge-Gateway-Id header). Meta Attached: {meta_data is not None}") 

2171 # Skip the rest of the DB lookup logic 

2172 

2173 except Exception as e: 

2174 logger.exception(f"Error in direct_proxy mode for resource '{uri}': {e}") 

2175 raise ResourceError(f"Direct proxy resource read failed: {str(e)}") 

2176 

2177 elif resource_db: 

2178 # Normal cache mode - resource found in DB 

2179 content = resource_db.content 

2180 else: 

2181 # Check the inactivity first 

2182 check_inactivity = db.execute(select(DbResource).where(DbResource.uri == str(resource_uri)).where(not_(DbResource.enabled))).scalar_one_or_none() 

2183 if check_inactivity: 

2184 raise ResourceNotFoundError(f"Resource '{resource_uri}' exists but is inactive") 

2185 

2186 if resource_db is None: 

2187 if resource_uri: 

2188 # if resource_uri is provided 

2189 # modified uri have templatized resource with prefilled value 

2190 # triggers _read_template_resource 

2191 # it internally checks which uri matches the pattern of modified uri and fetches 

2192 # the one which matches else raises ResourceNotFoundError 

2193 try: 

2194 content = await self._read_template_resource(db, uri) or None 

2195 # ═══════════════════════════════════════════════════════════════════════════ 

2196 # SECURITY: Fetch the template's DbResource record for access checking 

2197 # _read_template_resource returns ResourceContent with the template's ID 

2198 # ═══════════════════════════════════════════════════════════════════════════ 

2199 if content is not None and hasattr(content, "id") and content.id: 

2200 template_query = select(DbResource).where(DbResource.id == str(content.id)) 

2201 if not include_inactive: 

2202 template_query = template_query.where(DbResource.enabled) 

2203 resource_db = db.execute(template_query).scalar_one_or_none() 

2204 except Exception as e: 

2205 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") from e 

2206 

2207 if resource_uri: 

2208 if content is None and resource_db is None: 

2209 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") 

2210 

2211 if resource_id and resource_db is None: 

2212 # if resource_id provided but not found by Q2 (shouldn't normally happen, 

2213 # but handles race conditions where resource is deleted between requests) 

2214 query = select(DbResource).where(DbResource.id == str(resource_id)).where(DbResource.enabled) 

2215 if include_inactive: 

2216 query = select(DbResource).where(DbResource.id == str(resource_id)) 

2217 resource_db = db.execute(query).scalar_one_or_none() 

2218 if resource_db: 

2219 original_uri = resource_db.uri or None 

2220 content = resource_db.content 

2221 else: 

2222 check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(resource_id)).where(not_(DbResource.enabled))).scalar_one_or_none() 

2223 if check_inactivity: 

2224 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive") 

2225 raise ResourceNotFoundError(f"Resource not found for the resource id: {resource_id}") 

2226 

2227 # ═══════════════════════════════════════════════════════════════════════════ 

2228 # SECURITY: Check resource access based on visibility and team membership 

2229 # ═══════════════════════════════════════════════════════════════════════════ 

2230 if resource_db: 

2231 if not await self._check_resource_access(db, resource_db, user, token_teams): 

2232 # Don't reveal resource existence - return generic "not found" 

2233 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}") 

2234 

2235 # ═══════════════════════════════════════════════════════════════════════════ 

2236 # SECURITY: Enforce server scoping if server_id is provided 

2237 # Resource must be attached to the specified virtual server 

2238 # ═══════════════════════════════════════════════════════════════════════════ 

2239 if server_id: 

2240 server_match = db.execute( 

2241 select(server_resource_association.c.resource_id).where( 

2242 server_resource_association.c.server_id == server_id, 

2243 server_resource_association.c.resource_id == resource_db.id, 

2244 ) 

2245 ).first() 

2246 if not server_match: 

2247 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}") 

2248 server_scoped = True 

2249 

2250 # Set success attributes on span 

2251 if span: 

2252 span.set_attribute("success", True) 

2253 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000) 

2254 if content: 

2255 span.set_attribute("content.size", len(str(content))) 

2256 

2257 success = True 

2258 # Return standardized content without breaking callers that expect passthrough 

2259 # Prefer returning first-class content models or objects with content-like attributes. 

2260 # ResourceContent and TextContent already imported at top level 

2261 

2262 # Release transaction before network calls to avoid idle-in-transaction during invoke_resource 

2263 db.commit() 

2264 

2265 # ═══════════════════════════════════════════════════════════════════════════ 

2266 # RESOLVE CONTENT: Fetch actual content from gateway if needed 

2267 # ═══════════════════════════════════════════════════════════════════════════ 

2268 # If content is a Pydantic content model, invoke gateway 

2269 

2270 # ResourceContents covers TextResourceContents and BlobResourceContents (MCP-compliant) 

2271 # ResourceContent is the legacy model for backwards compatibility 

2272 

2273 if isinstance(content, (ResourceContent, ResourceContents, TextContent)): 

2274 # Metrics are recorded in read_resource finally block for all resources 

2275 resource_response = await self.invoke_resource( 

2276 db=db, 

2277 resource_id=getattr(content, "id"), 

2278 resource_uri=getattr(content, "uri") or None, 

2279 resource_template_uri=getattr(content, "text") or None, 

2280 user_identity=user, 

2281 meta_data=meta_data, 

2282 resource_obj=resource_db, 

2283 gateway_obj=resource_db_gateway, 

2284 server_id=server_id, 

2285 ) 

2286 if resource_response: 

2287 setattr(content, "text", resource_response) 

2288 # If content is any object that quacks like content 

2289 elif hasattr(content, "text") or hasattr(content, "blob"): 

2290 # Metrics are recorded in read_resource finally block for all resources 

2291 if hasattr(content, "blob"): 

2292 resource_response = await self.invoke_resource( 

2293 db=db, 

2294 resource_id=getattr(content, "id"), 

2295 resource_uri=getattr(content, "uri") or None, 

2296 resource_template_uri=getattr(content, "blob") or None, 

2297 user_identity=user, 

2298 meta_data=meta_data, 

2299 resource_obj=resource_db, 

2300 gateway_obj=resource_db_gateway, 

2301 server_id=server_id, 

2302 ) 

2303 if resource_response: 

2304 setattr(content, "blob", resource_response) 

2305 elif hasattr(content, "text"): 

2306 resource_response = await self.invoke_resource( 

2307 db=db, 

2308 resource_id=getattr(content, "id"), 

2309 resource_uri=getattr(content, "uri") or None, 

2310 resource_template_uri=getattr(content, "text") or None, 

2311 user_identity=user, 

2312 meta_data=meta_data, 

2313 resource_obj=resource_db, 

2314 gateway_obj=resource_db_gateway, 

2315 server_id=server_id, 

2316 ) 

2317 if resource_response: 

2318 setattr(content, "text", resource_response) 

2319 # Normalize primitive types to ResourceContent 

2320 elif isinstance(content, bytes): 

2321 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, blob=content) 

2322 elif isinstance(content, str): 

2323 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, text=content) 

2324 else: 

2325 # Fallback to stringified content 

2326 content = ResourceContent(type="resource", id=str(resource_id) or str(content.id), uri=original_uri or content.uri, text=str(content)) 

2327 

2328 # ═══════════════════════════════════════════════════════════════════════════ 

2329 # POST-FETCH HOOKS: Now called AFTER content is resolved from gateway 

2330 # ═══════════════════════════════════════════════════════════════════════════ 

2331 if has_post_fetch: 

2332 post_payload = ResourcePostFetchPayload(uri=original_uri, content=content) 

2333 post_result, _ = await self._plugin_manager.invoke_hook(ResourceHookType.RESOURCE_POST_FETCH, post_payload, global_context, contexts, violations_as_exceptions=True) 

2334 if post_result.modified_payload: 

2335 content = post_result.modified_payload.content 

2336 

2337 return content 

2338 except Exception as e: 

2339 success = False 

2340 error_message = str(e) 

2341 raise 

2342 finally: 

2343 # Record metrics only if we found a resource (not for templates) 

2344 logger.debug(f"read_resource finally block: resource_db={'present' if resource_db else None}, resource_id={resource_db.id if resource_db else None}, server_id={server_id}") 

2345 

2346 if resource_db: 

2347 try: 

2348 metrics_buffer.record_resource_metric( 

2349 resource_id=resource_db.id, 

2350 start_time=start_time, 

2351 success=success, 

2352 error_message=error_message, 

2353 ) 

2354 except Exception as metrics_error: 

2355 logger.warning(f"Failed to record resource metric: {metrics_error}") 

2356 

2357 # Record server metrics ONLY when the server scoping check passed. 

2358 # This prevents recording metrics with unvalidated server_id values 

2359 # from admin API headers (X-Server-ID) or RPC params. 

2360 if resource_db and server_scoped: 

2361 try: 

2362 logger.debug(f"Recording server metric for server_id={server_id}, resource_id={resource_db.id}, success={success}") 

2363 # Record server metric only for the specific virtual server being accessed 

2364 metrics_buffer.record_server_metric( 

2365 server_id=server_id, 

2366 start_time=start_time, 

2367 success=success, 

2368 error_message=error_message, 

2369 ) 

2370 except Exception as metrics_error: 

2371 logger.warning(f"Failed to record server metric: {metrics_error}") 

2372 

2373 # End database span for observability dashboard 

2374 # NOTE: Use fresh_db_session() since db may have been closed by invoke_resource 

2375 if db_span_id and observability_service and not db_span_ended: 

2376 try: 

2377 with fresh_db_session() as fresh_db: 

2378 observability_service.end_span( 

2379 db=fresh_db, 

2380 span_id=db_span_id, 

2381 status="ok" if success else "error", 

2382 status_message=error_message if error_message else None, 

2383 ) 

2384 db_span_ended = True 

2385 logger.debug(f"✓ Ended resource.read span: {db_span_id}") 

2386 except Exception as e: 

2387 logger.warning(f"Failed to end observability span for resource reading: {e}") 

2388 

async def set_resource_state(self, db: Session, resource_id: Union[int, str], activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> ResourceRead:
    """
    Set the activation status of a resource.

    The resource row is fetched with a non-blocking row lock. If the requested
    state differs from the stored one, the row is updated and committed, the
    registry cache is invalidated (unless skipped), subscribers are notified
    and audit/structured log entries are written.

    Args:
        db: Database session
        resource_id: Resource ID
        activate: True to activate, False to deactivate
        user_email: Optional[str] The email of the user to check if the user has permission to modify.
        skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).

    Returns:
        The updated ResourceRead object

    Raises:
        ResourceNotFoundError: If the resource is not found.
        ResourceLockConflictError: If the resource is locked by another transaction.
        ResourceError: For other errors.
        PermissionError: If user doesn't own the resource.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> from mcpgateway.schemas import ResourceRead
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.get.return_value = resource
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_resource_activated = AsyncMock()
        >>> service._notify_resource_deactivated = AsyncMock()
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.set_resource_state(db, 1, True))
        'resource_read'
    """
    try:
        # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
        try:
            resource = get_for_update(db, DbResource, resource_id, nowait=True)
        except OperationalError as lock_err:
            # Row is locked by another transaction - fail fast with 409
            db.rollback()
            raise ResourceLockConflictError(f"Resource {resource_id} is currently being modified by another request") from lock_err
        if not resource:
            raise ResourceNotFoundError(f"Resource not found: {resource_id}")

        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, resource):
                raise PermissionError("Only the owner can activate the Resource" if activate else "Only the owner can deactivate the Resource")

        # Update status if it's different
        # NOTE(review): nesting below was reconstructed from an indentation-stripped
        # listing - confirm that cache invalidation, notification and audit logging
        # are meant to run only when the state actually changes.
        if resource.enabled != activate:
            resource.enabled = activate
            resource.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(resource)

            # Invalidate cache after status change (skip for batch operations)
            if not skip_cache_invalidation:
                cache = _get_registry_cache()
                await cache.invalidate_resources()

            # Notify subscribers
            if activate:
                await self._notify_resource_activated(resource)
            else:
                await self._notify_resource_deactivated(resource)

            logger.info(f"Resource {resource.uri} {'activated' if activate else 'deactivated'}")

            # Structured logging: Audit trail for resource state change
            audit_trail.log_action(
                user_id=user_email or "system",
                action="set_resource_state",
                resource_type="resource",
                resource_id=str(resource.id),
                resource_name=resource.name,
                user_email=user_email,
                team_id=resource.team_id,
                new_values={
                    "enabled": resource.enabled,
                },
                context={
                    "action": "activate" if activate else "deactivate",
                },
                db=db,
            )

            # Structured logging: Log successful resource state change
            structured_logger.log(
                level="INFO",
                message=f"Resource {'activated' if activate else 'deactivated'} successfully",
                event_type="resource_state_changed",
                component="resource_service",
                user_email=user_email,
                team_id=resource.team_id,
                resource_type="resource",
                resource_id=str(resource.id),
                custom_fields={
                    "resource_uri": resource.uri,
                    "enabled": resource.enabled,
                },
            )

        resource.team = self._get_team_name(db, resource.team_id)
        return self.convert_resource_to_read(resource)
    except PermissionError as e:
        # The row was locked by get_for_update above; roll back so the lock is
        # released immediately instead of lingering until the session closes
        # (mirrors the PermissionError handling in update_resource).
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Resource state change failed due to permission error",
            event_type="resource_state_change_permission_denied",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
        )
        raise
    except ResourceLockConflictError:
        # Re-raise lock conflicts without wrapping - allows 409 response
        raise
    except ResourceNotFoundError:
        # Re-raise not found without wrapping - allows 404 response
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource state change failure
        structured_logger.log(
            level="ERROR",
            message="Resource state change failed",
            event_type="resource_state_change_failed",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
        )
        # Chain the original exception so the root cause stays in the traceback.
        raise ResourceError(f"Failed to set resource state: {str(e)}") from e

2536 

async def subscribe_resource(
    self,
    db: Session,
    subscription: ResourceSubscription,
    *,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> None:
    """
    Subscribe to a resource.

    Looks the resource up by ``subscription.uri``, verifies it is enabled and
    that the requester passes the access check, then persists a subscription
    row for ``subscription.subscriber_id``.

    Args:
        db: Database session
        subscription: Resource subscription object
        user_email: Requester email used for visibility checks.
        token_teams: Token team scope used for visibility checks.

    Raises:
        ResourceNotFoundError: If the resource is not found or is inactive
        PermissionError: If the requester is not authorized for the resource
        ResourceError: For other subscription errors

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> subscription = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.subscribe_resource(db, subscription))
    """
    try:
        # Verify resource exists (single query to avoid TOCTOU between active/inactive checks)
        resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()

        if not resource:
            raise ResourceNotFoundError(f"Resource not found: {subscription.uri}")

        if not resource.enabled:
            raise ResourceNotFoundError(f"Resource '{subscription.uri}' exists but is inactive")

        if not await self._check_resource_access(db, resource, user_email=user_email, token_teams=token_teams):
            raise PermissionError(f"Access denied for resource subscription: {subscription.uri}")

        # Create subscription
        db_sub = DbSubscription(resource_id=resource.id, subscriber_id=subscription.subscriber_id)
        db.add(db_sub)
        db.commit()

        logger.info(f"Added subscription for {subscription.uri} by {subscription.subscriber_id}")

    except (PermissionError, ResourceNotFoundError):
        # Propagate the documented error types unchanged. Without this clause the
        # catch-all below would wrap ResourceNotFoundError into a generic
        # ResourceError, contradicting the documented contract.
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        # Chain the original exception so the root cause stays in the traceback.
        raise ResourceError(f"Failed to subscribe: {str(e)}") from e

2594 

async def unsubscribe_resource(self, db: Session, subscription: ResourceSubscription) -> None:
    """
    Unsubscribe from a resource.

    Looks the resource up by ``subscription.uri`` and deletes the subscription
    rows for ``subscription.subscriber_id`` on that resource. If no resource
    has that URI the call returns without doing anything.

    Args:
        db: Database session
        subscription: Resource subscription object

    Note:
        This method is best-effort: any exception is rolled back and logged,
        not propagated to the caller.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> subscription = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.unsubscribe_resource(db, subscription))
    """
    try:
        # Find resource
        resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()

        if not resource:
            return

        # Remove subscription. This must be a DELETE statement: executing a SELECT
        # returns a Result object, which has no delete() method, so the previous
        # form raised AttributeError and never removed anything.
        db.execute(
            delete(DbSubscription)
            .where(DbSubscription.resource_id == resource.id)
            .where(DbSubscription.subscriber_id == subscription.subscriber_id)
        )
        db.commit()

        logger.info(f"Removed subscription for {subscription.uri} by {subscription.subscriber_id}")

    except Exception as e:
        db.rollback()
        logger.error(f"Failed to unsubscribe: {str(e)}")

2630 

async def update_resource(
    self,
    db: Session,
    resource_id: Union[int, str],
    resource_update: ResourceUpdate,
    modified_by: Optional[str] = None,
    modified_from_ip: Optional[str] = None,
    modified_via: Optional[str] = None,
    modified_user_agent: Optional[str] = None,
    user_email: Optional[str] = None,
) -> ResourceRead:
    """
    Update a resource.

    The resource row is locked, ownership is verified (when ``user_email`` is
    given), URI conflicts are checked, the provided fields are applied, the
    version is bumped and the change is committed. Caches are then invalidated,
    subscribers notified and audit/structured log entries written.

    Args:
        db: Database session
        resource_id: Resource ID
        resource_update: Resource update object
        modified_by: Username of the person modifying the resource
        modified_from_ip: IP address where the modification request originated
        modified_via: Source of modification (ui/api/import)
        modified_user_agent: User agent string from the modification request
        user_email: Email of user performing update (for ownership check)

    Returns:
        The updated ResourceRead object

    Raises:
        ResourceNotFoundError: If the resource is not found
        ResourceURIConflictError: If a resource with the same URI already exists.
        PermissionError: If user doesn't own the resource
        ResourceError: For other update errors
        IntegrityError: If a database integrity error occurs.

    Example:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> from mcpgateway.schemas import ResourceRead
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.get.return_value = resource
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_resource_updated = AsyncMock()
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.update_resource(db, 'resource_id', MagicMock()))
        'resource_read'
    """
    try:
        logger.info(f"Updating resource: {resource_id}")
        resource = get_for_update(db, DbResource, resource_id)
        if not resource:
            raise ResourceNotFoundError(f"Resource not found: {resource_id}")

        # Check ownership if user_email provided. This runs BEFORE the URI-conflict
        # lookup so that a non-owner cannot lock other rows or learn about
        # conflicting resources (id / visibility / enabled) through the conflict error.
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, resource):
                raise PermissionError("Only the owner can update this resource")

        # Check for uri conflict if uri is being changed (scoped by effective visibility)
        if resource_update.uri and resource_update.uri != resource.uri:
            visibility = resource_update.visibility or resource.visibility
            team_id = resource_update.team_id or resource.team_id
            visibility_key = visibility.lower()
            if visibility_key == "public":
                # Check for existing public resources with the same uri
                existing_resource = get_for_update(db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "public", DbResource.id != resource_id))
                if existing_resource:
                    raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
            elif visibility_key == "team" and team_id:
                # Check for existing team resource with the same uri
                existing_resource = get_for_update(
                    db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.id != resource_id)
                )
                if existing_resource:
                    raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)

        # Update fields if provided
        if resource_update.uri is not None:
            resource.uri = resource_update.uri
        if resource_update.name is not None:
            resource.name = resource_update.name
        if resource_update.description is not None:
            resource.description = resource_update.description
        if resource_update.mime_type is not None:
            resource.mime_type = resource_update.mime_type
        if resource_update.uri_template is not None:
            resource.uri_template = resource_update.uri_template
        if resource_update.visibility is not None:
            resource.visibility = resource_update.visibility

        # Update content if provided
        if resource_update.content is not None:
            # Determine content storage: text when the (possibly just updated)
            # mime type is text/* or the payload itself is a str.
            is_text = (resource.mime_type and resource.mime_type.startswith("text/")) or isinstance(resource_update.content, str)

            resource.text_content = resource_update.content if is_text else None
            resource.binary_content = (
                resource_update.content.encode() if is_text and isinstance(resource_update.content, str) else resource_update.content if isinstance(resource_update.content, bytes) else None
            )
            resource.size = len(resource_update.content)

        # Update tags if provided
        if resource_update.tags is not None:
            resource.tags = resource_update.tags

        # Update metadata fields
        resource.updated_at = datetime.now(timezone.utc)
        if modified_by:
            resource.modified_by = modified_by
        if modified_from_ip:
            resource.modified_from_ip = modified_from_ip
        if modified_via:
            resource.modified_via = modified_via
        if modified_user_agent:
            resource.modified_user_agent = modified_user_agent
        if hasattr(resource, "version") and resource.version is not None:
            resource.version = resource.version + 1
        else:
            resource.version = 1
        db.commit()
        db.refresh(resource)

        # Invalidate cache after successful update
        cache = _get_registry_cache()
        await cache.invalidate_resources()
        # Also invalidate tags cache since resource tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()
        # First-Party
        from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

        metrics_cache.invalidate_prefix("top_resources:")
        metrics_cache.invalidate("resources")

        # Notify subscribers
        await self._notify_resource_updated(resource)

        logger.info(f"Updated resource: {resource.uri}")

        # Structured logging: Audit trail for resource update
        changes = []
        if resource_update.uri:
            changes.append(f"uri: {resource_update.uri}")
        if resource_update.visibility:
            changes.append(f"visibility: {resource_update.visibility}")
        if resource_update.description:
            changes.append("description updated")

        audit_trail.log_action(
            user_id=user_email or modified_by or "system",
            action="update_resource",
            resource_type="resource",
            resource_id=str(resource.id),
            resource_name=resource.name,
            user_email=user_email,
            team_id=resource.team_id,
            client_ip=modified_from_ip,
            user_agent=modified_user_agent,
            new_values={
                "uri": resource.uri,
                "name": resource.name,
                "version": resource.version,
            },
            context={
                "modified_via": modified_via,
                "changes": ", ".join(changes) if changes else "metadata only",
            },
            db=db,
        )

        # Structured logging: Log successful resource update
        structured_logger.log(
            level="INFO",
            message="Resource updated successfully",
            event_type="resource_updated",
            component="resource_service",
            user_id=modified_by,
            user_email=user_email,
            team_id=resource.team_id,
            resource_type="resource",
            resource_id=str(resource.id),
            custom_fields={
                "resource_uri": resource.uri,
                "version": resource.version,
            },
        )

        return self.convert_resource_to_read(resource)
    except PermissionError as pe:
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Resource update failed due to permission error",
            event_type="resource_update_permission_denied",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=pe,
        )
        raise
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        structured_logger.log(
            level="ERROR",
            message="Resource update failed due to database integrity error",
            event_type="resource_update_failed",
            component="resource_service",
            user_id=modified_by,
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=ie,
        )
        raise
    except ResourceURIConflictError as conflict:
        # Release the row locks taken by get_for_update, like every other failure path.
        db.rollback()
        logger.error(f"Resource URI conflict: {conflict}")

        # Structured logging: Log URI conflict error
        structured_logger.log(
            level="WARNING",
            message="Resource update failed due to URI conflict",
            event_type="resource_uri_conflict",
            component="resource_service",
            user_id=modified_by,
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=conflict,
        )
        raise
    except ResourceNotFoundError as nf:
        db.rollback()

        # Structured logging: Log not found error
        structured_logger.log(
            level="ERROR",
            message="Resource update failed - resource not found",
            event_type="resource_not_found",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=nf,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource update failure
        structured_logger.log(
            level="ERROR",
            message="Resource update failed",
            event_type="resource_update_failed",
            component="resource_service",
            user_id=modified_by,
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
        )
        # Chain the original exception so the root cause stays in the traceback.
        raise ResourceError(f"Failed to update resource: {str(e)}") from e

2906 

2907 async def delete_resource(self, db: Session, resource_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None: 

2908 """ 

2909 Delete a resource. 

2910 

2911 Args: 

2912 db: Database session 

2913 resource_id: Resource ID 

2914 user_email: Email of user performing delete (for ownership check) 

2915 purge_metrics: If True, delete raw + rollup metrics for this resource 

2916 

2917 Raises: 

2918 ResourceNotFoundError: If the resource is not found 

2919 PermissionError: If user doesn't own the resource 

2920 ResourceError: For other deletion errors 

2921 

2922 Example: 

2923 >>> from mcpgateway.services.resource_service import ResourceService 

2924 >>> from unittest.mock import MagicMock, AsyncMock 

2925 >>> service = ResourceService() 

2926 >>> db = MagicMock() 

2927 >>> resource = MagicMock() 

2928 >>> db.get.return_value = resource 

2929 >>> db.delete = MagicMock() 

2930 >>> db.commit = MagicMock() 

2931 >>> service._notify_resource_deleted = AsyncMock() 

2932 >>> import asyncio 

2933 >>> asyncio.run(service.delete_resource(db, 'resource_id')) 

2934 """ 

2935 try: 

2936 # Find resource by its URI. 

2937 resource = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none() 

2938 

2939 if not resource: 

2940 # If resource doesn't exist, rollback and re-raise a ResourceNotFoundError. 

2941 db.rollback() 

2942 raise ResourceNotFoundError(f"Resource not found: {resource_id}") 

2943 

2944 # Check ownership if user_email provided 

2945 if user_email: 

2946 # First-Party 

2947 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

2948 

2949 permission_service = PermissionService(db) 

2950 if not await permission_service.check_resource_ownership(user_email, resource): 

2951 raise PermissionError("Only the owner can delete this resource") 

2952 

2953 # Store resource info for notification before deletion. 

2954 resource_info = { 

2955 "id": resource.id, 

2956 "uri": resource.uri, 

2957 "name": resource.name, 

2958 "visibility": getattr(resource, "visibility", "public"), 

2959 "team_id": getattr(resource, "team_id", None), 

2960 "owner_email": getattr(resource, "owner_email", None), 

2961 } 

2962 

2963 # Remove subscriptions using SQLAlchemy's delete() expression. 

2964 db.execute(delete(DbSubscription).where(DbSubscription.resource_id == resource.id)) 

2965 

2966 if purge_metrics: 

2967 with pause_rollup_during_purge(reason=f"purge_resource:{resource.id}"): 

2968 delete_metrics_in_batches(db, ResourceMetric, ResourceMetric.resource_id, resource.id) 

2969 delete_metrics_in_batches(db, ResourceMetricsHourly, ResourceMetricsHourly.resource_id, resource.id) 

2970 

2971 # Hard delete the resource. 

2972 resource_uri = resource.uri 

2973 resource_name = resource.name 

2974 resource_team_id = resource.team_id 

2975 

2976 db.delete(resource) 

2977 db.commit() 

2978 

2979 # Invalidate cache after successful deletion 

2980 cache = _get_registry_cache() 

2981 await cache.invalidate_resources() 

2982 # Also invalidate tags cache since resource tags may have changed 

2983 # First-Party 

2984 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

2985 

2986 await admin_stats_cache.invalidate_tags() 

2987 

2988 # Notify subscribers. 

2989 await self._notify_resource_deleted(resource_info) 

2990 

2991 logger.info(f"Permanently deleted resource: {resource.uri}") 

2992 

2993 # Structured logging: Audit trail for resource deletion 

2994 audit_trail.log_action( 

2995 user_id=user_email or "system", 

2996 action="delete_resource", 

2997 resource_type="resource", 

2998 resource_id=str(resource_info["id"]), 

2999 resource_name=resource_name, 

3000 user_email=user_email, 

3001 team_id=resource_team_id, 

3002 old_values={ 

3003 "uri": resource_uri, 

3004 "name": resource_name, 

3005 }, 

3006 db=db, 

3007 ) 

3008 

3009 # Structured logging: Log successful resource deletion 

3010 structured_logger.log( 

3011 level="INFO", 

3012 message="Resource deleted successfully", 

3013 event_type="resource_deleted", 

3014 component="resource_service", 

3015 user_email=user_email, 

3016 team_id=resource_team_id, 

3017 resource_type="resource", 

3018 resource_id=str(resource_info["id"]), 

3019 custom_fields={ 

3020 "resource_uri": resource_uri, 

3021 "purge_metrics": purge_metrics, 

3022 }, 

3023 ) 

3024 

3025 except PermissionError as pe: 

3026 db.rollback() 

3027 

3028 # Structured logging: Log permission error 

3029 structured_logger.log( 

3030 level="WARNING", 

3031 message="Resource deletion failed due to permission error", 

3032 event_type="resource_delete_permission_denied", 

3033 component="resource_service", 

3034 user_email=user_email, 

3035 resource_type="resource", 

3036 resource_id=str(resource_id), 

3037 error=pe, 

3038 ) 

3039 raise 

3040 except ResourceNotFoundError as rnfe: 

3041 # ResourceNotFoundError is re-raised to be handled in the endpoint. 

3042 # Structured logging: Log not found error 

3043 structured_logger.log( 

3044 level="ERROR", 

3045 message="Resource deletion failed - resource not found", 

3046 event_type="resource_not_found", 

3047 component="resource_service", 

3048 user_email=user_email, 

3049 resource_type="resource", 

3050 resource_id=str(resource_id), 

3051 error=rnfe, 

3052 ) 

3053 raise 

3054 except Exception as e: 

3055 db.rollback() 

3056 

3057 # Structured logging: Log generic resource deletion failure 

3058 structured_logger.log( 

3059 level="ERROR", 

3060 message="Resource deletion failed", 

3061 event_type="resource_deletion_failed", 

3062 component="resource_service", 

3063 user_email=user_email, 

3064 resource_type="resource", 

3065 resource_id=str(resource_id), 

3066 error=e, 

3067 ) 

3068 raise ResourceError(f"Failed to delete resource: {str(e)}") 

3069 

async def get_resource_by_id(self, db: Session, resource_id: str, include_inactive: bool = False) -> ResourceRead:
    """
    Fetch a single resource by its primary key and return its read model.

    Args:
        db: Database session
        resource_id: Resource ID
        include_inactive: Whether to include inactive resources

    Returns:
        ResourceRead: The resource object

    Raises:
        ResourceNotFoundError: If the resource is not found, or if it exists
            but is disabled while ``include_inactive`` is False.

    Example:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = resource
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.get_resource_by_id(db, "39334ce0ed2644d79ede8913a66930c9"))
        'resource_read'
    """
    by_id = select(DbResource).where(DbResource.id == resource_id)
    lookup = by_id if include_inactive else by_id.where(DbResource.enabled)
    found = db.execute(lookup).scalar_one_or_none()

    if not found:
        if not include_inactive:
            # A second lookup distinguishes "disabled" from "does not exist"
            # so the caller gets a more precise error message.
            disabled_lookup = by_id.where(not_(DbResource.enabled))
            if db.execute(disabled_lookup).scalar_one_or_none():
                raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")

        raise ResourceNotFoundError(f"Resource not found: {resource_id}")

    read_model = self.convert_resource_to_read(found)

    # Structured logging: record the successful lookup.
    log_fields = {
        "resource_uri": found.uri,
        "include_inactive": include_inactive,
    }
    structured_logger.log(
        level="INFO",
        message="Resource retrieved successfully",
        event_type="resource_viewed",
        component="resource_service",
        team_id=getattr(found, "team_id", None),
        resource_type="resource",
        resource_id=str(found.id),
        custom_fields=log_fields,
    )

    return read_model

3131 

async def _notify_resource_activated(self, resource: DbResource) -> None:
    """
    Publish a ``resource_activated`` event for the given resource.

    The event data carries the resource's id, uri and name, ``enabled=True``,
    and the visibility/team/owner fields (defaulting to ``"public"``/``None``
    when the attribute is absent).

    Args:
        resource: Resource to activate
    """
    payload = {
        "id": resource.id,
        "uri": resource.uri,
        "name": resource.name,
        "enabled": True,
        "visibility": getattr(resource, "visibility", "public"),
        "team_id": getattr(resource, "team_id", None),
        "owner_email": getattr(resource, "owner_email", None),
    }
    await self._publish_event(
        {
            "type": "resource_activated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

3153 

async def _notify_resource_deactivated(self, resource: DbResource) -> None:
    """
    Publish a ``resource_deactivated`` event for the given resource.

    The event data carries the resource's id, uri and name, ``enabled=False``,
    and the visibility/team/owner fields (defaulting to ``"public"``/``None``
    when the attribute is absent).

    Args:
        resource: Resource to deactivate
    """
    payload = {
        "id": resource.id,
        "uri": resource.uri,
        "name": resource.name,
        "enabled": False,
        "visibility": getattr(resource, "visibility", "public"),
        "team_id": getattr(resource, "team_id", None),
        "owner_email": getattr(resource, "owner_email", None),
    }
    await self._publish_event(
        {
            "type": "resource_deactivated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

3175 

async def _notify_resource_deleted(self, resource_info: Dict[str, Any]) -> None:
    """
    Publish a ``resource_deleted`` event.

    The supplied dictionary is forwarded unchanged as the event's ``data``.

    Args:
        resource_info: Dictionary of resource to delete
    """
    deletion_event: Dict[str, Any] = {"type": "resource_deleted", "data": resource_info}
    deletion_event["timestamp"] = datetime.now(timezone.utc).isoformat()
    await self._publish_event(deletion_event)

3189 

async def _notify_resource_removed(self, resource: DbResource) -> None:
    """
    Publish a ``resource_removed`` event for the given resource.

    The event data carries the resource's id, uri and name, ``enabled=False``,
    and the visibility/team/owner fields (defaulting to ``"public"``/``None``
    when the attribute is absent).

    Args:
        resource: Resource to remove
    """
    payload = {
        "id": resource.id,
        "uri": resource.uri,
        "name": resource.name,
        "enabled": False,
        "visibility": getattr(resource, "visibility", "public"),
        "team_id": getattr(resource, "team_id", None),
        "owner_email": getattr(resource, "owner_email", None),
    }
    await self._publish_event(
        {
            "type": "resource_removed",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

3211 

async def _event_visible_to_subscriber(self, event: Dict[str, Any], user_email: Optional[str], token_teams: Optional[List[str]]) -> bool:
    """Decide whether a resource event may be delivered to a subscriber.

    The event's ``data`` mapping is turned into a lightweight stand-in object
    exposing ``visibility``, ``team_id`` and ``owner_email`` and is then passed
    to ``self._check_resource_access`` (with ``db=None``).

    Args:
        event: Event payload emitted by the resource event stream.
        user_email: Subscriber email. ``None`` only for unrestricted admin context.
        token_teams: Subscriber token team scope.

    Returns:
        ``True`` when the event is visible to the subscriber, otherwise ``False``.
        Events that are not dicts, or whose ``data`` is not a dict, are never visible.
    """
    if not isinstance(event, dict):
        return False
    payload = event.get("data")
    if not isinstance(payload, dict):
        return False

    scoped_resource = SimpleNamespace(
        visibility=payload.get("visibility") or "public",
        team_id=payload.get("team_id"),
        owner_email=payload.get("owner_email"),
    )

    # Non-admin scoped flows should pass token_teams explicitly. If they do
    # not, fail closed to public-only for event filtering.
    teams_scope = [] if (user_email and token_teams is None) else token_teams

    return await self._check_resource_access(
        db=None,  # type: ignore[arg-type]
        resource=scoped_resource,  # type: ignore[arg-type]
        user_email=user_email,
        token_teams=teams_scope,
    )

3244 

async def subscribe_events(self, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> AsyncGenerator[Dict[str, Any], None]:
    """Stream resource events from the EventService, filtered per subscriber.

    Args:
        user_email: Requesting user email. ``None`` with ``token_teams=None`` indicates unrestricted admin context.
        token_teams: Token team scope context:
            - ``None`` = unrestricted admin
            - ``[]`` = public-only
            - ``[...]`` = team-scoped access

    Yields:
        Resource event messages.
    """
    # With neither an email nor a team scope, every event is forwarded
    # without consulting the visibility check.
    unrestricted = user_email is None and token_teams is None

    async for message in self._event_service.subscribe_events():
        if unrestricted or await self._event_visible_to_subscriber(message, user_email, token_teams):
            yield message

3265 

def _detect_mime_type(self, uri: str, content: Union[str, bytes]) -> str:
    """Work out a mime type, preferring the URI over the content.

    Args:
        uri: Resource URI
        content: Resource content

    Returns:
        The type guessed from the URI when one is available; otherwise
        ``"text/plain"`` for ``str`` content and ``"application/octet-stream"``
        for anything else.
    """
    guessed = mimetypes.guess_type(uri)[0]
    if guessed:
        return guessed

    # No usable hint in the URI: fall back on the Python type of the content.
    return "text/plain" if isinstance(content, str) else "application/octet-stream"

3286 

async def _read_template_resource(self, db: Session, uri: str, include_inactive: Optional[bool] = False) -> ResourceContent:
    """
    Read a templated resource.

    The first cached template whose ``uri_template`` matches ``uri`` is used.
    When the in-memory template cache is empty it is first filled from
    ``list_resource_templates`` (keyed by template name).

    Args:
        db: Database session.
        uri: Template URI with parameters.
        include_inactive: Whether to include inactive resources in DB lookups.

    Returns:
        ResourceContent: The resolved content from the matching template.

    Raises:
        ResourceNotFoundError: If no matching template is found, or the
            matching template's resource row exists but is disabled.
        ResourceError: For other template resolution errors. This includes
            the binary-template case: the ``NotImplementedError`` raised
            below is caught by the generic handler and re-raised as
            ``ResourceError`` with the original exception chained.
        NotImplementedError: Raised internally for non-text templates; it
            reaches callers only as the ``__cause__`` of a ``ResourceError``.
    """
    # Lazily fill the template cache on first use.
    if not self._template_cache:
        logger.info("_template_cache is empty, fetching existing resource templates")
        for known in await self.list_resource_templates(db=db, include_inactive=include_inactive):
            self._template_cache[known.name] = known

    # Pick the first cached template whose pattern matches the URI.
    template = None
    for cached in self._template_cache.values():
        if self._uri_matches_template(uri, cached.uri_template):
            template = cached
            break

    if not template:
        raise ResourceNotFoundError(f"No template matches URI: {uri}")

    # The cache may hold a template whose DB row has since been disabled.
    disabled_row = db.execute(select(DbResource).where(DbResource.id == str(template.id)).where(not_(DbResource.enabled))).scalar_one_or_none()
    if disabled_row:
        raise ResourceNotFoundError(f"Resource '{template.id}' exists but is inactive")

    try:
        # Extract parameters
        params = self._extract_template_params(uri, template.uri_template)
        # Generate content
        if template.mime_type and template.mime_type.startswith("text/"):
            content = template.uri_template.format(**params)
            # str(None) is the truthy string "None", so test for None
            # explicitly instead of relying on ``str(...) or None``.
            content_id = str(template.id) if template.id is not None else None
            return ResourceContent(type="resource", id=content_id, uri=template.uri_template or None, mime_type=template.mime_type or None, text=content)
        # Binary templates are not handled yet.
        raise NotImplementedError("Binary resource templates not yet supported")

    except ResourceNotFoundError:
        raise
    except Exception as e:
        raise ResourceError(f"Failed to process template: {str(e)}") from e

3337 

3338 @staticmethod 

3339 @lru_cache(maxsize=256) 

3340 def _build_regex(template: str) -> re.Pattern: 

3341 """ 

3342 Convert a URI template into a compiled regular expression. 

3343 

3344 This parser supports a subset of RFC 6570–style templates for path 

3345 matching. It extracts path parameters and converts them into named 

3346 regex groups. 

3347 

3348 Supported template features: 

3349 - `{var}` 

3350 A simple path parameter. Matches a single URI segment 

3351 (i.e., any characters except `/`). 

3352 → Translates to `(?P<var>[^/]+)` 

3353 - `{var*}` 

3354 A wildcard parameter. Matches one or more URI segments, 

3355 including `/`. 

3356 → Translates to `(?P<var>.+)` 

3357 - `{?var1,var2}` 

3358 Query-parameter expressions. These are ignored when building 

3359 the regex for path matching and are stripped from the template. 

3360 

3361 Example: 

3362 Template: "files://root/{path*}/meta/{id}{?expand,debug}" 

3363 Regex: r"^files://root/(?P<path>.+)/meta/(?P<id>[^/]+)$" 

3364 

3365 Args: 

3366 template: The URI template string containing parameter expressions. 

3367 

3368 Returns: 

3369 A compiled regular expression (re.Pattern) that can be used to 

3370 match URIs and extract parameter values. 

3371 

3372 Note: 

3373 Results are cached using LRU cache (maxsize=256) to avoid 

3374 recompiling the same template pattern repeatedly. 

3375 """ 

3376 # Remove query parameter syntax for path matching 

3377 template_without_query = re.sub(r"\{\?[^}]+\}", "", template) 

3378 

3379 parts = re.split(r"(\{[^}]+\})", template_without_query) 

3380 pattern = "" 

3381 for part in parts: 

3382 if part.startswith("{") and part.endswith("}"): 

3383 name = part[1:-1] 

3384 if name.endswith("*"): 

3385 name = name[:-1] 

3386 pattern += f"(?P<{name}>.+)" 

3387 else: 

3388 pattern += f"(?P<{name}>[^/]+)" 

3389 else: 

3390 pattern += re.escape(part) 

3391 return re.compile(f"^{pattern}$") 

3392 

@staticmethod
@lru_cache(maxsize=256)
def _compile_parse_pattern(template: str) -> parse.Parser:
    """
    Build a reusable ``parse`` parser for a URI template.

    Args:
        template: The template pattern (e.g. "file:///{name}/{id}").

    Returns:
        Compiled parse.Parser object.

    Note:
        Results are cached using LRU cache (maxsize=256) to avoid
        recompiling the same template pattern repeatedly.
    """
    compiled = parse.compile(template)
    return compiled

3410 

def _extract_template_params(self, uri: str, template: str) -> Dict[str, str]:
    """
    Pull named parameter values out of a URI using a template.

    Args:
        uri: The actual URI containing parameter values.
        template: The template pattern (e.g. "file:///{name}/{id}").

    Returns:
        Dict of parameter names and extracted values; an empty dict when the
        URI does not parse against the template.

    Note:
        Uses cached compiled parse patterns for better performance.
    """
    outcome = self._compile_parse_pattern(template).parse(uri)
    if not outcome:
        return {}
    return outcome.named

3428 

def _uri_matches_template(self, uri: str, template: str) -> bool:
    """
    Tell whether a URI fits a template pattern.

    Anything from the first ``?`` onward in the URI is ignored before matching.

    Args:
        uri: The URI to check.
        template: The template pattern.

    Returns:
        True if the URI matches the template, otherwise False.

    Note:
        Uses cached compiled regex patterns for better performance.
    """
    path_only = uri.split("?", 1)[0]
    return self._build_regex(template).match(path_only) is not None

3446 

async def _notify_resource_added(self, resource: DbResource) -> None:
    """
    Publish a ``resource_added`` event for the given resource.

    The event data carries the resource's id, uri, name, description and
    enabled flag, plus the visibility/team/owner fields (defaulting to
    ``"public"``/``None`` when the attribute is absent).

    Args:
        resource: Resource to add
    """
    payload = {
        "id": resource.id,
        "uri": resource.uri,
        "name": resource.name,
        "description": resource.description,
        "enabled": resource.enabled,
        "visibility": getattr(resource, "visibility", "public"),
        "team_id": getattr(resource, "team_id", None),
        "owner_email": getattr(resource, "owner_email", None),
    }
    await self._publish_event(
        {
            "type": "resource_added",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

3469 

async def _notify_resource_updated(self, resource: DbResource) -> None:
    """
    Publish a ``resource_updated`` event for the given resource.

    The event data carries the resource's id, uri and enabled flag, plus the
    visibility/team/owner fields (defaulting to ``"public"``/``None`` when the
    attribute is absent). The resource name is not included.

    Args:
        resource: Resource to update
    """
    payload = {
        "id": resource.id,
        "uri": resource.uri,
        "enabled": resource.enabled,
        "visibility": getattr(resource, "visibility", "public"),
        "team_id": getattr(resource, "team_id", None),
        "owner_email": getattr(resource, "owner_email", None),
    }
    await self._publish_event(
        {
            "type": "resource_updated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

3490 

async def _publish_event(self, event: Dict[str, Any]) -> None:
    """
    Hand an event to the EventService for delivery to subscribers.

    Args:
        event: Event to publish
    """
    publisher = self._event_service
    await publisher.publish_event(event)

3499 

3500 # --- Resource templates --- 

async def list_resource_templates(
    self,
    db: Session,
    include_inactive: bool = False,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    visibility: Optional[str] = None,
    server_id: Optional[str] = None,
) -> List[ResourceTemplate]:
    """
    Return the resource templates visible to the caller.

    A template is any resource row whose ``uri_template`` is not NULL.

    Args:
        db: Database session
        include_inactive: Whether to include inactive templates
        user_email: Email of requesting user (for private visibility check)
        token_teams: Teams from JWT. None = admin (no filtering),
            [] = public-only (no owner access), [...] = team-scoped
        tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        server_id (Optional[str]): Filter by server ID. If provided, only templates associated with this server will be returned.

    Returns:
        List of ResourceTemplate objects the user has access to

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, patch
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> template_obj = MagicMock()
        >>> db.execute.return_value.scalars.return_value.all.return_value = [template_obj]
        >>> with patch('mcpgateway.services.resource_service.ResourceTemplate') as MockResourceTemplate:
        ...     MockResourceTemplate.model_validate.return_value = 'resource_template'
        ...     import asyncio
        ...     result = asyncio.run(service.list_resource_templates(db))
        ...     result == ['resource_template']
        True
    """
    stmt = select(DbResource).where(DbResource.uri_template.isnot(None))

    # Restrict to one server's templates via the association table.
    if server_id:
        link = server_resource_association
        stmt = stmt.join(link, DbResource.id == link.c.resource_id).where(link.c.server_id == server_id)

    if not include_inactive:
        stmt = stmt.where(DbResource.enabled)

    # A non-None token_teams means non-admin access: apply visibility rules.
    if token_teams is not None:
        access_clauses = [DbResource.visibility == "public"]

        # An empty team list is a public-only token: it gets neither owner
        # access nor team access, only the public clause above.
        if token_teams:
            if user_email:
                access_clauses.append(DbResource.owner_email == user_email)
            access_clauses.append(and_(DbResource.team_id.in_(token_teams), DbResource.visibility.in_(["team", "public"])))

        stmt = stmt.where(or_(*access_clauses))

    # Cursor-based pagination logic can be implemented here in the future.
    if visibility:
        stmt = stmt.where(DbResource.visibility == visibility)

    if tags:
        stmt = stmt.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))

    rows = db.execute(stmt).scalars().all()
    return [ResourceTemplate.model_validate(row) for row in rows]

3577 

3578 # --- Metrics --- 

async def aggregate_metrics(self, db: Session) -> ResourceMetrics:
    """
    Aggregate metrics for all resource invocations across all resources.

    Combines recent raw metrics (within retention period) with historical
    hourly rollups for complete historical coverage. Uses in-memory caching
    (10s TTL) to reduce database load under high request rates.

    Args:
        db: Database session

    Returns:
        ResourceMetrics: Aggregated metrics from raw + hourly rollup tables.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> service = ResourceService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    # Serve from the cache when it is enabled and holds an entry.
    if is_cache_enabled():
        cached_payload = metrics_cache.get("resources")
        if cached_payload is not None:
            return ResourceMetrics(**cached_payload)

    # Use combined raw + rollup query for full historical coverage
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "resource")

    # Copy each aggregate field across by name.
    copied_fields = (
        "total_executions",
        "successful_executions",
        "failed_executions",
        "failure_rate",
        "min_response_time",
        "max_response_time",
        "avg_response_time",
        "last_execution_time",
    )
    summary = ResourceMetrics(**{field: getattr(combined, field) for field in copied_fields})

    # Cache the result as dict for serialization compatibility (if enabled)
    if is_cache_enabled():
        metrics_cache.set("resources", summary.model_dump())

    return summary

3631 

async def reset_metrics(self, db: Session) -> None:
    """
    Wipe all resource metrics: raw records first, then hourly rollups.

    After committing the deletes, the ``"resources"`` metrics cache entry and
    every cache key with the ``"top_resources:"`` prefix are invalidated.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    for metrics_table in (ResourceMetric, ResourceMetricsHourly):
        db.execute(delete(metrics_table))
    db.commit()

    # Invalidate metrics cache
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("resources")
    metrics_cache.invalidate_prefix("top_resources:")

3659 

3660 

# Lazy singleton - created on first access, not at module import time.
# This avoids instantiation when only exception classes are imported.
_resource_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Module-level __getattr__ that builds the service singleton on demand.

    Args:
        name: The attribute name being accessed.

    Returns:
        The resource_service singleton instance if name is "resource_service".

    Raises:
        AttributeError: If the attribute name is not "resource_service".
    """
    global _resource_service_instance  # pylint: disable=global-statement

    # Only the singleton name is served lazily; everything else is unknown.
    if name != "resource_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    if _resource_service_instance is None:
        _resource_service_instance = ResourceService()
    return _resource_service_instance