Coverage for mcpgateway / services / resource_service.py: 98%

1297 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/resource_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Resource Service Implementation. 

8This module implements resource management according to the MCP specification. 

9It handles: 

10- Resource registration and retrieval 

11- Resource templates and URI handling 

12- Resource subscriptions and updates 

13- Content type management 

14- Active/inactive resource management 

15 

16Examples: 

17 >>> from mcpgateway.services.resource_service import ResourceService, ResourceError 

18 >>> service = ResourceService() 

19 >>> isinstance(service._event_service, EventService) 

20 True 

21""" 

22 

23# Standard 

24import binascii 

25from datetime import datetime, timezone 

26from functools import lru_cache 

27import mimetypes 

28import os 

29import re 

30import ssl 

31import time 

32from types import SimpleNamespace 

33from typing import Any, AsyncGenerator, Dict, List, Optional, Union 

34import uuid 

35 

36# Third-Party 

37import httpx 

38from mcp import ClientSession 

39from mcp.client.sse import sse_client 

40from mcp.client.streamable_http import streamablehttp_client 

41import parse 

42from pydantic import ValidationError 

43from sqlalchemy import and_, delete, desc, not_, or_, select 

44from sqlalchemy.exc import IntegrityError, MultipleResultsFound, OperationalError 

45from sqlalchemy.orm import joinedload, selectinload, Session 

46 

47# First-Party 

48from mcpgateway.common.models import ResourceContent, ResourceContents, ResourceTemplate, TextContent 

49from mcpgateway.common.validators import SecurityValidator 

50from mcpgateway.config import settings 

51from mcpgateway.db import EmailTeam 

52from mcpgateway.db import EmailTeamMember as DbEmailTeamMember 

53from mcpgateway.db import fresh_db_session 

54from mcpgateway.db import Gateway as DbGateway 

55from mcpgateway.db import get_for_update 

56from mcpgateway.db import Resource as DbResource 

57from mcpgateway.db import ResourceMetric, ResourceMetricsHourly 

58from mcpgateway.db import ResourceSubscription as DbSubscription 

59from mcpgateway.db import server_resource_association 

60from mcpgateway.observability import create_span, set_span_attribute, set_span_error 

61from mcpgateway.schemas import ResourceCreate, ResourceMetrics, ResourceRead, ResourceSubscription, ResourceUpdate, TopPerformer 

62from mcpgateway.services.audit_trail_service import get_audit_trail_service 

63from mcpgateway.services.base_service import BaseService 

64from mcpgateway.services.content_security import ContentSizeError, ContentTypeError, get_content_security_service 

65from mcpgateway.services.event_service import EventService 

66from mcpgateway.services.logging_service import LoggingService 

67from mcpgateway.services.mcp_session_pool import get_mcp_session_pool, TransportType 

68from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service 

69from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

70from mcpgateway.services.oauth_manager import OAuthManager 

71from mcpgateway.services.observability_service import current_trace_id, ObservabilityService 

72from mcpgateway.services.structured_logger import get_structured_logger 

73from mcpgateway.utils.gateway_access import build_gateway_auth_headers, check_gateway_access 

74from mcpgateway.utils.metrics_common import build_top_performers 

75from mcpgateway.utils.pagination import unified_paginate 

76from mcpgateway.utils.services_auth import decode_auth 

77from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

78from mcpgateway.utils.ssl_context_cache import get_cached_ssl_context 

79from mcpgateway.utils.trace_context import format_trace_team_scope 

80from mcpgateway.utils.trace_redaction import is_input_capture_enabled, is_output_capture_enabled, serialize_trace_payload 

81from mcpgateway.utils.url_auth import apply_query_param_auth, sanitize_exception_message 

82from mcpgateway.utils.validate_signature import validate_signature 

83 

84# Plugin support imports (conditional) 

85try: 

86 # First-Party 

87 from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, PluginContextTable, ResourceHookType, ResourcePostFetchPayload, ResourcePreFetchPayload 

88 

89 PLUGINS_AVAILABLE = True 

90except ImportError: 

91 PLUGINS_AVAILABLE = False 

92 

# Registry cache handle. It is resolved on first use rather than at import
# time so that loading this module does not import the cache package
# (avoids circular dependencies).
_REGISTRY_CACHE = None


def _get_registry_cache():
    """Return the shared registry cache, importing it on first access.

    Returns:
        The ``registry_cache`` object exported by ``mcpgateway.cache.registry_cache``.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = registry_cache
    return _REGISTRY_CACHE

110 

111 

# Initialize logging service first; the module-level logger below is obtained from it.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Initialize structured logger, audit trail, and metrics buffer for resource operations.
# These are module-level objects created once at import time.
structured_logger = get_structured_logger("resource_service")
audit_trail = get_audit_trail_service()
metrics_buffer = get_metrics_buffer_service()

120 

121 

class ResourceError(Exception):
    """Root of the exception hierarchy raised by the resource service."""

124 

125 

class ResourceNotFoundError(ResourceError):
    """Signals that the requested resource could not be found."""

128 

129 

class ResourceURIConflictError(ResourceError):
    """Raised when a resource URI conflicts with existing (active or inactive) resource.

    Attributes set on the instance:
        uri: The conflicting URI.
        enabled: Whether the already-registered resource is active.
        resource_id: Identifier of the already-registered resource, if known.
    """

    def __init__(self, uri: str, enabled: bool = True, resource_id: Optional[int] = None, visibility: str = "public") -> None:
        """Initialize the error with resource information.

        Args:
            uri: The conflicting resource URI
            enabled: Whether the existing resource is active
            resource_id: ID of the existing resource if available
            visibility: Visibility status of the resource; capitalized and used
                as the first word of the error message
        """
        self.uri = uri
        self.enabled = enabled
        self.resource_id = resource_id

        message = f"{visibility.capitalize()} Resource already exists with URI: {uri}"
        if not enabled:
            message += f" (currently inactive, ID: {resource_id})"

        # Log only once the message is complete, so the log entry matches the
        # text carried by the exception (including the inactive-resource suffix).
        logger.info(f"ResourceURIConflictError: {message}")
        super().__init__(message)

150 

151 

class ResourceValidationError(ResourceError):
    """Signals that a resource did not pass validation."""

154 

155 

class ResourceLockConflictError(ResourceError):
    """Signals that a resource row is locked by another transaction.

    Used when an attempt is made to modify a resource that is currently
    locked by another concurrent request.
    """

163 

164 

def _validate_resource_team_assignment(db: Session, user_email: Optional[str], target_team_id: Optional[str]) -> None:
    """Check that a resource may be assigned to the given team.

    The team must be specified and must exist. When a user email is supplied,
    that user must additionally hold an active ``owner`` membership in the team.

    Args:
        db: Database session used for the team and membership lookups.
        user_email: Requesting user email. When falsy, the ownership check is skipped.
        target_team_id: Team identifier to validate.

    Raises:
        ValueError: If no team id is given, the team does not exist, or the
            caller is not an active owner of the team.
    """
    if not target_team_id:
        raise ValueError("Cannot set visibility to 'team' without a team_id")

    existing_team = db.query(EmailTeam).filter(EmailTeam.id == target_team_id).first()
    if not existing_team:
        raise ValueError(f"Team {target_team_id} not found")

    if user_email:
        # Only an active membership with the "owner" role is accepted.
        owner_criteria = (
            DbEmailTeamMember.team_id == target_team_id,
            DbEmailTeamMember.user_email == user_email,
            DbEmailTeamMember.is_active,
            DbEmailTeamMember.role == "owner",
        )
        owner_row = db.query(DbEmailTeamMember).filter(*owner_criteria).first()
        if not owner_row:
            raise ValueError("User membership in team not sufficient for this update.")

193 

194 

195class ResourceService(BaseService): 

196 """Service for managing resources. 

197 

198 Handles: 

199 - Resource registration and retrieval 

200 - Resource templates and URIs 

201 - Resource subscriptions 

202 - Content type detection 

203 - Active/inactive status management 

204 """ 

205 

206 _visibility_model_cls = DbResource 

207 

208 def __init__(self) -> None: 

209 """Initialize the resource service.""" 

210 self._event_service = EventService(channel_name="mcpgateway:resource_events") 

211 self._template_cache: Dict[str, ResourceTemplate] = {} 

212 self.oauth_manager = OAuthManager(request_timeout=int(os.getenv("OAUTH_REQUEST_TIMEOUT", "30")), max_retries=int(os.getenv("OAUTH_MAX_RETRIES", "3"))) 

213 

214 # Initialize plugin manager if plugins are enabled in settings 

215 self._plugin_manager = None 

216 if PLUGINS_AVAILABLE: 

217 try: 

218 self._plugin_manager = get_plugin_manager() 

219 if self._plugin_manager: 

220 logger.info("Plugin manager initialized for ResourceService with config: %s", settings.plugins.config_file) 

221 except Exception as e: 

222 logger.warning("Plugin manager initialization failed in ResourceService: %s", e) 

223 self._plugin_manager = None 

224 

225 # Initialize mime types 

226 mimetypes.init() 

227 

228 async def initialize(self) -> None: 

229 """Initialize the service.""" 

230 logger.info("Initializing resource service") 

231 await self._event_service.initialize() 

232 

233 async def shutdown(self) -> None: 

234 """Shutdown the service.""" 

235 # Clear subscriptions 

236 await self._event_service.shutdown() 

237 logger.info("Resource service shutdown complete") 

238 

239 async def get_top_resources(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]: 

240 """Retrieve the top-performing resources based on execution count. 

241 

242 Queries the database to get resources with their metrics, ordered by the number of executions 

243 in descending order. Combines recent raw metrics with historical hourly rollups for complete 

244 historical coverage. Uses the resource URI as the name field for TopPerformer objects. 

245 Returns a list of TopPerformer objects containing resource details and performance metrics. 

246 Results are cached for performance. 

247 

248 Args: 

249 db (Session): Database session for querying resource metrics. 

250 limit (Optional[int]): Maximum number of resources to return. Defaults to 5. 

251 include_deleted (bool): Whether to include deleted resources from rollups. 

252 

253 Returns: 

254 List[TopPerformer]: A list of TopPerformer objects, each containing: 

255 - id: Resource ID. 

256 - name: Resource URI (used as the name field). 

257 - execution_count: Total number of executions. 

258 - avg_response_time: Average response time in seconds, or None if no metrics. 

259 - success_rate: Success rate percentage, or None if no metrics. 

260 - last_execution: Timestamp of the last execution, or None if no metrics. 

261 """ 

262 # Check cache first (if enabled) 

263 # First-Party 

264 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel 

265 

266 effective_limit = limit or 5 

267 cache_key = f"top_resources:{effective_limit}:include_deleted={include_deleted}" 

268 

269 if is_cache_enabled(): 

270 cached = metrics_cache.get(cache_key) 

271 if cached is not None: 

272 return cached 

273 

274 # Use combined query that includes both raw metrics and rollup data 

275 # Use name_column="uri" to maintain backward compatibility (resources show URI as name) 

276 # First-Party 

277 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel 

278 

279 results = get_top_performers_combined( 

280 db=db, 

281 metric_type="resource", 

282 entity_model=DbResource, 

283 limit=effective_limit, 

284 name_column="uri", # Resources use URI as display name 

285 include_deleted=include_deleted, 

286 ) 

287 top_performers = build_top_performers(results) 

288 

289 # Cache the result (if enabled) 

290 if is_cache_enabled(): 

291 metrics_cache.set(cache_key, top_performers) 

292 

293 return top_performers 

294 

295 def convert_resource_to_read(self, resource: DbResource, include_metrics: bool = False) -> ResourceRead: 

296 """ 

297 Converts a DbResource instance into a ResourceRead model, optionally including aggregated metrics. 

298 

299 Args: 

300 resource (DbResource): The ORM instance of the resource. 

301 include_metrics (bool): Whether to include metrics in the result. Defaults to False. 

302 Set to False for list operations to avoid N+1 query issues. 

303 

304 Returns: 

305 ResourceRead: The Pydantic model representing the resource, optionally including aggregated metrics. 

306 

307 Examples: 

308 >>> from types import SimpleNamespace 

309 >>> from datetime import datetime, timezone 

310 >>> svc = ResourceService() 

311 >>> now = datetime.now(timezone.utc) 

312 >>> # Fake metrics 

313 >>> m1 = SimpleNamespace(is_success=True, response_time=0.1, timestamp=now) 

314 >>> m2 = SimpleNamespace(is_success=False, response_time=0.3, timestamp=now) 

315 >>> r = SimpleNamespace( 

316 ... id="ca627760127d409080fdefc309147e08", uri='res://x', name='R', description=None, mime_type='text/plain', size=123, 

317 ... created_at=now, updated_at=now, enabled=True, tags=[{"id": "t", "label": "T"}], metrics=[m1, m2], 

318 ... metrics_summary={"total_executions": 2, "successful_executions": 1, "failed_executions": 1, 

319 ... "failure_rate": 0.5, "min_response_time": 0.1, "max_response_time": 0.3, 

320 ... "avg_response_time": 0.2, "last_execution_time": now} 

321 ... ) 

322 >>> out = svc.convert_resource_to_read(r, include_metrics=True) 

323 >>> out.metrics.total_executions 

324 2 

325 >>> out.metrics.successful_executions 

326 1 

327 """ 

328 resource_dict = resource.__dict__.copy() 

329 # Remove SQLAlchemy state and any pre-existing 'metrics' attribute 

330 resource_dict.pop("_sa_instance_state", None) 

331 resource_dict.pop("metrics", None) 

332 

333 # Ensure required base fields are present even if SQLAlchemy hasn't loaded them into __dict__ yet 

334 resource_dict["id"] = getattr(resource, "id", resource_dict.get("id")) 

335 resource_dict["uri"] = getattr(resource, "uri", resource_dict.get("uri")) 

336 resource_dict["name"] = getattr(resource, "name", resource_dict.get("name")) 

337 resource_dict["description"] = getattr(resource, "description", resource_dict.get("description")) 

338 resource_dict["mime_type"] = getattr(resource, "mime_type", resource_dict.get("mime_type")) 

339 resource_dict["size"] = getattr(resource, "size", resource_dict.get("size")) 

340 resource_dict["created_at"] = getattr(resource, "created_at", resource_dict.get("created_at")) 

341 resource_dict["updated_at"] = getattr(resource, "updated_at", resource_dict.get("updated_at")) 

342 resource_dict["is_active"] = getattr(resource, "is_active", resource_dict.get("is_active")) 

343 resource_dict["enabled"] = getattr(resource, "enabled", resource_dict.get("enabled")) 

344 

345 # Compute aggregated metrics from the resource's metrics list (only if requested) 

346 if include_metrics: 

347 # Use metrics_summary which combines raw + hourly rollup data (matches tool_service pattern) 

348 metrics = resource.metrics_summary 

349 resource_dict["metrics"] = { 

350 "total_executions": metrics["total_executions"], 

351 "successful_executions": metrics["successful_executions"], 

352 "failed_executions": metrics["failed_executions"], 

353 "failure_rate": metrics["failure_rate"], 

354 "min_response_time": metrics["min_response_time"], 

355 "max_response_time": metrics["max_response_time"], 

356 "avg_response_time": metrics["avg_response_time"], 

357 "last_execution_time": metrics["last_execution_time"], 

358 } 

359 else: 

360 resource_dict["metrics"] = None 

361 

362 raw_tags = resource.tags or [] 

363 normalized_tags = [] 

364 for tag in raw_tags: 

365 if isinstance(tag, str): 

366 normalized_tags.append(tag) 

367 continue 

368 if isinstance(tag, dict): 

369 label = tag.get("label") or tag.get("name") 

370 if label: 

371 normalized_tags.append(label) 

372 continue 

373 label = getattr(tag, "label", None) or getattr(tag, "name", None) 

374 if label: 

375 normalized_tags.append(label) 

376 resource_dict["tags"] = normalized_tags 

377 resource_dict["team"] = getattr(resource, "team", None) 

378 

379 # Include metadata fields for proper API response 

380 resource_dict["created_by"] = getattr(resource, "created_by", None) 

381 resource_dict["modified_by"] = getattr(resource, "modified_by", None) 

382 resource_dict["created_at"] = getattr(resource, "created_at", None) 

383 resource_dict["updated_at"] = getattr(resource, "updated_at", None) 

384 resource_dict["version"] = getattr(resource, "version", None) 

385 resource_dict["gateway_id"] = getattr(resource, "gateway_id", None) 

386 return ResourceRead.model_validate(resource_dict) 

387 

388 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]: 

389 """Retrieve the team name given a team ID. 

390 

391 Args: 

392 db (Session): Database session for querying teams. 

393 team_id (Optional[str]): The ID of the team. 

394 

395 Returns: 

396 Optional[str]: The name of the team if found, otherwise None. 

397 """ 

398 if not team_id: 

399 return None 

400 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first() 

401 db.commit() # Release transaction to avoid idle-in-transaction 

402 return team.name if team else None 

403 

404 async def register_resource( 

405 self, 

406 db: Session, 

407 resource: ResourceCreate, 

408 created_by: Optional[str] = None, 

409 created_from_ip: Optional[str] = None, 

410 created_via: Optional[str] = None, 

411 created_user_agent: Optional[str] = None, 

412 import_batch_id: Optional[str] = None, 

413 federation_source: Optional[str] = None, 

414 team_id: Optional[str] = None, 

415 owner_email: Optional[str] = None, 

416 visibility: Optional[str] = "public", 

417 ) -> ResourceRead: 

418 """Register a new resource. 

419 

420 MIME Type Resolution Priority: 

421 1. **User-provided type** (highest priority) - Caller explicitly declares content type 

422 2. **URI-detected type** - Fallback via ``mimetypes.guess_type(uri)`` 

423 3. **Content-based fallback** - ``text/plain`` for strings, ``application/octet-stream`` for bytes 

424 

425 Args: 

426 db: Database session 

427 resource: Resource creation schema 

428 created_by: User who created the resource 

429 created_from_ip: IP address of the creator 

430 created_via: Method used to create the resource (e.g., API, UI) 

431 created_user_agent: User agent of the creator 

432 import_batch_id: Optional batch ID for bulk imports 

433 federation_source: Optional source of the resource if federated 

434 team_id (Optional[str]): Team ID to assign the resource to. 

435 owner_email (Optional[str]): Email of the user who owns this resource. 

436 visibility (str): Resource visibility level (private, team, public). 

437 

438 Returns: 

439 Created resource information 

440 

441 Raises: 

442 IntegrityError: If a database integrity error occurs. 

443 ResourceURIConflictError: If a resource with the same URI already exists. 

444 ResourceError: For other resource registration errors 

445 ContentSizeError: For content size exceed 

446 ContentTypeError: If the MIME type is not allowed 

447 

448 Examples: 

449 >>> from mcpgateway.services.resource_service import ResourceService 

450 >>> from unittest.mock import MagicMock, AsyncMock, patch 

451 >>> from mcpgateway.schemas import ResourceRead 

452 >>> service = ResourceService() 

453 >>> db = MagicMock() 

454 >>> resource = MagicMock() 

455 >>> resource.uri = "test://example" 

456 >>> resource.content = "test content" 

457 >>> resource.mime_type = None 

458 >>> db.execute.return_value.scalar_one_or_none.return_value = None 

459 >>> db.add = MagicMock() 

460 >>> db.commit = MagicMock() 

461 >>> db.refresh = MagicMock() 

462 >>> service._notify_resource_added = AsyncMock() 

463 >>> service._detect_mime_type_from_uri = MagicMock(return_value=None) 

464 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read') 

465 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read') 

466 >>> import asyncio 

467 >>> asyncio.run(service.register_resource(db, resource)) 

468 'resource_read' 

469 """ 

470 try: 

471 logger.info(f"Registering resource: {resource.uri}") 

472 

473 # Validate content size BEFORE any database operations 

474 content_security = get_content_security_service() 

475 content_to_validate = "" 

476 

477 # Extract content from resource for validation 

478 # Use raw bytes for accurate size measurement to prevent bypass via non-UTF-8 content 

479 if hasattr(resource, "content") and resource.content: 

480 if isinstance(resource.content, bytes): 

481 # Validate using raw bytes to get accurate size 

482 content_to_validate = resource.content 

483 else: 

484 # Convert string to bytes for consistent size measurement 

485 content_to_validate = str(resource.content) 

486 

487 content_security.validate_resource_size(content=content_to_validate, uri=resource.uri, user_email=created_by, ip_address=created_from_ip) 

488 

489 # MIME type resolution priority: 

490 # 1. User-provided type (caller explicitly declares the content type) 

491 # 2. URI-detected type (fallback when user omits the field) 

492 # 3. Content-based fallback (text/plain for str, application/octet-stream for bytes) 

493 if resource.mime_type: 

494 mime_type = resource.mime_type 

495 else: 

496 mime_type = self._detect_mime_type(resource.uri, resource.content) 

497 logger.info(f"Auto-detected MIME type for {resource.uri}: {mime_type}") 

498 

499 # Validate MIME type against allowlist 

500 content_security.validate_resource_mime_type( 

501 mime_type=mime_type, 

502 uri=resource.uri, 

503 user_email=created_by, 

504 ip_address=created_from_ip, 

505 ) 

506 

507 # Extract gateway_id from resource if present 

508 gateway_id = getattr(resource, "gateway_id", None) 

509 

510 # Check for existing server with the same uri 

511 if visibility.lower() == "public": 

512 logger.info(f"visibility:: {visibility}") 

513 # Check for existing public resource with the same uri and gateway_id 

514 existing_resource = db.execute(select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "public", DbResource.gateway_id == gateway_id)).scalar_one_or_none() 

515 if existing_resource: 

516 raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility) 

517 elif visibility.lower() == "team" and team_id: 

518 # Check for existing team resource with the same uri and gateway_id 

519 existing_resource = db.execute( 

520 select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.gateway_id == gateway_id) 

521 ).scalar_one_or_none() 

522 if existing_resource: 

523 raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility) 

524 

525 # Determine content storage (mime_type already detected above) 

526 is_text = mime_type and mime_type.startswith("text/") or isinstance(resource.content, str) 

527 

528 # Create DB model 

529 db_resource = DbResource( 

530 uri=resource.uri, 

531 name=resource.name, 

532 title=resource.title, 

533 description=resource.description, 

534 mime_type=mime_type, 

535 uri_template=resource.uri_template, 

536 text_content=resource.content if is_text else None, 

537 binary_content=(resource.content.encode() if is_text and isinstance(resource.content, str) else resource.content if isinstance(resource.content, bytes) else None), 

538 size=len(resource.content) if resource.content else 0, 

539 tags=resource.tags or [], 

540 created_by=created_by, 

541 created_from_ip=created_from_ip, 

542 created_via=created_via, 

543 created_user_agent=created_user_agent, 

544 import_batch_id=import_batch_id, 

545 federation_source=federation_source, 

546 version=1, 

547 # Team scoping fields - use schema values if provided, otherwise fallback to parameters 

548 team_id=getattr(resource, "team_id", None) or team_id, 

549 owner_email=getattr(resource, "owner_email", None) or owner_email or created_by, 

550 # Endpoint visibility parameter takes precedence over schema default 

551 visibility=visibility if visibility is not None else (getattr(resource, "visibility", None) or "public"), 

552 gateway_id=gateway_id, 

553 ) 

554 

555 # Add to DB 

556 db.add(db_resource) 

557 db.commit() 

558 db.refresh(db_resource) 

559 

560 # Notify subscribers 

561 await self._notify_resource_added(db_resource) 

562 

563 logger.info(f"Registered resource: {resource.uri}") 

564 

565 # Structured logging: Audit trail for resource creation 

566 audit_trail.log_action( 

567 user_id=created_by or "system", 

568 action="create_resource", 

569 resource_type="resource", 

570 resource_id=str(db_resource.id), 

571 resource_name=db_resource.name, 

572 user_email=owner_email, 

573 team_id=team_id, 

574 client_ip=created_from_ip, 

575 user_agent=created_user_agent, 

576 new_values={ 

577 "uri": db_resource.uri, 

578 "name": db_resource.name, 

579 "visibility": visibility, 

580 "mime_type": db_resource.mime_type, 

581 }, 

582 context={ 

583 "created_via": created_via, 

584 "import_batch_id": import_batch_id, 

585 "federation_source": federation_source, 

586 }, 

587 db=db, 

588 ) 

589 

590 # Structured logging: Log successful resource creation 

591 structured_logger.log( 

592 level="INFO", 

593 message="Resource created successfully", 

594 event_type="resource_created", 

595 component="resource_service", 

596 user_id=created_by, 

597 user_email=owner_email, 

598 team_id=team_id, 

599 resource_type="resource", 

600 resource_id=str(db_resource.id), 

601 custom_fields={ 

602 "resource_uri": db_resource.uri, 

603 "resource_name": db_resource.name, 

604 "visibility": visibility, 

605 }, 

606 ) 

607 

608 db_resource.team = self._get_team_name(db, db_resource.team_id) 

609 return self.convert_resource_to_read(db_resource) 

610 except IntegrityError as ie: 

611 logger.error(f"IntegrityErrors in group: {ie}") 

612 

613 # Structured logging: Log database integrity error 

614 structured_logger.log( 

615 level="ERROR", 

616 message="Resource creation failed due to database integrity error", 

617 event_type="resource_creation_failed", 

618 component="resource_service", 

619 user_id=created_by, 

620 user_email=owner_email, 

621 error=ie, 

622 custom_fields={ 

623 "resource_uri": resource.uri, 

624 }, 

625 ) 

626 raise ie 

627 except ResourceURIConflictError as rce: 

628 logger.error(f"ResourceURIConflictError in group: {resource.uri}") 

629 

630 # Structured logging: Log URI conflict error 

631 structured_logger.log( 

632 level="WARNING", 

633 message="Resource creation failed due to URI conflict", 

634 event_type="resource_uri_conflict", 

635 component="resource_service", 

636 user_id=created_by, 

637 user_email=owner_email, 

638 custom_fields={ 

639 "resource_uri": resource.uri, 

640 "visibility": visibility, 

641 }, 

642 ) 

643 raise rce 

644 except ContentSizeError as cse: 

645 

646 structured_logger.log( 

647 level="ERROR", 

648 message=f"Resource content size limit exceeded: {cse.actual_size} bytes (max: {cse.max_size} bytes)", 

649 event_type="resource_size_exceed", 

650 component="resource_service", 

651 user_id=created_by, 

652 user_email=owner_email, 

653 custom_fields={ 

654 "resource_uri": resource.uri, 

655 "visibility": visibility, 

656 }, 

657 ) 

658 raise cse 

659 except ContentTypeError as cte: 

660 

661 structured_logger.log( 

662 level="ERROR", 

663 message=f"Resource MIME type not allowed: {cte.mime_type}", 

664 event_type="resource_mime_type_rejected", 

665 component="resource_service", 

666 user_id=created_by, 

667 user_email=owner_email, 

668 custom_fields={ 

669 "resource_uri": resource.uri, 

670 "mime_type": cte.mime_type, 

671 "visibility": visibility, 

672 }, 

673 ) 

674 raise cte 

675 except Exception as e: 

676 db.rollback() 

677 

678 # Structured logging: Log generic resource creation failure 

679 structured_logger.log( 

680 level="ERROR", 

681 message="Resource creation failed", 

682 event_type="resource_creation_failed", 

683 component="resource_service", 

684 user_id=created_by, 

685 user_email=owner_email, 

686 error=e, 

687 custom_fields={ 

688 "resource_uri": resource.uri, 

689 }, 

690 ) 

691 raise ResourceError(f"Failed to register resource: {str(e)}") 

692 

693 async def register_resources_bulk( 

694 self, 

695 db: Session, 

696 resources: List[ResourceCreate], 

697 created_by: Optional[str] = None, 

698 created_from_ip: Optional[str] = None, 

699 created_via: Optional[str] = None, 

700 created_user_agent: Optional[str] = None, 

701 import_batch_id: Optional[str] = None, 

702 federation_source: Optional[str] = None, 

703 team_id: Optional[str] = None, 

704 owner_email: Optional[str] = None, 

705 visibility: Optional[str] = "public", 

706 conflict_strategy: str = "skip", 

707 ) -> Dict[str, Any]: 

708 """Register multiple resources in bulk with a single commit. 

709 

710 This method provides significant performance improvements over individual 

711 resource registration by: 

712 - Using db.add_all() instead of individual db.add() calls 

713 - Performing a single commit for all resources 

714 - Batch conflict detection 

715 - Chunking for very large imports (>500 items) 

716 

717 Args: 

718 db: Database session 

719 resources: List of resource creation schemas 

720 created_by: Username who created these resources 

721 created_from_ip: IP address of creator 

722 created_via: Creation method (ui, api, import, federation) 

723 created_user_agent: User agent of creation request 

724 import_batch_id: UUID for bulk import operations 

725 federation_source: Source gateway for federated resources 

726 team_id: Team ID to assign the resources to 

727 owner_email: Email of the user who owns these resources 

728 visibility: Resource visibility level (private, team, public) 

729 conflict_strategy: How to handle conflicts (skip, update, rename, fail) 

730 

731 Returns: 

732 Dict with statistics: 

733 - created: Number of resources created 

734 - updated: Number of resources updated 

735 - skipped: Number of resources skipped 

736 - failed: Number of resources that failed 

737 - errors: List of error messages 

738 

739 Raises: 

740 ResourceError: If bulk registration fails critically 

741 

742 Examples: 

743 >>> from mcpgateway.services.resource_service import ResourceService 

744 >>> from unittest.mock import MagicMock 

745 >>> service = ResourceService() 

746 >>> db = MagicMock() 

747 >>> resources = [MagicMock(), MagicMock()] 

748 >>> import asyncio 

749 >>> try: 

750 ... result = asyncio.run(service.register_resources_bulk(db, resources)) 

751 ... except Exception: 

752 ... pass 

753 """ 

754 if not resources: 

755 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

756 

757 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

758 

759 # Process in chunks to avoid memory issues and SQLite parameter limits 

760 chunk_size = 500 

761 

762 for chunk_start in range(0, len(resources), chunk_size): 

763 chunk = resources[chunk_start : chunk_start + chunk_size] 

764 

765 try: 

766 # Batch check for existing resources to detect conflicts 

767 resource_uris = [resource.uri for resource in chunk] 

768 

769 # Build base query conditions 

770 if visibility.lower() == "public": 

771 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "public"] 

772 elif visibility.lower() == "team" and team_id: 

773 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "team", DbResource.team_id == team_id] 

774 else: 

775 # Private resources - check by owner 

776 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "private", DbResource.owner_email == (owner_email or created_by)] 

777 

778 existing_resources_query = select(DbResource).where(*base_conditions) 

779 existing_resources = db.execute(existing_resources_query).scalars().all() 

780 # Use (uri, gateway_id) tuple as key for proper conflict detection with gateway_id scoping 

781 existing_resources_map = {(r.uri, r.gateway_id): r for r in existing_resources} 

782 

783 resources_to_add = [] 

784 resources_to_update = [] 

785 

786 for resource in chunk: 

787 try: 

788 # Validate content size before processing 

789 content_security = get_content_security_service() 

790 if hasattr(resource, "content") and resource.content: 

791 content_security.validate_resource_size( 

792 content=resource.content, 

793 uri=resource.uri, 

794 user_email=created_by, 

795 ip_address=created_from_ip, 

796 ) 

797 

798 # MIME type resolution (same priority as register_resource): 

799 # user-provided > URI-detected > content-based fallback 

800 if getattr(resource, "mime_type", None): 

801 bulk_mime_type = resource.mime_type 

802 else: 

803 bulk_mime_type = self._detect_mime_type(resource.uri, getattr(resource, "content", "") or "") 

804 

805 # Validate MIME type against allowlist 

806 content_security.validate_resource_mime_type( 

807 mime_type=bulk_mime_type, 

808 uri=resource.uri, 

809 user_email=created_by, 

810 ip_address=created_from_ip, 

811 ) 

812 

813 # Use provided parameters or schema values 

814 resource_team_id = team_id if team_id is not None else getattr(resource, "team_id", None) 

815 resource_owner_email = owner_email or getattr(resource, "owner_email", None) or created_by 

816 resource_visibility = visibility if visibility is not None else getattr(resource, "visibility", "public") 

817 resource_gateway_id = getattr(resource, "gateway_id", None) 

818 

819 # Look up existing resource by (uri, gateway_id) tuple 

820 existing_resource = existing_resources_map.get((resource.uri, resource_gateway_id)) 

821 

822 if existing_resource: 

823 # Handle conflict based on strategy 

824 if conflict_strategy == "skip": 

825 stats["skipped"] += 1 

826 continue 

827 if conflict_strategy == "update": 

828 # Update existing resource 

829 existing_resource.name = resource.name 

830 existing_resource.title = getattr(resource, "title", None) 

831 existing_resource.description = resource.description 

832 existing_resource.mime_type = bulk_mime_type 

833 existing_resource.size = getattr(resource, "size", None) 

834 existing_resource.uri_template = resource.uri_template 

835 existing_resource.tags = resource.tags or [] 

836 existing_resource.modified_by = created_by 

837 existing_resource.modified_from_ip = created_from_ip 

838 existing_resource.modified_via = created_via 

839 existing_resource.modified_user_agent = created_user_agent 

840 existing_resource.updated_at = datetime.now(timezone.utc) 

841 existing_resource.version = (existing_resource.version or 1) + 1 

842 

843 resources_to_update.append(existing_resource) 

844 stats["updated"] += 1 

845 elif conflict_strategy == "rename": 

846 # Create with renamed resource 

847 new_uri = f"{resource.uri}_imported_{int(datetime.now().timestamp())}" 

848 db_resource = DbResource( 

849 uri=new_uri, 

850 name=resource.name, 

851 title=getattr(resource, "title", None), 

852 description=resource.description, 

853 mime_type=bulk_mime_type, 

854 size=getattr(resource, "size", None), 

855 uri_template=resource.uri_template, 

856 gateway_id=getattr(resource, "gateway_id", None), 

857 tags=resource.tags or [], 

858 created_by=created_by, 

859 created_from_ip=created_from_ip, 

860 created_via=created_via, 

861 created_user_agent=created_user_agent, 

862 import_batch_id=import_batch_id, 

863 federation_source=federation_source, 

864 version=1, 

865 team_id=resource_team_id, 

866 owner_email=resource_owner_email, 

867 visibility=resource_visibility, 

868 ) 

869 resources_to_add.append(db_resource) 

870 stats["created"] += 1 

871 elif conflict_strategy == "fail": 

872 stats["failed"] += 1 

873 stats["errors"].append(f"Resource URI conflict: {resource.uri}") 

874 continue 

875 else: 

876 # Create new resource 

877 db_resource = DbResource( 

878 uri=resource.uri, 

879 name=resource.name, 

880 title=getattr(resource, "title", None), 

881 description=resource.description, 

882 mime_type=bulk_mime_type, 

883 size=getattr(resource, "size", None), 

884 uri_template=resource.uri_template, 

885 gateway_id=getattr(resource, "gateway_id", None), 

886 tags=resource.tags or [], 

887 created_by=created_by, 

888 created_from_ip=created_from_ip, 

889 created_via=created_via, 

890 created_user_agent=created_user_agent, 

891 import_batch_id=import_batch_id, 

892 federation_source=federation_source, 

893 version=1, 

894 team_id=resource_team_id, 

895 owner_email=resource_owner_email, 

896 visibility=resource_visibility, 

897 ) 

898 resources_to_add.append(db_resource) 

899 stats["created"] += 1 

900 

901 except Exception as e: 

902 stats["failed"] += 1 

903 stats["errors"].append(f"Failed to process resource {resource.uri}: {str(e)}") 

904 logger.warning(f"Failed to process resource {resource.uri} in bulk operation: {str(e)}") 

905 continue 

906 

907 # Bulk add new resources 

908 if resources_to_add: 

909 db.add_all(resources_to_add) 

910 

911 # Commit the chunk 

912 db.commit() 

913 

914 # Refresh resources for notifications and audit trail 

915 for db_resource in resources_to_add: 

916 db.refresh(db_resource) 

917 # Notify subscribers 

918 await self._notify_resource_added(db_resource) 

919 

920 # Log bulk audit trail entry 

921 if resources_to_add or resources_to_update: 

922 audit_trail.log_action( 

923 user_id=created_by or "system", 

924 action="bulk_create_resources" if resources_to_add else "bulk_update_resources", 

925 resource_type="resource", 

926 resource_id=import_batch_id or "bulk_operation", 

927 resource_name=f"Bulk operation: {len(resources_to_add)} created, {len(resources_to_update)} updated", 

928 user_email=owner_email, 

929 team_id=team_id, 

930 client_ip=created_from_ip, 

931 user_agent=created_user_agent, 

932 new_values={ 

933 "resources_created": len(resources_to_add), 

934 "resources_updated": len(resources_to_update), 

935 "visibility": visibility, 

936 }, 

937 context={ 

938 "created_via": created_via, 

939 "import_batch_id": import_batch_id, 

940 "federation_source": federation_source, 

941 "conflict_strategy": conflict_strategy, 

942 }, 

943 db=db, 

944 ) 

945 

946 logger.info(f"Bulk registered {len(resources_to_add)} resources, updated {len(resources_to_update)} resources in chunk") 

947 

948 except Exception as e: 

949 db.rollback() 

950 logger.error(f"Failed to process chunk in bulk resource registration: {str(e)}") 

951 stats["failed"] += len(chunk) 

952 stats["errors"].append(f"Chunk processing failed: {str(e)}") 

953 continue 

954 

955 # Final structured logging 

956 structured_logger.log( 

957 level="INFO", 

958 message="Bulk resource registration completed", 

959 event_type="resources_bulk_created", 

960 component="resource_service", 

961 user_id=created_by, 

962 user_email=owner_email, 

963 team_id=team_id, 

964 resource_type="resource", 

965 custom_fields={ 

966 "resources_created": stats["created"], 

967 "resources_updated": stats["updated"], 

968 "resources_skipped": stats["skipped"], 

969 "resources_failed": stats["failed"], 

970 "total_resources": len(resources), 

971 "visibility": visibility, 

972 "conflict_strategy": conflict_strategy, 

973 }, 

974 ) 

975 

976 return stats 

977 

978 async def _check_resource_access( 

979 self, 

980 db: Session, 

981 resource: DbResource, 

982 user_email: Optional[str], 

983 token_teams: Optional[List[str]], 

984 ) -> bool: 

985 """Check if user has access to a resource based on visibility rules. 

986 

987 Implements the same access control logic as list_resources() for consistency. 

988 

989 Args: 

990 db: Database session for team membership lookup if needed. 

991 resource: Resource ORM object with visibility, team_id, owner_email. 

992 user_email: Email of the requesting user (None = unauthenticated). 

993 token_teams: List of team IDs from token. 

994 - None = unrestricted admin access 

995 - [] = public-only token 

996 - [...] = team-scoped token 

997 

998 Returns: 

999 True if access is allowed, False otherwise. 

1000 """ 

1001 visibility = getattr(resource, "visibility", "public") 

1002 resource_team_id = getattr(resource, "team_id", None) 

1003 resource_owner_email = getattr(resource, "owner_email", None) 

1004 

1005 # Public resources are accessible by everyone 

1006 if visibility == "public": 

1007 return True 

1008 

1009 # Admin bypass: token_teams=None AND user_email=None means unrestricted admin 

1010 # This happens when is_admin=True and no team scoping in token 

1011 if token_teams is None and user_email is None: 

1012 return True 

1013 

1014 # No user context (but not admin) = deny access to non-public resources 

1015 if not user_email: 

1016 return False 

1017 

1018 # Public-only tokens (empty teams array) can ONLY access public resources 

1019 is_public_only_token = token_teams is not None and len(token_teams) == 0 

1020 if is_public_only_token: 

1021 return False # Already checked public above 

1022 

1023 # Owner can access their own private resources 

1024 if visibility == "private" and resource_owner_email and resource_owner_email == user_email: 

1025 return True 

1026 

1027 # Team resources: check team membership (matches list_resources behavior) 

1028 if resource_team_id: 

1029 # Use token_teams if provided, otherwise look up from DB 

1030 if token_teams is not None: 

1031 team_ids = token_teams 

1032 else: 

1033 if db is None: 

1034 logger.warning("Missing database session for team-scoped resource access check") 

1035 return False 

1036 # First-Party 

1037 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel 

1038 

1039 team_service = TeamManagementService(db) 

1040 user_teams = await team_service.get_user_teams(user_email) 

1041 team_ids = [team.id for team in user_teams] 

1042 

1043 # Team/public visibility allows access if user is in the team 

1044 if visibility in ["team", "public"] and resource_team_id in team_ids: 

1045 return True 

1046 

1047 return False 

1048 

async def list_resources(
    self,
    db: Session,
    include_inactive: bool = False,
    cursor: Optional[str] = None,
    tags: Optional[List[str]] = None,
    gateway_id: Optional[str] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    user_email: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Union[tuple[List[ResourceRead], Optional[str]], Dict[str, Any]]:
    """
    List registered (non-template) resources with page- or cursor-based pagination.

    Only rows whose ``uri_template`` is NULL are selected, newest first
    (``created_at`` then ``id``, both descending). The query is narrowed by the
    optional filters, passed through ``_apply_access_control`` and handed to
    ``unified_paginate``. Rows that cannot be converted to ``ResourceRead`` are
    logged and left out instead of failing the whole listing.

    The first cursor page of an unscoped request (no ``cursor``, ``user_email``,
    ``token_teams`` or ``page``) is served from and stored in the registry cache.

    Args:
        db (Session): The SQLAlchemy database session.
        include_inactive (bool): If True, disabled resources are included.
            Defaults to False.
        cursor (Optional[str], optional): Opaque pagination cursor forwarded to
            ``unified_paginate``.
        tags (Optional[List[str]]): Keep only resources that carry at least one of these tags.
        gateway_id (Optional[str]): Keep only resources of this gateway. The literal value
            'null' (any letter case) selects resources whose gateway_id is NULL.
        limit (Optional[int]): Maximum number of resources, forwarded to ``unified_paginate``.
        page: Page number for page-based pagination. When set, the dict format is returned.
        per_page: Items per page for page-based pagination.
        user_email (Optional[str]): User email forwarded to the access-control filter.
        team_id (Optional[str]): Team ID forwarded to the access-control filter.
        visibility (Optional[str]): Keep only resources with exactly this visibility.
        token_teams (Optional[List[str]]): Team IDs from the caller's token, forwarded to the
            access-control filter.

    Returns:
        If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
        Otherwise: tuple of (list of ResourceRead objects, next_cursor).

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource_read = MagicMock()
        >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> resources, next_cursor = asyncio.run(service.list_resources(db))
        >>> isinstance(resources, list)
        True

        With tags filter:
        >>> db2 = MagicMock()
        >>> bind = MagicMock()
        >>> bind.dialect = MagicMock()
        >>> bind.dialect.name = "sqlite"  # or "postgresql"
        >>> db2.get_bind.return_value = bind
        >>> db2.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> result2, _ = asyncio.run(service.list_resources(db2, tags=['api']))
        >>> isinstance(result2, list)
        True
    """
    span_attributes = {
        "include_inactive": include_inactive,
        "tags.count": len(tags) if tags else 0,
        "gateway_id": gateway_id,
        "limit": limit,
        "page": page,
        "per_page": per_page,
        "user.email": user_email,
        "team.scope": format_trace_team_scope(token_teams) if token_teams is not None else None,
        "team.filter": team_id,
        "visibility": visibility,
    }
    with create_span("resource.list", span_attributes):
        registry_cache = _get_registry_cache()

        # Only the first cursor page of an unscoped request is cacheable:
        # user- or token-scoped results must never be shared between callers,
        # and page-based responses are not cached at all.
        unscoped_first_page = cursor is None and user_email is None and token_teams is None
        if unscoped_first_page and page is None:
            filters_hash = registry_cache.hash_filters(
                include_inactive=include_inactive,
                tags=sorted(tags) if tags else None,
                gateway_id=gateway_id,
                limit=limit,
                visibility=visibility,
            )
            cache_hit = await registry_cache.get("resources", filters_hash)
            if cache_hit is not None:
                # Cached entries are plain dicts; rebuild the schema objects.
                return ([ResourceRead.model_validate(entry) for entry in cache_hit["resources"]], cache_hit.get("next_cursor"))

        # Base statement: concrete resources only, newest first.
        stmt = select(DbResource).where(DbResource.uri_template.is_(None)).order_by(desc(DbResource.created_at), desc(DbResource.id))

        if not include_inactive:
            stmt = stmt.where(DbResource.enabled)

        stmt = await self._apply_access_control(stmt, db, user_email, token_teams, team_id)

        if visibility:
            stmt = stmt.where(DbResource.visibility == visibility)

        if gateway_id:
            # The literal "null" selects resources that belong to no gateway.
            gateway_filter = DbResource.gateway_id.is_(None) if gateway_id.lower() == "null" else DbResource.gateway_id == gateway_id
            stmt = stmt.where(gateway_filter)

        if tags:
            # match_any=True: one matching tag is enough.
            stmt = stmt.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))

        # One helper serves both pagination styles.
        paginated = await unified_paginate(
            db=db,
            query=stmt,
            page=page,
            per_page=per_page,
            cursor=cursor,
            limit=limit,
            base_url="/admin/resources",  # Used for page-based links
            query_params={"include_inactive": include_inactive} if include_inactive else {},
        )

        # Extract the resource rows: a dict for page mode, a tuple for cursor mode.
        page_mode = page is not None
        if page_mode:
            rows = paginated["data"]
            next_cursor = None
        else:
            rows, next_cursor = paginated

        # Resolve team names in one query instead of one per resource.
        referenced_team_ids = {row.team_id for row in rows if row.team_id}
        team_names = {}
        if referenced_team_ids:
            team_rows = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(referenced_team_ids), EmailTeam.is_active.is_(True))).all()
            team_names = {team.id: team.name for team in team_rows}

        db.commit()  # Release transaction to avoid idle-in-transaction

        converted = []
        for row in rows:
            try:
                row.team = team_names.get(row.team_id) if row.team_id else None
                converted.append(self.convert_resource_to_read(row, include_metrics=False))
            except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
                # A single bad row is logged and skipped; the listing continues.
                logger.exception(f"Failed to convert resource {getattr(row, 'id', 'unknown')} ({getattr(row, 'name', 'unknown')}): {e}")

        if page_mode:
            return {
                "data": converted,
                "pagination": paginated["pagination"],
                "links": paginated["links"],
            }

        # Store under the same conditions as the lookup above (page is None here).
        if unscoped_first_page:
            try:
                payload = {"resources": [item.model_dump(mode="json") for item in converted], "next_cursor": next_cursor}
                await registry_cache.set("resources", payload, filters_hash)
            except AttributeError:
                pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

        return (converted, next_cursor)

1230 

async def list_resources_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[ResourceRead]:
    """
    DEPRECATED: Use list_resources() with user_email parameter instead.

    List resources user has access to with team filtering.

    This method is maintained for backward compatibility. New code should call
    list_resources() with user_email, team_id, and visibility parameters.

    The user's teams are read through TeamManagementService. With ``team_id``
    set, the user must belong to that team (otherwise an empty list is
    returned) and only that team's resources are considered. Without it, the
    result covers resources the user owns, team/public resources of the user's
    teams, and all public resources. Rows are ordered newest first
    (``created_at`` then ``id``, both descending) so that ``skip``/``limit``
    paging is stable. Resources that cannot be converted are logged and skipped.

    Args:
        db: Database session
        user_email: Email of the user requesting resources
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive resources
        skip: Number of resources to skip for pagination
        limit: Maximum number of resources to return

    Returns:
        List[ResourceRead]: Resources the user has access to

    Examples:
        >>> from unittest.mock import MagicMock
        >>> import asyncio
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> # Patch out TeamManagementService so it doesn't run real logic
        >>> import mcpgateway.services.resource_service as _rs
        >>> class FakeTeamService:
        ...     def __init__(self, db): pass
        ...     async def get_user_teams(self, email): return []
        >>> _rs.TeamManagementService = FakeTeamService
        >>> # Force DB to return one fake row with a 'team' attribute
        >>> class FakeResource:
        ...     team_id = None
        >>> fake_resource = FakeResource()
        >>> db.execute.return_value.scalars.return_value.all.return_value = [fake_resource]
        >>> service.convert_resource_to_read = MagicMock(return_value="converted")
        >>> asyncio.run(service.list_resources_for_user(db, "user@example.com"))
        ['converted']

        Without team_id (default/public access):
        >>> db2 = MagicMock()
        >>> class FakeResource2:
        ...     team_id = None
        >>> fake_resource2 = FakeResource2()
        >>> db2.execute.return_value.scalars.return_value.all.return_value = [fake_resource2]
        >>> service.convert_resource_to_read = MagicMock(return_value="converted2")
        >>> out2 = asyncio.run(service.list_resources_for_user(db2, "user@example.com"))
        >>> out2
        ['converted2']
    """
    # First-Party
    from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

    # Resolve the teams the user belongs to
    team_service = TeamManagementService(db)
    user_teams = await team_service.get_user_teams(user_email)
    team_ids = [team.id for team in user_teams]

    # Build query following existing patterns from list_resources()
    query = select(DbResource)

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbResource.enabled)

    if team_id:
        if team_id not in team_ids:
            return []  # No access to team

        # Restrict to the requested team: its team/public resources plus
        # anything in that team owned by the user.
        access_conditions = [
            and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"])),
            and_(DbResource.team_id == team_id, DbResource.owner_email == user_email),
        ]
    else:
        # 1. User's personal resources (owner_email matches)
        access_conditions = [DbResource.owner_email == user_email]
        # 2. Team resources where user is member
        if team_ids:
            access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"])))
        # 3. Public resources
        access_conditions.append(DbResource.visibility == "public")

    query = query.where(or_(*access_conditions))

    # Apply visibility filter if specified
    if visibility:
        query = query.where(DbResource.visibility == visibility)

    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could repeat or skip rows. Use the same
    # newest-first ordering as list_resources() to keep paging deterministic.
    query = query.order_by(desc(DbResource.created_at), desc(DbResource.id)).offset(skip).limit(limit)

    resources = db.execute(query).scalars().all()

    # Batch fetch team names to avoid N+1 queries
    resource_team_ids = {r.team_id for r in resources if r.team_id}
    team_map = {}
    if resource_team_ids:
        teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all()
        team_map = {str(team.id): team.name for team in teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    result = []
    for t in resources:
        try:
            t.team = team_map.get(str(t.team_id)) if t.team_id else None
            result.append(self.convert_resource_to_read(t, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # Continue with remaining resources instead of failing completely
            logger.exception(f"Failed to convert resource {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
    return result

1352 

async def list_server_resources(
    self,
    db: Session,
    server_id: str,
    include_inactive: bool = False,
    include_metrics: bool = False,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> List[ResourceRead]:
    """
    List the (non-template) resources attached to one server.

    Resources are selected through the server/resource association table and
    returned as ResourceRead objects. No pagination is applied. When either
    ``user_email`` or ``token_teams`` is supplied, the result is limited to
    public resources, resources owned by the user (unless the token is
    public-only) and team/public resources of the caller's teams. Resources
    that cannot be converted are logged and skipped.

    Args:
        db (Session): The SQLAlchemy database session.
        server_id (str): Server ID
        include_inactive (bool): If True, include inactive resources in the result.
            Defaults to False.
        include_metrics (bool): If True, include metrics data in the result.
            Defaults to False.
        user_email (Optional[str]): User email for visibility filtering. Filtering is skipped
            only when both this and token_teams are None.
        token_teams (Optional[List[str]]): Team IDs from the caller's token; when given they
            replace the database team lookup. An empty list means a public-only token.

    Returns:
        List[ResourceRead]: A list of resources represented as ResourceRead objects.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource_read = MagicMock()
        >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> result = asyncio.run(service.list_server_resources(db, 'server1'))
        >>> isinstance(result, list)
        True
        >>> # Include inactive branch
        >>> result = asyncio.run(service.list_server_resources(db, 'server1', include_inactive=True))
        >>> isinstance(result, list)
        True
    """
    trace_attributes = {
        "server_id": server_id,
        "include_inactive": include_inactive,
        "include_metrics": include_metrics,
        "user.email": user_email,
        "team.scope": format_trace_team_scope(token_teams) if token_teams is not None else None,
    }
    with create_span("resource.list", trace_attributes):
        logger.debug(f"Listing resources for server_id: {server_id}, include_inactive: {include_inactive}")

        stmt = select(DbResource).join(server_resource_association, DbResource.id == server_resource_association.c.resource_id)
        stmt = stmt.where(DbResource.uri_template.is_(None))
        stmt = stmt.where(server_resource_association.c.server_id == server_id)

        # Load metrics up front so converting each row does not trigger extra queries.
        if include_metrics:
            stmt = stmt.options(selectinload(DbResource.metrics), selectinload(DbResource.metrics_hourly))
        if not include_inactive:
            stmt = stmt.where(DbResource.enabled)

        # Filter by visibility whenever any caller context exists. An empty-string
        # user_email or token_teams=[] therefore ends up seeing public resources only.
        if user_email is not None or token_teams is not None:
            if token_teams is not None:
                # Token scope takes precedence over the user's stored memberships.
                team_ids = token_teams
            elif user_email:
                # First-Party
                from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

                memberships = await TeamManagementService(db).get_user_teams(user_email)
                team_ids = [team.id for team in memberships]
            else:
                team_ids = []

            # A public-only token (empty team list) gets no owner access.
            public_only = token_teams is not None and len(token_teams) == 0

            visible_to_caller = [DbResource.visibility == "public"]
            if user_email and not public_only:
                visible_to_caller.append(DbResource.owner_email == user_email)
            if team_ids:
                visible_to_caller.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"])))
            stmt = stmt.where(or_(*visible_to_caller))

        # No pagination: every matching row is fetched.
        rows = db.execute(stmt).scalars().all()

        # Resolve team names in one query instead of one per resource.
        referenced_team_ids = {row.team_id for row in rows if row.team_id}
        team_names = {}
        if referenced_team_ids:
            team_rows = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(referenced_team_ids), EmailTeam.is_active.is_(True))).all()
            team_names = {str(team.id): team.name for team in team_rows}

        db.commit()  # Release transaction to avoid idle-in-transaction

        converted = []
        for row in rows:
            try:
                row.team = team_names.get(str(row.team_id)) if row.team_id else None
                converted.append(self.convert_resource_to_read(row, include_metrics=include_metrics))
            except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
                # A single bad row is logged and skipped; the listing continues.
                logger.exception(f"Failed to convert resource {getattr(row, 'id', 'unknown')} ({getattr(row, 'name', 'unknown')}): {e}")
        return converted

1476 

async def _record_resource_metric(self, db: Session, resource: DbResource, start_time: float, success: bool, error_message: Optional[str]) -> None:
    """
    Persist one access metric for a resource.

    The response time is the difference between ``time.monotonic()`` at call
    time and ``start_time``. The metric row is added to the session and
    committed immediately.

    Args:
        db: Database session
        resource: The resource that was accessed; its ``id`` is stored on the metric
        start_time: Monotonic start time of the access
        success: True if successful, False otherwise
        error_message: Error message if failed, None otherwise
    """
    elapsed = time.monotonic() - start_time
    db.add(
        ResourceMetric(
            resource_id=resource.id,
            response_time=elapsed,
            is_success=success,
            error_message=error_message,
        )
    )
    db.commit()

1499 

async def _record_invoke_resource_metric(self, db: Session, resource_id: str, start_time: float, success: bool, error_message: Optional[str]) -> None:
    """
    Persist a metric row for a resource invocation, keyed by resource ID.

    Takes the resource identifier directly instead of a loaded resource
    object. The response time is the monotonic-clock delta between
    ``start_time`` and now; the row is committed immediately.

    Args:
        db: Database Session
        resource_id: unique identifier of the invoked resource
        start_time: Monotonic start time of the access
        success: True if successful, False otherwise
        error_message: Error message if failed, None otherwise
    """
    metric_fields = {
        "resource_id": resource_id,
        "response_time": time.monotonic() - start_time,
        "is_success": success,
        "error_message": error_message,
    }
    db.add(ResourceMetric(**metric_fields))
    db.commit()

1522 

def create_ssl_context(self, ca_certificate: str) -> ssl.SSLContext:
    """Return an SSL context that trusts the given CA certificate.

    This is a thin wrapper around the module-level ``get_cached_ssl_context``
    helper, which (per its name and the existing documentation) reuses a
    previously built context when the same certificate is requested again.

    Args:
        ca_certificate: CA certificate in PEM format

    Returns:
        ssl.SSLContext: Configured SSL context
    """
    cached_context = get_cached_ssl_context(ca_certificate)
    return cached_context

1535 

async def invoke_resource(  # pylint: disable=unused-argument
    self,
    db: Session,
    resource_id: str,
    resource_uri: str,
    resource_template_uri: Optional[str] = None,
    user_identity: Optional[Union[str, Dict[str, Any]]] = None,
    meta_data: Optional[Dict[str, Any]] = None,  # Reserved for future MCP SDK support
    resource_obj: Optional[Any] = None,
    gateway_obj: Optional[Any] = None,
    server_id: Optional[str] = None,
) -> Any:
    """
    Invoke a resource via its configured gateway using SSE or StreamableHTTP transport.

    This method determines the correct URI to invoke, loads the associated resource
    and gateway from the database (unless pre-fetched objects are supplied), validates
    certificates if applicable, prepares authentication headers (OAuth, header-based,
    query-parameter, or none), and then connects to the gateway to read the resource
    using the appropriate transport.

    The function supports:
    - CA certificate validation / SSL context creation
    - OAuth client-credentials and authorization-code flow
    - Header-based auth
    - Query-parameter auth (applied to the gateway URL)
    - SSE transport gateways
    - StreamableHTTP transport gateways

    The database session is committed and closed before any network I/O, so ``db``
    must not be reused by this method afterwards; follow-up DB work uses
    ``fresh_db_session()``.

    Args:
        db (Session):
            SQLAlchemy session for retrieving resource and gateway information.
        resource_id (str):
            ID of the resource to invoke.
        resource_uri (str):
            Direct resource URI configured for the resource.
        resource_template_uri (Optional[str]):
            URI from the template. Overrides `resource_uri` when both are provided.
        user_identity (Optional[Union[str, Dict[str, Any]]]):
            Identity of the user making the request, used for session pool isolation.
            Can be a string (email) or a dict with an 'email' key.
            Defaults to "anonymous" for pool isolation if not provided.
            OAuth Authorization Code token lookup uses this user identity.
        meta_data (Optional[Dict[str, Any]]):
            Additional metadata to pass to the gateway during invocation.
            Currently unused: MCP SDK 1.25.0 read_resource() has no meta parameter.
        resource_obj (Optional[Any]):
            Pre-fetched DbResource object to skip the internal resource lookup query.
        gateway_obj (Optional[Any]):
            Pre-fetched DbGateway object to skip the internal gateway lookup query.
        server_id (Optional[str]):
            Virtual server ID for server metrics recording. When provided, indicates
            the resource was invoked through a specific virtual server endpoint.
            Direct resource calls (e.g., from admin UI) should pass None.

    Returns:
        Any: The text content returned by the remote resource, or ``None`` if the
        resource or its gateway could not be resolved, the gateway could not be
        contacted, or the remote read failed.

    Raises:
        Exception: Any unhandled internal errors (e.g., DB issues).

    ---
    Doctest Examples
    ----------------

    >>> class FakeDB:
    ...     "Simple DB stub returning fake resource and gateway rows."
    ...     def execute(self, query):
    ...         class Result:
    ...             def scalar_one_or_none(self):
    ...                 # Return fake objects with the needed attributes
    ...                 class FakeResource:
    ...                     id = "res123"
    ...                     name = "Demo Resource"
    ...                     gateway_id = "gw1"
    ...                 return FakeResource()
    ...         return Result()

    >>> class FakeGateway:
    ...     id = "gw1"
    ...     name = "Fake Gateway"
    ...     url = "https://fake.gateway"
    ...     ca_certificate = None
    ...     ca_certificate_sig = None
    ...     transport = "sse"
    ...     auth_type = None
    ...     auth_value = {}

    >>> # Monkeypatch the DB lookup for gateway
    >>> def fake_execute_gateway(self, query):
    ...     class Result:
    ...         def scalar_one_or_none(self_inner):
    ...             return FakeGateway()
    ...     return Result()

    >>> FakeDB.execute_gateway = fake_execute_gateway

    >>> class FakeService:
    ...     "Service stub replacing network calls with predictable outputs."
    ...     async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None):
    ...         # Represent the behavior of a successful SSE response.
    ...         return "hello from gateway"

    >>> svc = FakeService()
    >>> import asyncio
    >>> asyncio.run(svc.invoke_resource(FakeDB(), "res123", "/test"))
    'hello from gateway'

    ---
    Example: Template URI overrides resource URI
    --------------------------------------------

    >>> class FakeService2(FakeService):
    ...     async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None):
    ...         if resource_template_uri:
    ...             return f"using template: {resource_template_uri}"
    ...         return f"using direct: {resource_uri}"

    >>> svc2 = FakeService2()
    >>> asyncio.run(svc2.invoke_resource(FakeDB(), "res123", "/direct", "/template"))
    'using template: /template'

    """
    # NOTE(review): the template URI only takes effect when resource_uri is also
    # set; with an empty resource_uri the effective URI stays None. Kept as-is
    # because callers may depend on it - confirm whether that is intended.
    uri = None
    if resource_uri and resource_template_uri:
        uri = resource_template_uri
    elif resource_uri:
        uri = resource_uri

    logger.info(f"Invoking the resource: {uri}")

    # Use pre-fetched resource if provided (avoids a redundant lookup query)
    resource_info = resource_obj
    if resource_info is None:
        resource_info = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none()

    # Release the transaction immediately after the resource lookup to prevent
    # idle-in-transaction, in particular when the resource is not found.
    db.commit()

    # Normalize user_identity to a string for session pool isolation.
    if isinstance(user_identity, dict):
        pool_user_identity = user_identity.get("email") or "anonymous"
    elif isinstance(user_identity, str):
        pool_user_identity = user_identity
    else:
        pool_user_identity = "anonymous"

    # Normalized (trimmed, lower-cased) email used for stored OAuth token lookup.
    oauth_user_email: Optional[str] = None
    if isinstance(user_identity, dict):
        user_email_value = user_identity.get("email")
        if isinstance(user_email_value, str):
            oauth_user_email = user_email_value.strip().lower() or None
    elif isinstance(user_identity, str):
        oauth_user_email = user_identity.strip().lower() or None

    if not resource_info:
        return None

    gateway_id = getattr(resource_info, "gateway_id", None)
    resource_name = getattr(resource_info, "name", None)

    # Use pre-fetched gateway if provided (avoids a redundant lookup query)
    gateway = gateway_obj
    if gateway is None and gateway_id:
        gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none()

    # CRITICAL: release the DB transaction right after fetching resource/gateway so
    # it does not stay open when there is no gateway or on any other early return.
    db.commit()

    if not gateway:
        return None

    start_time = time.monotonic()
    success = False
    error_message = None

    # Attributes shared by the observability DB span and the tracing span.
    # A missing URI is reported as "unknown" (str(None) would be the truthy "None").
    span_attributes = {
        "resource.name": resource_name if resource_name else "unknown",
        "resource.id": str(resource_id) if resource_id else "unknown",
        "resource.uri": str(uri) if uri else "unknown",
        "gateway.transport": gateway.transport or "unknown",
        "gateway.url": gateway.url or "unknown",
    }

    # Create database span for observability dashboard
    trace_id = current_trace_id.get()
    db_span_id = None
    observability_service = ObservabilityService() if trace_id else None

    if trace_id and observability_service:
        try:
            db_span_id = observability_service.start_span(
                db=db,
                trace_id=trace_id,
                name="invoke.resource",
                # Pass a copy: span_attributes may gain the captured input below.
                attributes=dict(span_attributes),
            )
            logger.debug(f"✓ Created invoke.resource span: {db_span_id} for resource: {resource_id} & {uri}")
        except Exception as e:
            logger.warning(f"Failed to start the observability span for invoking resource: {e}")
            db_span_id = None

    if is_input_capture_enabled("invoke.resource"):
        span_attributes["langfuse.observation.input"] = serialize_trace_payload({"uri": str(uri) if uri else "unknown"})

    with create_span("invoke.resource", span_attributes) as span:
        # A custom CA certificate is only trusted when signing is disabled or its
        # signature validates; otherwise the default verification settings apply.
        valid = False
        if gateway.ca_certificate:
            if settings.enable_ed25519_signing:
                public_key_pem = settings.ed25519_public_key
                valid = validate_signature(gateway.ca_certificate.encode(), gateway.ca_certificate_sig, public_key_pem)
            else:
                valid = True

        ssl_context = self.create_ssl_context(gateway.ca_certificate) if valid else None

        def _get_httpx_client_factory(
            headers: dict[str, str] | None = None,
            timeout: httpx.Timeout | None = None,
            auth: httpx.Auth | None = None,
        ) -> httpx.AsyncClient:
            """Factory function to create httpx.AsyncClient with optional CA certificate.

            Args:
                headers: Optional headers for the client
                timeout: Optional timeout for the client
                auth: Optional auth for the client

            Returns:
                httpx.AsyncClient: Configured HTTPX async client
            """
            # First-Party
            from mcpgateway.services.http_client_service import get_default_verify, get_http_timeout  # pylint: disable=import-outside-toplevel

            return httpx.AsyncClient(
                verify=ssl_context if ssl_context else get_default_verify(),
                follow_redirects=True,
                headers=headers,
                timeout=timeout if timeout else get_http_timeout(),
                auth=auth,
                limits=httpx.Limits(
                    max_connections=settings.httpx_max_connections,
                    max_keepalive_connections=settings.httpx_max_keepalive_connections,
                    keepalive_expiry=settings.httpx_keepalive_expiry,
                ),
            )

        def _get_session_pool() -> Any:
            """Return the MCP session pool, or None when pooling is unavailable.

            Returns:
                Any: The pool object, or None if pooling is disabled or the pool
                has not been initialized.
            """
            if not settings.mcp_session_pool_enabled:
                return None
            try:
                return get_mcp_session_pool()
            except RuntimeError:
                # Pool not initialized (e.g., in tests), fall back to per-call sessions
                return None

        try:
            # Extract gateway data to local variables BEFORE OAuth handling.
            # The OAuth client_credentials flow makes network calls, so the DB
            # session must be released first to avoid idle-in-transaction.
            gateway_url = gateway.url
            gateway_transport = gateway.transport
            gateway_auth_type = gateway.auth_type
            gateway_auth_value = gateway.auth_value
            gateway_oauth_config = gateway.oauth_config
            gateway_name = gateway.name
            gateway_auth_query_params = getattr(gateway, "auth_query_params", None)

            # Apply query param auth to URL if applicable
            auth_query_params_decrypted: Optional[Dict[str, str]] = None
            if gateway_auth_type == "query_param" and gateway_auth_query_params:
                auth_query_params_decrypted = {}
                for param_key, encrypted_value in gateway_auth_query_params.items():
                    if not encrypted_value:
                        continue
                    try:
                        decrypted = decode_auth(encrypted_value)
                        auth_query_params_decrypted[param_key] = decrypted.get(param_key, "")
                    except Exception:  # noqa: S110 - intentionally skip failed decryptions
                        # Skip params that fail decryption (corrupted or old key)
                        logger.debug(f"Failed to decrypt query param '{param_key}' for resource")
                if auth_query_params_decrypted:
                    gateway_url = apply_query_param_auth(gateway_url, auth_query_params_decrypted)

            # CRITICAL: release the DB connection back to the pool BEFORE making
            # HTTP/OAuth calls. This prevents connection pool exhaustion during slow
            # upstream requests. All needed data is already in local variables.
            db.commit()  # End read-only transaction cleanly
            db.close()

            # Handle different authentication types (AFTER DB release)
            headers: Dict[str, str] = {}
            if gateway_auth_type == "oauth" and gateway_oauth_config:
                grant_type = gateway_oauth_config.get("grant_type", "client_credentials")

                if grant_type == "authorization_code":
                    # For Authorization Code flow, try to get stored tokens
                    try:
                        # First-Party
                        from mcpgateway.services.token_storage_service import TokenStorageService  # pylint: disable=import-outside-toplevel

                        # Use fresh DB session for token lookup (original db was closed)
                        access_token = None
                        if oauth_user_email:
                            with fresh_db_session() as token_db:
                                token_storage = TokenStorageService(token_db)
                                access_token = await token_storage.get_user_token(gateway_id, oauth_user_email)

                        if access_token:
                            headers["Authorization"] = f"Bearer {access_token}"
                        elif span:
                            set_span_attribute(span, "health.status", "unhealthy")
                            set_span_error(span, "No valid OAuth token for user")

                    except Exception as e:
                        logger.error(f"Failed to obtain stored OAuth token for gateway {gateway_name}: {e}")
                        if span:
                            set_span_attribute(span, "health.status", "unhealthy")
                            set_span_error(span, "Failed to obtain stored OAuth token")
                else:
                    # For Client Credentials flow, get token directly (makes network calls)
                    try:
                        access_token = await self.oauth_manager.get_access_token(gateway_oauth_config)
                        headers["Authorization"] = f"Bearer {access_token}"
                    except Exception as e:
                        # Best effort: the request proceeds unauthenticated, but the
                        # failure is logged like the authorization-code branch.
                        logger.error(f"Failed to obtain OAuth access token for gateway {gateway_name}: {e}")
                        if span:
                            set_span_attribute(span, "health.status", "unhealthy")
                            set_span_error(span, e)
            else:
                # Handle non-OAuth authentication (existing logic)
                auth_data = gateway_auth_value or {}
                if isinstance(auth_data, str):
                    headers = decode_auth(auth_data)
                elif isinstance(auth_data, dict):
                    headers = {str(k): str(v) for k, v in auth_data.items()}
                else:
                    headers = {}

            async def connect_to_sse_session(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
                """
                Connect to an SSE-based gateway and retrieve the text content of a resource.

                Uses a pooled session when the session pool is enabled and initialized;
                otherwise opens a per-call SSE connection, initializes a `ClientSession`,
                and calls `read_resource()`. The text of the first item in the response's
                `contents` list is returned.

                Any exception (network failure, unexpected response format, session
                initialization failure, etc.) is logged at debug level with secrets
                sanitized, and ``None`` is returned instead of raising.

                Note:
                    MCP SDK 1.25.0 read_resource() does not support meta parameter.
                    When the SDK adds support, meta_data can be added back here.

                Args:
                    server_url (str):
                        The base URL of the SSE gateway to connect to.
                    uri (str):
                        The resource URI that should be requested from the gateway.
                    authentication (Optional[Dict[str, str]]):
                        Optional dictionary of headers (e.g., OAuth Bearer tokens) to
                        include in the SSE connection request. Defaults to an empty
                        dictionary when not provided.

                Returns:
                    str | None:
                        The text content returned by the remote resource, or ``None`` if the
                        SSE connection fails or the response is invalid.

                Notes:
                    - The SSE client context manager is unpacked here as
                      ``(read_stream, write_stream)``.
                    - The response from `session.read_resource()` must have a `contents`
                      list whose first element has a `text` attribute.
                """
                if authentication is None:
                    authentication = {}
                try:
                    # Prefer the session pool when available (lower latency)
                    pool = _get_session_pool()
                    if pool is not None:
                        async with pool.session(
                            url=server_url,
                            headers=authentication,
                            transport_type=TransportType.SSE,
                            httpx_client_factory=_get_httpx_client_factory,
                            user_identity=pool_user_identity,
                            gateway_id=gateway_id,
                        ) as pooled:
                            resource_response = await pooled.session.read_resource(uri=uri)
                            return resource_response.contents[0].text

                    # Fallback to per-call sessions when pool disabled or not initialized
                    async with sse_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as (
                        read_stream,
                        write_stream,
                    ):
                        async with ClientSession(read_stream, write_stream) as session:
                            _ = await session.initialize()
                            resource_response = await session.read_resource(uri=uri)
                            return resource_response.contents[0].text
                except Exception as e:
                    # Sanitize error message to prevent URL secrets from leaking in logs
                    sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted)
                    logger.debug(f"Exception while connecting to sse gateway: {sanitized_error}")
                    return None

            async def connect_to_streamablehttp_server(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
                """
                Connect to a StreamableHTTP gateway and retrieve the text content of a resource.

                Uses a pooled session when the session pool is enabled and initialized;
                otherwise opens a per-call StreamableHTTP connection, initializes a
                `ClientSession`, and calls `read_resource()`. The text of the first
                element in the response's `contents` list is returned.

                Any exception during connection, session initialization, or resource
                reading is logged at debug level with secrets sanitized, and ``None``
                is returned instead of propagating the exception.

                Note:
                    MCP SDK 1.25.0 read_resource() does not support meta parameter.
                    When the SDK adds support, meta_data can be added back here.

                Args:
                    server_url (str):
                        The endpoint URL of the StreamableHTTP gateway.
                    uri (str):
                        The resource URI to request from the gateway.
                    authentication (Optional[Dict[str, str]]):
                        Optional dictionary of authentication headers (e.g., API keys or
                        Bearer tokens). Defaults to an empty dictionary when not provided.

                Returns:
                    str | None:
                        The text content returned by the StreamableHTTP resource, or ``None``
                        if the connection fails or the response format is invalid.

                Notes:
                    - The `streamablehttp_client` context manager is unpacked here as
                      ``(read_stream, write_stream, get_session_id)``.
                    - The response from ``session.read_resource()`` must contain a
                      `contents` list whose first element exposes a `text` attribute.
                """
                if authentication is None:
                    authentication = {}
                try:
                    # Prefer the session pool when available (lower latency)
                    pool = _get_session_pool()
                    if pool is not None:
                        async with pool.session(
                            url=server_url,
                            headers=authentication,
                            transport_type=TransportType.STREAMABLE_HTTP,
                            httpx_client_factory=_get_httpx_client_factory,
                            user_identity=pool_user_identity,
                            gateway_id=gateway_id,
                        ) as pooled:
                            resource_response = await pooled.session.read_resource(uri=uri)
                            return resource_response.contents[0].text

                    # Fallback to per-call sessions when pool disabled or not initialized
                    async with streamablehttp_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as (
                        read_stream,
                        write_stream,
                        _get_session_id,
                    ):
                        async with ClientSession(read_stream, write_stream) as session:
                            _ = await session.initialize()
                            resource_response = await session.read_resource(uri=uri)
                            return resource_response.contents[0].text
                except Exception as e:
                    # Sanitize error message to prevent URL secrets from leaking in logs
                    sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted)
                    logger.debug(f"Exception while connecting to streamablehttp gateway: {sanitized_error}")
                    return None

            # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
            if (gateway_transport).lower() == "sse":
                resource_text = await connect_to_sse_session(server_url=gateway_url, authentication=headers, uri=uri)
            else:
                resource_text = await connect_to_streamablehttp_server(server_url=gateway_url, authentication=headers, uri=uri)

            # The connect helpers swallow their own errors and return None, so a
            # None result is the only failure signal available here.
            success = resource_text is not None
            if not success:
                error_message = "Gateway returned no resource content"

            # Record outcome and duration AFTER the remote read so that the duration
            # covers the network call and success reflects the actual result.
            if span:
                set_span_attribute(span, "success", success)
                set_span_attribute(span, "duration.ms", (time.monotonic() - start_time) * 1000)
                if success and is_output_capture_enabled("invoke.resource"):
                    set_span_attribute(span, "langfuse.observation.output", serialize_trace_payload({"content": resource_text}))
            return resource_text
        except Exception as e:
            success = False
            error_message = str(e)
            raise
        finally:
            # No metric row is written here; per the original note, metrics are
            # recorded by read_resource to avoid duplicates.
            # End the invoke.resource span for the observability dashboard.
            # NOTE: use fresh_db_session() since the original db was released
            # before making HTTP calls to prevent connection pool exhaustion.
            if db_span_id and observability_service:
                try:
                    with fresh_db_session() as fresh_db:
                        observability_service.end_span(
                            db=fresh_db,
                            span_id=db_span_id,
                            status="ok" if success else "error",
                            status_message=error_message if error_message else None,
                        )
                    logger.debug(f"✓ Ended invoke.resource span: {db_span_id}")
                except Exception as e:
                    logger.warning(f"Failed to end observability span for invoking resource: {e}")

2080 

2081 async def read_resource( 

2082 self, 

2083 db: Session, 

2084 resource_id: Optional[Union[int, str]] = None, 

2085 resource_uri: Optional[str] = None, 

2086 request_id: Optional[str] = None, 

2087 user: Optional[str] = None, 

2088 server_id: Optional[str] = None, 

2089 include_inactive: bool = False, 

2090 token_teams: Optional[List[str]] = None, 

2091 plugin_context_table: Optional[PluginContextTable] = None, 

2092 plugin_global_context: Optional[GlobalContext] = None, 

2093 meta_data: Optional[Dict[str, Any]] = None, 

2094 ) -> Union[ResourceContent, ResourceContents]: 

2095 """Read a resource's content with plugin hook support. 

2096 

2097 Args: 

2098 db: Database session. 

2099 resource_id: Optional ID of the resource to read. 

2100 resource_uri: Optional URI of the resource to read. 

2101 request_id: Optional request ID for tracing. 

2102 user: Optional user email for authorization checks. 

2103 server_id: Optional server ID for server scoping enforcement. 

2104 include_inactive: Whether to include inactive resources. Defaults to False. 

2105 token_teams: Optional list of team IDs from token for authorization. 

2106 None = unrestricted admin, [] = public-only, [...] = team-scoped. 

2107 plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing. 

2108 plugin_global_context: Optional global context from middleware for consistency across hooks. 

2109 meta_data: Optional metadata dictionary to pass to the gateway during resource reading. 

2110 

2111 Returns: 

2112 Resource content object 

2113 

2114 Raises: 

2115 ResourceNotFoundError: If resource not found or access denied 

2116 ResourceError: If blocked by plugin 

2117 PluginError: If encounters issue with plugin 

2118 PluginViolationError: If plugin violated the request. Example - In case of OPA plugin, if the request is denied by policy. 

2119 ValueError: If neither resource_id nor resource_uri is provided 

2120 

2121 Examples: 

2122 >>> from mcpgateway.common.models import ResourceContent 

2123 >>> from mcpgateway.services.resource_service import ResourceService 

2124 >>> from unittest.mock import MagicMock, PropertyMock 

2125 >>> service = ResourceService() 

2126 >>> db = MagicMock() 

2127 >>> uri = 'http://example.com/resource.txt' 

2128 >>> mock_resource = MagicMock() 

2129 >>> mock_resource.id = 123 

2130 >>> mock_resource.uri = uri 

2131 >>> type(mock_resource).content = PropertyMock(return_value='test') 

2132 >>> db.execute.return_value.scalar_one_or_none.return_value = mock_resource 

2133 >>> db.get.return_value = mock_resource 

2134 >>> import asyncio 

2135 >>> result = asyncio.run(service.read_resource(db, resource_uri=uri)) 

2136 >>> result.__class__.__name__ == 'ResourceContent' 

2137 True 

2138 

2139 Not found case returns ResourceNotFoundError: 

2140 

2141 >>> db2 = MagicMock() 

2142 >>> db2.execute.return_value.scalar_one_or_none.return_value = None 

2143 >>> db2.get.return_value = None 

2144 >>> import asyncio 

2145 >>> # Disable path validation for doctest 

2146 >>> import mcpgateway.config 

2147 >>> old_val = getattr(mcpgateway.config.settings, 'experimental_validate_io', False) 

2148 >>> mcpgateway.config.settings.experimental_validate_io = False 

2149 >>> def _nf(): 

2150 ... try: 

2151 ... asyncio.run(service.read_resource(db2, resource_uri='abc')) 

2152 ... except ResourceNotFoundError: 

2153 ... return True 

2154 >>> result = _nf() 

2155 >>> mcpgateway.config.settings.experimental_validate_io = old_val 

2156 >>> result 

2157 True 

2158 """ 

2159 start_time = time.monotonic() 

2160 success = False 

2161 error_message = None 

2162 resource_db = None 

2163 server_scoped = False 

2164 resource_db_gateway = None # Only set when eager-loaded via Q2's joinedload 

2165 content = None 

2166 uri = resource_uri or "unknown" 

2167 if resource_id: 

2168 resource_db = db.get(DbResource, resource_id, options=[joinedload(DbResource.gateway)]) 

2169 if resource_db: 

2170 uri = resource_db.uri 

2171 resource_db_gateway = resource_db.gateway # Eager-loaded, safe to access 

2172 # Check enabled status in Python (avoids redundant Q3/Q4 re-fetches) 

2173 if not include_inactive and not resource_db.enabled: 

2174 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive") 

2175 content = resource_db.content 

2176 else: 

2177 uri = None 

2178 

2179 # Create database span for observability dashboard 

2180 trace_id = current_trace_id.get() 

2181 db_span_id = None 

2182 db_span_ended = False 

2183 observability_service = ObservabilityService() if trace_id else None 

2184 

2185 if trace_id and observability_service: 

2186 try: 

2187 db_span_id = observability_service.start_span( 

2188 db=db, 

2189 trace_id=trace_id, 

2190 name="resource.read", 

2191 attributes={ 

2192 "resource.uri": str(resource_uri) if resource_uri else "unknown", 

2193 "user": user or "anonymous", 

2194 "server_id": server_id, 

2195 "request_id": request_id, 

2196 "http.url": uri if uri is not None and uri.startswith("http") else None, 

2197 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static", 

2198 }, 

2199 ) 

2200 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {uri}") 

2201 except Exception as e: 

2202 logger.warning(f"Failed to start observability span for resource reading: {e}") 

2203 db_span_id = None 

2204 

2205 span_attributes = { 

2206 "resource.uri": resource_uri or "unknown", 

2207 "user": user or "anonymous", 

2208 "server_id": server_id, 

2209 "request_id": request_id, 

2210 "http.url": uri if uri is not None and uri.startswith("http") else None, 

2211 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static", 

2212 } 

2213 if is_input_capture_enabled("resource.read"): 

2214 span_attributes["langfuse.observation.input"] = serialize_trace_payload({"uri": resource_uri or resource_id or "unknown"}) 

2215 

2216 with create_span("resource.read", span_attributes) as span: 

2217 try: 

2218 # Generate request ID if not provided 

2219 if not request_id: 

2220 request_id = str(uuid.uuid4()) 

2221 

2222 original_uri = uri 

2223 contexts = None 

2224 

2225 # Check if plugin manager is available and eligible for this request 

2226 plugin_eligible = bool(self._plugin_manager and PLUGINS_AVAILABLE and uri and ("://" in uri)) 

2227 

2228 # Initialize plugin manager if needed (lazy init must happen before has_hooks_for check) 

2229 # pylint: disable=protected-access 

2230 if plugin_eligible and not self._plugin_manager._initialized: 

2231 await self._plugin_manager.initialize() 

2232 # pylint: enable=protected-access 

2233 

2234 # Check if any resource hooks are registered to avoid unnecessary context creation 

2235 has_pre_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_PRE_FETCH) 

2236 has_post_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_POST_FETCH) 

2237 

2238 # Initialize plugin context variables only if hooks are registered 

2239 global_context = None 

2240 if has_pre_fetch or has_post_fetch: 

2241 # Create plugin context 

2242 # Normalize user to an identifier string if provided 

2243 user_id = None 

2244 if user is not None: 

2245 if isinstance(user, dict) and "email" in user: 

2246 user_id = user.get("email") 

2247 elif isinstance(user, str): 

2248 user_id = user 

2249 else: 

2250 # Attempt to fallback to attribute access 

2251 user_id = getattr(user, "email", None) 

2252 

2253 # Use existing global_context from middleware or create new one 

2254 if plugin_global_context: 

2255 global_context = plugin_global_context 

2256 # Update fields with resource-specific information 

2257 if user_id: 

2258 global_context.user = user_id 

2259 if server_id: 

2260 global_context.server_id = server_id 

2261 else: 

2262 # Create new context (fallback when middleware didn't run) 

2263 global_context = GlobalContext(request_id=request_id, user=user_id, server_id=server_id) 

2264 

2265 # Call pre-fetch hooks if registered 

2266 if has_pre_fetch: 

2267 # Create pre-fetch payload 

2268 pre_payload = ResourcePreFetchPayload(uri=uri, metadata={}) 

2269 

2270 # Execute pre-fetch hooks with context from previous hooks 

2271 pre_result, contexts = await self._plugin_manager.invoke_hook( 

2272 ResourceHookType.RESOURCE_PRE_FETCH, 

2273 pre_payload, 

2274 global_context, 

2275 local_contexts=plugin_context_table, # Pass context from previous hooks 

2276 violations_as_exceptions=True, 

2277 ) 

2278 # Use modified URI if plugin changed it 

2279 if pre_result.modified_payload: 

2280 uri = pre_result.modified_payload.uri 

2281 logger.debug(f"Resource URI modified by plugin: {original_uri} -> {uri}") 

2282 

2283 # Validate resource path if experimental validation is enabled 

2284 if getattr(settings, "experimental_validate_io", False) and uri and isinstance(uri, str): 

2285 try: 

2286 SecurityValidator.validate_path(uri, getattr(settings, "allowed_roots", None)) 

2287 except ValueError as e: 

2288 raise ResourceError(f"Path validation failed: {e}") 

2289 

2290 # Original resource fetching logic 

2291 logger.info(f"Fetching resource: {resource_id} (URI: {uri})") 

2292 

2293 # Check if resource's gateway is in direct_proxy mode 

2294 # First, try to find the resource to get its gateway 

2295 # Check for template 

2296 

2297 if resource_db is None and uri is not None: # and "{" in uri and "}" in uri: 

2298 # Match the URI (possibly modified by plugins) 

2299 # against the URI stored in the resource DB; 

2300 # if the URI is a resource template, the resource is retrieved from the DB 

2301 query = select(DbResource) 

2302 if server_id: 

2303 query = query.join( 

2304 server_resource_association, 

2305 server_resource_association.c.resource_id == DbResource.id, 

2306 ).where(server_resource_association.c.server_id == server_id) 

2307 query = query.where(DbResource.uri == str(uri)).where(DbResource.enabled) 

2308 if include_inactive: 

2309 query = select(DbResource) 

2310 if server_id: 

2311 query = query.join( 

2312 server_resource_association, 

2313 server_resource_association.c.resource_id == DbResource.id, 

2314 ).where(server_resource_association.c.server_id == server_id) 

2315 query = query.where(DbResource.uri == str(uri)) 

2316 try: 

2317 resource_db = db.execute(query).scalar_one_or_none() 

2318 except MultipleResultsFound as exc: 

2319 if server_id: 

2320 raise ResourceError(f"Multiple resources matched URI '{uri}' for server '{server_id}'.") from exc 

2321 raise ResourceError(f"Resource URI '{uri}' is ambiguous across multiple servers; use /servers/{{id}}/mcp.") from exc 

2322 

2323 # Check for direct_proxy mode 

2324 if resource_db and resource_db.gateway and getattr(resource_db.gateway, "gateway_mode", "cache") == "direct_proxy" and settings.mcpgateway_direct_proxy_enabled: 

2325 # SECURITY: Check gateway access before allowing direct proxy 

2326 if not await check_gateway_access(db, resource_db.gateway, user, token_teams): 

2327 raise ResourceNotFoundError(f"Resource not found: {uri}") 

2328 

2329 logger.info(f"Using direct_proxy mode for resource '{uri}' via gateway {resource_db.gateway.id}") 

2330 

2331 try: # First-Party 

2332 # First-Party 

2333 from mcpgateway.common.models import BlobResourceContents, TextResourceContents # pylint: disable=import-outside-toplevel 

2334 

2335 gateway = resource_db.gateway 

2336 

2337 # Prepare headers with gateway auth 

2338 headers = build_gateway_auth_headers(gateway) 

2339 

2340 # Use MCP SDK to connect and read resource 

2341 async with streamablehttp_client(url=gateway.url, headers=headers, timeout=settings.mcpgateway_direct_proxy_timeout) as (read_stream, write_stream, _get_session_id): 

2342 async with ClientSession(read_stream, write_stream) as session: 

2343 await session.initialize() 

2344 

2345 # Note: MCP SDK read_resource() only accepts uri; _meta is not supported 

2346 result = await session.read_resource(uri=uri) 

2347 

2348 # Convert MCP result to MCP-compliant content models 

2349 # result.contents is a list of TextResourceContents or BlobResourceContents 

2350 if result.contents: 

2351 first_content = result.contents[0] 

2352 if hasattr(first_content, "text"): 

2353 content = TextResourceContents(uri=uri, mimeType=first_content.mimeType if hasattr(first_content, "mimeType") else "text/plain", text=first_content.text) 

2354 elif hasattr(first_content, "blob"): 

2355 content = BlobResourceContents( 

2356 uri=uri, mimeType=first_content.mimeType if hasattr(first_content, "mimeType") else "application/octet-stream", blob=first_content.blob 

2357 ) 

2358 else: 

2359 content = TextResourceContents(uri=uri, text="") 

2360 else: 

2361 content = TextResourceContents(uri=uri, text="") 

2362 

2363 success = True 

2364 logger.info( 

2365 f"[READ RESOURCE] Using direct_proxy mode for gateway {SecurityValidator.sanitize_log_message(gateway.id)} (from X-Context-Forge-Gateway-Id header). Meta Attached: {meta_data is not None}" 

2366 ) 

2367 # Skip the rest of the DB lookup logic 

2368 

2369 except Exception as e: 

2370 logger.exception(f"Error in direct_proxy mode for resource '{uri}': {e}") 

2371 raise ResourceError(f"Direct proxy resource read failed: {str(e)}") 

2372 

2373 elif resource_db: 

2374 # Normal cache mode - resource found in DB 

2375 content = resource_db.content 

2376 else: 

2377 # Check the inactivity first using the same server scope that 

2378 # governed the active lookup. Without this, duplicate URIs 

2379 # across different virtual servers/gateways can produce 

2380 # ambiguous results even though the current request is 

2381 # already scoped to a single server. 

2382 inactive_query = select(DbResource) 

2383 if server_id: 

2384 inactive_query = inactive_query.join( 

2385 server_resource_association, 

2386 server_resource_association.c.resource_id == DbResource.id, 

2387 ).where(server_resource_association.c.server_id == server_id) 

2388 try: 

2389 check_inactivity = db.execute(inactive_query.where(DbResource.uri == str(resource_uri)).where(not_(DbResource.enabled))).scalar_one_or_none() 

2390 except MultipleResultsFound as exc: 

2391 if server_id: 

2392 raise ResourceError(f"Multiple inactive resources matched URI '{resource_uri}' for server '{server_id}'.") from exc 

2393 raise ResourceError(f"Resource URI '{resource_uri}' is ambiguous across multiple servers; use /servers/{{id}}/mcp.") from exc 

2394 if check_inactivity: 

2395 raise ResourceNotFoundError(f"Resource '{resource_uri}' exists but is inactive") 

2396 

2397 if resource_db is None: 

2398 if resource_uri: 

2399 # if resource_uri is provided 

2400 # modified uri have templatized resource with prefilled value 

2401 # triggers _read_template_resource 

2402 # it internally checks which uri matches the pattern of modified uri and fetches 

2403 # the one which matches else raises ResourceNotFoundError 

2404 try: 

2405 content = await self._read_template_resource(db, uri) or None 

2406 # ═══════════════════════════════════════════════════════════════════════════ 

2407 # SECURITY: Fetch the template's DbResource record for access checking 

2408 # _read_template_resource returns ResourceContent with the template's ID 

2409 # ═══════════════════════════════════════════════════════════════════════════ 

2410 if content is not None and hasattr(content, "id") and content.id: 

2411 template_query = select(DbResource).where(DbResource.id == str(content.id)) 

2412 if not include_inactive: 

2413 template_query = template_query.where(DbResource.enabled) 

2414 resource_db = db.execute(template_query).scalar_one_or_none() 

2415 except Exception as e: 

2416 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") from e 

2417 

2418 if resource_uri: 

2419 if content is None and resource_db is None: 

2420 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") 

2421 

2422 if resource_id and resource_db is None: 

2423 # if resource_id provided but not found by Q2 (shouldn't normally happen, 

2424 # but handles race conditions where resource is deleted between requests) 

2425 query = select(DbResource).where(DbResource.id == str(resource_id)).where(DbResource.enabled) 

2426 if include_inactive: 

2427 query = select(DbResource).where(DbResource.id == str(resource_id)) 

2428 resource_db = db.execute(query).scalar_one_or_none() 

2429 if resource_db: 

2430 original_uri = resource_db.uri or None 

2431 content = resource_db.content 

2432 else: 

2433 check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(resource_id)).where(not_(DbResource.enabled))).scalar_one_or_none() 

2434 if check_inactivity: 

2435 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive") 

2436 raise ResourceNotFoundError(f"Resource not found for the resource id: {resource_id}") 

2437 

2438 # ═══════════════════════════════════════════════════════════════════════════ 

2439 # SECURITY: Check resource access based on visibility and team membership 

2440 # ═══════════════════════════════════════════════════════════════════════════ 

2441 if resource_db: 

2442 if not await self._check_resource_access(db, resource_db, user, token_teams): 

2443 # Don't reveal resource existence - return generic "not found" 

2444 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}") 

2445 

2446 # ═══════════════════════════════════════════════════════════════════════════ 

2447 # SECURITY: Enforce server scoping if server_id is provided 

2448 # Resource must be attached to the specified virtual server 

2449 # ═══════════════════════════════════════════════════════════════════════════ 

2450 if server_id: 

2451 server_match = db.execute( 

2452 select(server_resource_association.c.resource_id).where( 

2453 server_resource_association.c.server_id == server_id, 

2454 server_resource_association.c.resource_id == resource_db.id, 

2455 ) 

2456 ).first() 

2457 if not server_match: 

2458 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}") 

2459 server_scoped = True 

2460 

2461 # Set success attributes on span 

2462 if span: 

2463 set_span_attribute(span, "success", True) 

2464 set_span_attribute(span, "duration.ms", (time.monotonic() - start_time) * 1000) 

2465 if content: 

2466 set_span_attribute(span, "content.size", len(str(content))) 

2467 

2468 success = True 

2469 # Return standardized content without breaking callers that expect passthrough 

2470 # Prefer returning first-class content models or objects with content-like attributes. 

2471 # ResourceContent and TextContent already imported at top level 

2472 

2473 # Release transaction before network calls to avoid idle-in-transaction during invoke_resource 

2474 db.commit() 

2475 

2476 # ═══════════════════════════════════════════════════════════════════════════ 

2477 # RESOLVE CONTENT: Fetch actual content from gateway if needed 

2478 # ═══════════════════════════════════════════════════════════════════════════ 

2479 # If content is a Pydantic content model, invoke gateway 

2480 

2481 # ResourceContents covers TextResourceContents and BlobResourceContents (MCP-compliant) 

2482 # ResourceContent is the legacy model for backwards compatibility 

2483 

2484 if isinstance(content, (ResourceContent, ResourceContents, TextContent)): 

2485 # Metrics are recorded in read_resource finally block for all resources 

2486 resource_response = await self.invoke_resource( 

2487 db=db, 

2488 resource_id=getattr(content, "id"), 

2489 resource_uri=getattr(content, "uri") or None, 

2490 resource_template_uri=getattr(content, "text") or None, 

2491 user_identity=user, 

2492 meta_data=meta_data, 

2493 resource_obj=resource_db, 

2494 gateway_obj=resource_db_gateway, 

2495 server_id=server_id, 

2496 ) 

2497 if resource_response: 

2498 setattr(content, "text", resource_response) 

2499 # If content is any object that quacks like content 

2500 elif hasattr(content, "text") or hasattr(content, "blob"): 

2501 # Metrics are recorded in read_resource finally block for all resources 

2502 if hasattr(content, "blob"): 

2503 resource_response = await self.invoke_resource( 

2504 db=db, 

2505 resource_id=getattr(content, "id"), 

2506 resource_uri=getattr(content, "uri") or None, 

2507 resource_template_uri=getattr(content, "blob") or None, 

2508 user_identity=user, 

2509 meta_data=meta_data, 

2510 resource_obj=resource_db, 

2511 gateway_obj=resource_db_gateway, 

2512 server_id=server_id, 

2513 ) 

2514 if resource_response: 

2515 setattr(content, "blob", resource_response) 

2516 elif hasattr(content, "text"): 

2517 resource_response = await self.invoke_resource( 

2518 db=db, 

2519 resource_id=getattr(content, "id"), 

2520 resource_uri=getattr(content, "uri") or None, 

2521 resource_template_uri=getattr(content, "text") or None, 

2522 user_identity=user, 

2523 meta_data=meta_data, 

2524 resource_obj=resource_db, 

2525 gateway_obj=resource_db_gateway, 

2526 server_id=server_id, 

2527 ) 

2528 if resource_response: 

2529 setattr(content, "text", resource_response) 

2530 # Normalize primitive types to ResourceContent 

2531 elif isinstance(content, bytes): 

2532 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, blob=content) 

2533 elif isinstance(content, str): 

2534 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, text=content) 

2535 else: 

2536 # Fallback to stringified content 

2537 content = ResourceContent(type="resource", id=str(resource_id) or str(content.id), uri=original_uri or content.uri, text=str(content)) 

2538 

2539 # ═══════════════════════════════════════════════════════════════════════════ 

2540 # POST-FETCH HOOKS: Now called AFTER content is resolved from gateway 

2541 # ═══════════════════════════════════════════════════════════════════════════ 

2542 if has_post_fetch: 

2543 post_payload = ResourcePostFetchPayload(uri=original_uri, content=content) 

2544 post_result, _ = await self._plugin_manager.invoke_hook(ResourceHookType.RESOURCE_POST_FETCH, post_payload, global_context, contexts, violations_as_exceptions=True) 

2545 if post_result.modified_payload: 

2546 content = post_result.modified_payload.content 

2547 

2548 if span and content is not None and is_output_capture_enabled("resource.read"): 

2549 set_span_attribute(span, "langfuse.observation.output", serialize_trace_payload(content)) 

2550 

2551 return content 

2552 except Exception as e: 

2553 success = False 

2554 error_message = str(e) 

2555 raise 

2556 finally: 

2557 # Record metrics only if we found a resource (not for templates) 

2558 logger.debug(f"read_resource finally block: resource_db={'present' if resource_db else None}, resource_id={resource_db.id if resource_db else None}, server_id={server_id}") 

2559 

2560 if resource_db: 

2561 try: 

2562 metrics_buffer.record_resource_metric( 

2563 resource_id=resource_db.id, 

2564 start_time=start_time, 

2565 success=success, 

2566 error_message=error_message, 

2567 ) 

2568 except Exception as metrics_error: 

2569 logger.warning(f"Failed to record resource metric: {metrics_error}") 

2570 

2571 # Record server metrics ONLY when the server scoping check passed. 

2572 # This prevents recording metrics with unvalidated server_id values 

2573 # from admin API headers (X-Server-ID) or RPC params. 

2574 if resource_db and server_scoped: 

2575 try: 

2576 logger.debug(f"Recording server metric for server_id={server_id}, resource_id={resource_db.id}, success={success}") 

2577 # Record server metric only for the specific virtual server being accessed 

2578 metrics_buffer.record_server_metric( 

2579 server_id=server_id, 

2580 start_time=start_time, 

2581 success=success, 

2582 error_message=error_message, 

2583 ) 

2584 except Exception as metrics_error: 

2585 logger.warning(f"Failed to record server metric: {metrics_error}") 

2586 

2587 # End database span for observability dashboard 

2588 # NOTE: Use fresh_db_session() since db may have been closed by invoke_resource 

2589 if db_span_id and observability_service and not db_span_ended: 

2590 try: 

2591 with fresh_db_session() as fresh_db: 

2592 observability_service.end_span( 

2593 db=fresh_db, 

2594 span_id=db_span_id, 

2595 status="ok" if success else "error", 

2596 status_message=error_message if error_message else None, 

2597 ) 

2598 db_span_ended = True 

2599 logger.debug(f"✓ Ended resource.read span: {db_span_id}") 

2600 except Exception as e: 

2601 logger.warning(f"Failed to end observability span for resource reading: {e}") 

2602 

async def set_resource_state(self, db: Session, resource_id: Union[int, str], activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> ResourceRead:
    """
    Set the activation status of a resource.

    The resource row is fetched with ``get_for_update(..., nowait=True)`` so a
    concurrent modification fails fast instead of blocking. When the resource
    is already in the requested state nothing is committed and no cache
    invalidation, notification or audit entry is produced; the current
    resource is simply returned.

    Args:
        db: Database session
        resource_id: Resource ID
        activate: True to activate, False to deactivate
        user_email: Optional[str] The email of the user to check if the user has permission to modify.
        skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).

    Returns:
        The updated ResourceRead object

    Raises:
        ResourceNotFoundError: If the resource is not found.
        ResourceLockConflictError: If the resource is locked by another transaction.
        ResourceError: For other errors.
        PermissionError: If user doesn't own the resource.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> from mcpgateway.schemas import ResourceRead
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.get.return_value = resource
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_resource_activated = AsyncMock()
        >>> service._notify_resource_deactivated = AsyncMock()
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.set_resource_state(db, 1, True))
        'resource_read'
    """
    try:
        # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
        try:
            resource = get_for_update(db, DbResource, resource_id, nowait=True)
        except OperationalError as lock_err:
            # Row is locked by another transaction - fail fast with 409
            db.rollback()
            raise ResourceLockConflictError(f"Resource {resource_id} is currently being modified by another request") from lock_err
        if not resource:
            raise ResourceNotFoundError(f"Resource not found: {resource_id}")

        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, resource):
                raise PermissionError("Only the owner can activate the Resource" if activate else "Only the owner can deactivate the Resource")

        # Update status if it's different
        if resource.enabled != activate:
            resource.enabled = activate
            resource.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(resource)

            # Invalidate cache after status change (skip for batch operations)
            if not skip_cache_invalidation:
                cache = _get_registry_cache()
                await cache.invalidate_resources()

            # Notify subscribers
            if activate:
                await self._notify_resource_activated(resource)
            else:
                await self._notify_resource_deactivated(resource)

            logger.info(f"Resource {resource.uri} {'activated' if activate else 'deactivated'}")

            # Structured logging: Audit trail for resource state change
            audit_trail.log_action(
                user_id=user_email or "system",
                action="set_resource_state",
                resource_type="resource",
                resource_id=str(resource.id),
                resource_name=resource.name,
                user_email=user_email,
                team_id=resource.team_id,
                new_values={
                    "enabled": resource.enabled,
                },
                context={
                    "action": "activate" if activate else "deactivate",
                },
                db=db,
            )

            # Structured logging: Log successful resource state change
            structured_logger.log(
                level="INFO",
                message=f"Resource {'activated' if activate else 'deactivated'} successfully",
                event_type="resource_state_changed",
                component="resource_service",
                user_email=user_email,
                team_id=resource.team_id,
                resource_type="resource",
                resource_id=str(resource.id),
                custom_fields={
                    "resource_uri": resource.uri,
                    "enabled": resource.enabled,
                },
            )

        resource.team = self._get_team_name(db, resource.team_id)
        return self.convert_resource_to_read(resource)
    except PermissionError as e:
        # Release the FOR UPDATE row lock taken above so a denied request does
        # not keep the row locked (and force 409s on other callers) until the
        # session is closed; mirrors the rollback done in subscribe_resource.
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Resource state change failed due to permission error",
            event_type="resource_state_change_permission_denied",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
        )
        # Bare raise keeps the original traceback intact
        raise
    except ResourceLockConflictError:
        # Re-raise lock conflicts without wrapping - allows 409 response
        raise
    except ResourceNotFoundError:
        # Re-raise not found without wrapping - allows 404 response
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource state change failure
        structured_logger.log(
            level="ERROR",
            message="Resource state change failed",
            event_type="resource_state_change_failed",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
        )
        # Chain the original exception so the root cause stays attached
        raise ResourceError(f"Failed to set resource state: {str(e)}") from e

2750 

async def subscribe_resource(
    self,
    db: Session,
    subscription: ResourceSubscription,
    *,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> None:
    """
    Subscribe to a resource.

    Looks the resource up by ``subscription.uri``, rejects missing or
    disabled resources, checks access via ``_check_resource_access`` and
    then persists a ``DbSubscription`` row. Every failure path rolls the
    session back before raising.

    Args:
        db: Database session
        subscription: Resource subscription object
        user_email: Requester email used for visibility checks.
        token_teams: Token team scope used for visibility checks.

    Raises:
        ResourceNotFoundError: If the resource is not found or is inactive
        PermissionError: If the requester is not authorized for the resource
        ResourceError: For other subscription errors

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> subscription = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.subscribe_resource(db, subscription))
    """
    try:
        # Verify resource exists (single query to avoid TOCTOU between active/inactive checks)
        resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()

        if not resource:
            raise ResourceNotFoundError(f"Resource not found: {subscription.uri}")

        if not resource.enabled:
            raise ResourceNotFoundError(f"Resource '{subscription.uri}' exists but is inactive")

        if not await self._check_resource_access(db, resource, user_email=user_email, token_teams=token_teams):
            raise PermissionError(f"Access denied for resource subscription: {subscription.uri}")

        # Create subscription
        db_sub = DbSubscription(resource_id=resource.id, subscriber_id=subscription.subscriber_id)
        db.add(db_sub)
        db.commit()

        logger.info(f"Added subscription for {subscription.uri} by {subscription.subscriber_id}")

    except (PermissionError, ResourceNotFoundError):
        # Propagate the documented exception types unchanged; previously a
        # not-found/inactive error was swallowed by the generic handler below
        # and re-raised as a plain ResourceError, contradicting the contract
        # above and the pass-through handling in set_resource_state.
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        # Chain the original exception so the root cause stays attached
        raise ResourceError(f"Failed to subscribe: {str(e)}") from e

2808 

async def unsubscribe_resource(self, db: Session, subscription: ResourceSubscription) -> None:
    """
    Unsubscribe from a resource.

    This is a best-effort operation: when no resource matches
    ``subscription.uri`` the call returns without doing anything, and any
    exception raised while removing the subscription is rolled back and
    logged instead of being propagated to the caller.

    Args:
        db: Database session
        subscription: Resource subscription object

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> subscription = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.unsubscribe_resource(db, subscription))
    """
    try:
        # Find resource
        resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()

        if not resource:
            return

        # Remove subscription with a real DELETE statement. Executing a SELECT
        # returns a Result, which has no delete() method, so the previous
        # `db.execute(select(...)).delete()` raised AttributeError that the
        # handler below swallowed - the row was never actually removed.
        db.execute(
            delete(DbSubscription)
            .where(DbSubscription.resource_id == resource.id)
            .where(DbSubscription.subscriber_id == subscription.subscriber_id)
        )
        db.commit()

        logger.info(f"Removed subscription for {subscription.uri} by {subscription.subscriber_id}")

    except Exception as e:
        # Deliberately best-effort: undo partial work and log, never raise
        db.rollback()
        logger.error(f"Failed to unsubscribe: {str(e)}")

2844 

2845 async def update_resource( 

2846 self, 

2847 db: Session, 

2848 resource_id: Union[int, str], 

2849 resource_update: ResourceUpdate, 

2850 modified_by: Optional[str] = None, 

2851 modified_from_ip: Optional[str] = None, 

2852 modified_via: Optional[str] = None, 

2853 modified_user_agent: Optional[str] = None, 

2854 user_email: Optional[str] = None, 

2855 ) -> ResourceRead: 

2856 """ 

2857 Update a resource. 

2858 

2859 MIME Type Resolution Priority: 

2860 1. **User-provided type** (highest priority) - Caller explicitly declares content type 

2861 2. **URI-detected type** - Fallback when empty string provided 

2862 3. **Content-based fallback** - If empty string and no URI detection 

2863 4. **Preserve existing** - If None/not provided 

2864 

2865 An explicitly provided MIME type is always trusted; URI-based detection (e.g., .md → text/markdown) 

2866 is used only when the caller passes an empty string to request auto-detection. 

2867 

2868 Args: 

2869 db: Database session 

2870 resource_id: Resource ID 

2871 resource_update: Resource update object 

2872 modified_by: Username of the person modifying the resource 

2873 modified_from_ip: IP address where the modification request originated 

2874 modified_via: Source of modification (ui/api/import) 

2875 modified_user_agent: User agent string from the modification request 

2876 user_email: Email of user performing update (for ownership check) 

2877 

2878 Returns: 

2879 The updated ResourceRead object 

2880 

2881 Raises: 

2882 ResourceNotFoundError: If the resource is not found 

2883 ResourceURIConflictError: If a resource with the same URI already exists. 

2884 PermissionError: If user doesn't own the resource 

2885 ResourceError: For other update errors 

2886 IntegrityError: If a database integrity error occurs. 

2887 Exception: For unexpected errors 

2888 ContentSizeError: For content size exceed 

2889 ContentTypeError: If the MIME type is not allowed 

2890 

2891 Example: 

2892 >>> from mcpgateway.services.resource_service import ResourceService 

2893 >>> from unittest.mock import MagicMock, AsyncMock 

2894 >>> from mcpgateway.schemas import ResourceRead, ResourceUpdate 

2895 >>> service = ResourceService() 

2896 >>> db = MagicMock() 

2897 >>> resource = MagicMock() 

2898 >>> resource.uri = "test://example" 

2899 >>> resource.visibility = "private" 

2900 >>> resource.team_id = None 

2901 >>> db.get.return_value = resource 

2902 >>> db.commit = MagicMock() 

2903 >>> db.refresh = MagicMock() 

2904 >>> service._notify_resource_updated = AsyncMock() 

2905 >>> service._detect_mime_type_from_uri = MagicMock(return_value=None) 

2906 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read') 

2907 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read') 

2908 >>> import asyncio 

2909 >>> asyncio.run(service.update_resource(db, 'resource_id', ResourceUpdate())) 

2910 'resource_read' 

2911 """ 

2912 try: 

2913 logger.info(f"Updating resource: {resource_id}") 

2914 resource = get_for_update(db, DbResource, resource_id) 

2915 if not resource: 

2916 raise ResourceNotFoundError(f"Resource not found: {resource_id}") 

2917 

2918 # # Check for uri conflict if uri is being changed and visibility is public 

2919 if resource_update.uri and resource_update.uri != resource.uri: 

2920 visibility = resource_update.visibility or resource.visibility 

2921 team_id = resource_update.team_id or resource.team_id 

2922 if visibility.lower() == "public": 

2923 # Check for existing public resources with the same uri 

2924 existing_resource = get_for_update(db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "public", DbResource.id != resource_id)) 

2925 if existing_resource: 

2926 raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility) 

2927 elif visibility.lower() == "team" and team_id: 

2928 # Check for existing team resource with the same uri 

2929 existing_resource = get_for_update( 

2930 db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.id != resource_id) 

2931 ) 

2932 if existing_resource: 

2933 raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility) 

2934 

2935 # Check ownership if user_email provided 

2936 if user_email: 

2937 # First-Party 

2938 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

2939 

2940 permission_service = PermissionService(db) 

2941 if not await permission_service.check_resource_ownership(user_email, resource): 

2942 raise PermissionError("Only the owner can update this resource") 

2943 

2944 # Update fields if provided 

2945 if resource_update.uri is not None: 

2946 resource.uri = resource_update.uri 

2947 if resource_update.name is not None: 

2948 resource.name = resource_update.name 

2949 if resource_update.description is not None: 

2950 resource.description = resource_update.description 

2951 if resource_update.title is not None: 

2952 resource.title = resource_update.title 

2953 # Resolve the new MIME type into a local variable and validate 

2954 # BEFORE writing to the tracked model to avoid dirty session state. 

2955 # Priority: user-provided > URI-detected > content-based fallback. 

2956 resolved_mime_type = None 

2957 if resource_update.mime_type is not None: 

2958 if resource_update.mime_type: 

2959 # Non-empty: user explicitly provided a type — trust it 

2960 resolved_mime_type = resource_update.mime_type 

2961 else: 

2962 # Empty string: auto-detect from URI/content 

2963 content_for_detection = resource_update.content if resource_update.content is not None else (resource.text_content or resource.binary_content) 

2964 uri_for_detection = resource_update.uri if resource_update.uri is not None else resource.uri 

2965 resolved_mime_type = self._detect_mime_type(uri_for_detection, content_for_detection) 

2966 logger.info(f"Auto-detected MIME type for resource {resource_id}: {resolved_mime_type}") 

2967 elif resource_update.uri is not None: 

2968 # URI changed but no MIME type provided — try URI detection as fallback 

2969 url_detected_mime = self._detect_mime_type_from_uri(resource_update.uri) 

2970 if url_detected_mime: 

2971 resolved_mime_type = url_detected_mime 

2972 

2973 # Validate the candidate MIME type BEFORE mutating the model 

2974 content_security = get_content_security_service() 

2975 if resolved_mime_type is not None: 

2976 content_security.validate_resource_mime_type( 

2977 mime_type=resolved_mime_type, 

2978 uri=resource_update.uri or resource.uri, 

2979 user_email=modified_by or user_email, 

2980 ip_address=modified_from_ip, 

2981 ) 

2982 # Validation passed — safe to assign 

2983 resource.mime_type = resolved_mime_type 

2984 

2985 if resource_update.uri_template is not None: 

2986 resource.uri_template = resource_update.uri_template 

2987 if resource_update.visibility is not None: 

2988 # Validate visibility transitions 

2989 if resource_update.visibility == "team": 

2990 target_team_id = resource_update.team_id if resource_update.team_id is not None else resource.team_id 

2991 _validate_resource_team_assignment(db, user_email, target_team_id) 

2992 resource.visibility = resource_update.visibility 

2993 

2994 # Update content if provided 

2995 if resource_update.content is not None: 

2996 # Validate content size before updating 

2997 content_security.validate_resource_size( 

2998 content=resource_update.content, 

2999 uri=resource_update.uri or resource.uri, 

3000 user_email=modified_by or user_email, 

3001 ip_address=modified_from_ip, 

3002 ) 

3003 

3004 # Determine content storage 

3005 is_text = resource.mime_type and resource.mime_type.startswith("text/") or isinstance(resource_update.content, str) 

3006 

3007 resource.text_content = resource_update.content if is_text else None 

3008 resource.binary_content = ( 

3009 resource_update.content.encode() if is_text and isinstance(resource_update.content, str) else resource_update.content if isinstance(resource_update.content, bytes) else None 

3010 ) 

3011 resource.size = len(resource_update.content) 

3012 

3013 # Update tags if provided 

3014 if resource_update.tags is not None: 

3015 resource.tags = resource_update.tags 

3016 

3017 # Update team assignment if provided, validating ownership 

3018 if resource_update.team_id is not None: 

3019 if resource_update.team_id != resource.team_id: 

3020 _validate_resource_team_assignment(db, user_email, resource_update.team_id) 

3021 resource.team_id = resource_update.team_id 

3022 

3023 # Update metadata fields 

3024 resource.updated_at = datetime.now(timezone.utc) 

3025 if modified_by: 

3026 resource.modified_by = modified_by 

3027 if modified_from_ip: 

3028 resource.modified_from_ip = modified_from_ip 

3029 if modified_via: 

3030 resource.modified_via = modified_via 

3031 if modified_user_agent: 

3032 resource.modified_user_agent = modified_user_agent 

3033 if hasattr(resource, "version") and resource.version is not None: 

3034 resource.version = resource.version + 1 

3035 else: 

3036 resource.version = 1 

3037 db.commit() 

3038 db.refresh(resource) 

3039 

3040 # Invalidate cache after successful update 

3041 cache = _get_registry_cache() 

3042 await cache.invalidate_resources() 

3043 # Also invalidate tags cache since resource tags may have changed 

3044 # First-Party 

3045 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

3046 

3047 await admin_stats_cache.invalidate_tags() 

3048 # First-Party 

3049 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

3050 

3051 metrics_cache.invalidate_prefix("top_resources:") 

3052 metrics_cache.invalidate("resources") 

3053 

3054 # Notify subscribers 

3055 await self._notify_resource_updated(resource) 

3056 

3057 logger.info(f"Updated resource: {resource.uri}") 

3058 

3059 # Structured logging: Audit trail for resource update 

3060 changes = [] 

3061 if resource_update.uri: 

3062 changes.append(f"uri: {resource_update.uri}") 

3063 if resource_update.visibility: 

3064 changes.append(f"visibility: {resource_update.visibility}") 

3065 if resource_update.description: 

3066 changes.append("description updated") 

3067 

3068 audit_trail.log_action( 

3069 user_id=user_email or modified_by or "system", 

3070 action="update_resource", 

3071 resource_type="resource", 

3072 resource_id=str(resource.id), 

3073 resource_name=resource.name, 

3074 user_email=user_email, 

3075 team_id=resource.team_id, 

3076 client_ip=modified_from_ip, 

3077 user_agent=modified_user_agent, 

3078 new_values={ 

3079 "uri": resource.uri, 

3080 "name": resource.name, 

3081 "version": resource.version, 

3082 }, 

3083 context={ 

3084 "modified_via": modified_via, 

3085 "changes": ", ".join(changes) if changes else "metadata only", 

3086 }, 

3087 db=db, 

3088 ) 

3089 

3090 # Structured logging: Log successful resource update 

3091 structured_logger.log( 

3092 level="INFO", 

3093 message="Resource updated successfully", 

3094 event_type="resource_updated", 

3095 component="resource_service", 

3096 user_id=modified_by, 

3097 user_email=user_email, 

3098 team_id=resource.team_id, 

3099 resource_type="resource", 

3100 resource_id=str(resource.id), 

3101 custom_fields={ 

3102 "resource_uri": resource.uri, 

3103 "version": resource.version, 

3104 }, 

3105 ) 

3106 

3107 return self.convert_resource_to_read(resource) 

3108 except PermissionError as pe: 

3109 db.rollback() 

3110 

3111 # Structured logging: Log permission error 

3112 structured_logger.log( 

3113 level="WARNING", 

3114 message="Resource update failed due to permission error", 

3115 event_type="resource_update_permission_denied", 

3116 component="resource_service", 

3117 user_email=user_email, 

3118 resource_type="resource", 

3119 resource_id=str(resource_id), 

3120 error=pe, 

3121 ) 

3122 raise 

3123 except IntegrityError as ie: 

3124 db.rollback() 

3125 logger.error(f"IntegrityErrors in group: {ie}") 

3126 

3127 # Structured logging: Log database integrity error 

3128 structured_logger.log( 

3129 level="ERROR", 

3130 message="Resource update failed due to database integrity error", 

3131 event_type="resource_update_failed", 

3132 component="resource_service", 

3133 user_id=modified_by, 

3134 user_email=user_email, 

3135 resource_type="resource", 

3136 resource_id=str(resource_id), 

3137 error=ie, 

3138 ) 

3139 raise ie 

3140 except ContentSizeError as cse: 

3141 db.rollback() 

3142 structured_logger.log( 

3143 level="ERROR", 

3144 message=f"Resource content size limit exceeded: {cse.actual_size} bytes (max: {cse.max_size} bytes)", 

3145 event_type="resource_content_size_exceed", 

3146 component="resource_service", 

3147 resource_type="resource", 

3148 user_id=modified_by, 

3149 user_email=user_email, 

3150 resource_id=str(resource_id), 

3151 error=cse, 

3152 ) 

3153 raise cse 

3154 except ContentTypeError as cte: 

3155 db.rollback() 

3156 structured_logger.log( 

3157 level="ERROR", 

3158 message=f"Resource MIME type not allowed: {cte.mime_type}", 

3159 event_type="resource_mime_type_rejected", 

3160 component="resource_service", 

3161 resource_type="resource", 

3162 user_id=modified_by, 

3163 user_email=user_email, 

3164 resource_id=str(resource_id), 

3165 error=cte, 

3166 custom_fields={ 

3167 "mime_type": cte.mime_type, 

3168 }, 

3169 ) 

3170 raise cte 

3171 except ResourceURIConflictError as pe: 

3172 logger.error(f"Resource URI conflict: {pe}") 

3173 

3174 # Structured logging: Log URI conflict error 

3175 structured_logger.log( 

3176 level="WARNING", 

3177 message="Resource update failed due to URI conflict", 

3178 event_type="resource_uri_conflict", 

3179 component="resource_service", 

3180 user_id=modified_by, 

3181 user_email=user_email, 

3182 resource_type="resource", 

3183 resource_id=str(resource_id), 

3184 error=pe, 

3185 ) 

3186 raise pe 

3187 except Exception as e: 

3188 db.rollback() 

3189 if isinstance(e, ResourceNotFoundError): 

3190 # Structured logging: Log not found error 

3191 structured_logger.log( 

3192 level="ERROR", 

3193 message="Resource update failed - resource not found", 

3194 event_type="resource_not_found", 

3195 component="resource_service", 

3196 user_email=user_email, 

3197 resource_type="resource", 

3198 resource_id=str(resource_id), 

3199 error=e, 

3200 ) 

3201 raise e 

3202 

3203 # Structured logging: Log generic resource update failure 

3204 structured_logger.log( 

3205 level="ERROR", 

3206 message="Resource update failed", 

3207 event_type="resource_update_failed", 

3208 component="resource_service", 

3209 user_id=modified_by, 

3210 user_email=user_email, 

3211 resource_type="resource", 

3212 resource_id=str(resource_id), 

3213 error=e, 

3214 ) 

3215 raise ResourceError(f"Failed to update resource: {str(e)}") 

3216 

async def delete_resource(self, db: Session, resource_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
    """
    Permanently delete a resource.

    The resource is looked up by its ID. Its subscriptions (and, when
    requested, its raw and hourly rollup metrics) are removed before the
    row itself is deleted and the transaction is committed. After the
    commit the resource and tag caches are invalidated, subscribers are
    notified, and the deletion is recorded in the audit trail and the
    structured log.

    Args:
        db: Database session
        resource_id: Resource ID
        user_email: Email of user performing delete (for ownership check)
        purge_metrics: If True, delete raw + rollup metrics for this resource

    Raises:
        ResourceNotFoundError: If the resource is not found
        PermissionError: If user doesn't own the resource
        ResourceError: For other deletion errors (chained to the original exception)

    Example:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = resource
        >>> db.delete = MagicMock()
        >>> db.commit = MagicMock()
        >>> service._notify_resource_deleted = AsyncMock()
        >>> import asyncio
        >>> asyncio.run(service.delete_resource(db, 'resource_id'))
    """
    try:
        # Look the resource up by its ID.
        resource = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none()

        if not resource:
            # Nothing to delete: roll back before raising so the session is clean.
            db.rollback()
            raise ResourceNotFoundError(f"Resource not found: {resource_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, resource):
                raise PermissionError("Only the owner can delete this resource")

        # Snapshot everything needed for notification/logging BEFORE the row
        # is deleted, so nothing below has to touch the ORM instance again.
        resource_info = {
            "id": resource.id,
            "uri": resource.uri,
            "name": resource.name,
            "visibility": getattr(resource, "visibility", "public"),
            "team_id": getattr(resource, "team_id", None),
            "owner_email": getattr(resource, "owner_email", None),
        }

        # Remove subscriptions using SQLAlchemy's delete() expression.
        db.execute(delete(DbSubscription).where(DbSubscription.resource_id == resource.id))

        if purge_metrics:
            with pause_rollup_during_purge(reason=f"purge_resource:{resource.id}"):
                delete_metrics_in_batches(db, ResourceMetric, ResourceMetric.resource_id, resource.id)
                delete_metrics_in_batches(db, ResourceMetricsHourly, ResourceMetricsHourly.resource_id, resource.id)

        # Hard delete the resource.
        resource_uri = resource.uri
        resource_name = resource.name
        resource_team_id = resource.team_id

        db.delete(resource)
        db.commit()

        # Invalidate cache after successful deletion
        cache = _get_registry_cache()
        await cache.invalidate_resources()
        # Also invalidate tags cache since resource tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        # Notify subscribers.
        await self._notify_resource_deleted(resource_info)

        # Use the captured URI: the ORM instance has been deleted and committed.
        logger.info(f"Permanently deleted resource: {resource_uri}")

        # Structured logging: Audit trail for resource deletion
        audit_trail.log_action(
            user_id=user_email or "system",
            action="delete_resource",
            resource_type="resource",
            resource_id=str(resource_info["id"]),
            resource_name=resource_name,
            user_email=user_email,
            team_id=resource_team_id,
            old_values={
                "uri": resource_uri,
                "name": resource_name,
            },
            db=db,
        )

        # Structured logging: Log successful resource deletion
        structured_logger.log(
            level="INFO",
            message="Resource deleted successfully",
            event_type="resource_deleted",
            component="resource_service",
            user_email=user_email,
            team_id=resource_team_id,
            resource_type="resource",
            resource_id=str(resource_info["id"]),
            custom_fields={
                "resource_uri": resource_uri,
                "purge_metrics": purge_metrics,
            },
        )

    except PermissionError as pe:
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Resource deletion failed due to permission error",
            event_type="resource_delete_permission_denied",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=pe,
        )
        raise
    except ResourceNotFoundError as rnfe:
        # ResourceNotFoundError is re-raised to be handled in the endpoint.
        # (The rollback already happened where the error was raised.)
        structured_logger.log(
            level="ERROR",
            message="Resource deletion failed - resource not found",
            event_type="resource_not_found",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=rnfe,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource deletion failure
        structured_logger.log(
            level="ERROR",
            message="Resource deletion failed",
            event_type="resource_deletion_failed",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
        )
        # Chain the cause so the original traceback is preserved.
        raise ResourceError(f"Failed to delete resource: {str(e)}") from e

3379 

async def get_resource_by_id(self, db: Session, resource_id: str, include_inactive: bool = False) -> ResourceRead:
    """
    Fetch a single resource by its ID and return it as a read model.

    Unless ``include_inactive`` is set, only enabled resources are
    considered. When the lookup fails in that mode, a second query checks
    whether a disabled resource with the same ID exists so that the error
    message can say so.

    Args:
        db: Database session
        resource_id: Resource ID
        include_inactive: Whether to include inactive resources

    Returns:
        ResourceRead: The resource object

    Raises:
        ResourceNotFoundError: If the resource is not found (or exists but is inactive)

    Example:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = resource
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.get_resource_by_id(db, "39334ce0ed2644d79ede8913a66930c9"))
        'resource_read'
    """
    span_attributes = {"resource.id": resource_id, "include_inactive": include_inactive}
    with create_span("resource.get", span_attributes):
        lookup = select(DbResource).where(DbResource.id == resource_id)
        if not include_inactive:
            lookup = lookup.where(DbResource.enabled)

        found = db.execute(lookup).scalar_one_or_none()

        if not found:
            if not include_inactive:
                # Distinguish "disabled" from "does not exist" for the caller.
                disabled_lookup = select(DbResource).where(DbResource.id == resource_id).where(not_(DbResource.enabled))
                if db.execute(disabled_lookup).scalar_one_or_none():
                    raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")

            raise ResourceNotFoundError(f"Resource not found: {resource_id}")

        converted = self.convert_resource_to_read(found)

        structured_logger.log(
            level="INFO",
            message="Resource retrieved successfully",
            event_type="resource_viewed",
            component="resource_service",
            team_id=getattr(found, "team_id", None),
            resource_type="resource",
            resource_id=str(found.id),
            custom_fields={
                "resource_uri": found.uri,
                "include_inactive": include_inactive,
            },
        )

        return converted

3442 

3443 async def _notify_resource_activated(self, resource: DbResource) -> None: 

3444 """ 

3445 Notify subscribers of resource activation. 

3446 

3447 Args: 

3448 resource: Resource to activate 

3449 """ 

3450 event = { 

3451 "type": "resource_activated", 

3452 "data": { 

3453 "id": resource.id, 

3454 "uri": resource.uri, 

3455 "name": resource.name, 

3456 "enabled": True, 

3457 "visibility": getattr(resource, "visibility", "public"), 

3458 "team_id": getattr(resource, "team_id", None), 

3459 "owner_email": getattr(resource, "owner_email", None), 

3460 }, 

3461 "timestamp": datetime.now(timezone.utc).isoformat(), 

3462 } 

3463 await self._publish_event(event) 

3464 

3465 async def _notify_resource_deactivated(self, resource: DbResource) -> None: 

3466 """ 

3467 Notify subscribers of resource deactivation. 

3468 

3469 Args: 

3470 resource: Resource to deactivate 

3471 """ 

3472 event = { 

3473 "type": "resource_deactivated", 

3474 "data": { 

3475 "id": resource.id, 

3476 "uri": resource.uri, 

3477 "name": resource.name, 

3478 "enabled": False, 

3479 "visibility": getattr(resource, "visibility", "public"), 

3480 "team_id": getattr(resource, "team_id", None), 

3481 "owner_email": getattr(resource, "owner_email", None), 

3482 }, 

3483 "timestamp": datetime.now(timezone.utc).isoformat(), 

3484 } 

3485 await self._publish_event(event) 

3486 

3487 async def _notify_resource_deleted(self, resource_info: Dict[str, Any]) -> None: 

3488 """ 

3489 Notify subscribers of resource deletion. 

3490 

3491 Args: 

3492 resource_info: Dictionary of resource to delete 

3493 """ 

3494 event = { 

3495 "type": "resource_deleted", 

3496 "data": resource_info, 

3497 "timestamp": datetime.now(timezone.utc).isoformat(), 

3498 } 

3499 await self._publish_event(event) 

3500 

3501 async def _notify_resource_removed(self, resource: DbResource) -> None: 

3502 """ 

3503 Notify subscribers of resource removal. 

3504 

3505 Args: 

3506 resource: Resource to remove 

3507 """ 

3508 event = { 

3509 "type": "resource_removed", 

3510 "data": { 

3511 "id": resource.id, 

3512 "uri": resource.uri, 

3513 "name": resource.name, 

3514 "enabled": False, 

3515 "visibility": getattr(resource, "visibility", "public"), 

3516 "team_id": getattr(resource, "team_id", None), 

3517 "owner_email": getattr(resource, "owner_email", None), 

3518 }, 

3519 "timestamp": datetime.now(timezone.utc).isoformat(), 

3520 } 

3521 await self._publish_event(event) 

3522 

3523 async def _event_visible_to_subscriber(self, event: Dict[str, Any], user_email: Optional[str], token_teams: Optional[List[str]]) -> bool: 

3524 """Return whether a resource event is visible to a subscriber context. 

3525 

3526 Args: 

3527 event: Event payload emitted by the resource event stream. 

3528 user_email: Subscriber email. ``None`` only for unrestricted admin context. 

3529 token_teams: Subscriber token team scope. 

3530 

3531 Returns: 

3532 ``True`` when the event is visible to the subscriber, otherwise ``False``. 

3533 """ 

3534 data = event.get("data") if isinstance(event, dict) else None 

3535 if not isinstance(data, dict): 

3536 return False 

3537 

3538 visibility = data.get("visibility") or "public" 

3539 team_id = data.get("team_id") 

3540 owner_email = data.get("owner_email") 

3541 event_resource = SimpleNamespace(visibility=visibility, team_id=team_id, owner_email=owner_email) 

3542 

3543 effective_token_teams = token_teams 

3544 if user_email and effective_token_teams is None: 

3545 # Non-admin scoped flows should pass token_teams explicitly. If not, 

3546 # fail closed to public-only for event filtering. 

3547 effective_token_teams = [] 

3548 

3549 return await self._check_resource_access( 

3550 db=None, # type: ignore[arg-type] 

3551 resource=event_resource, # type: ignore[arg-type] 

3552 user_email=user_email, 

3553 token_teams=effective_token_teams, 

3554 ) 

3555 

async def subscribe_events(self, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> AsyncGenerator[Dict[str, Any], None]:
    """Stream resource events from the EventService, filtered per subscriber.

    Args:
        user_email: Requesting user email. ``None`` with ``token_teams=None`` indicates unrestricted admin context.
        token_teams: Token team scope context:
            - ``None`` = unrestricted admin
            - ``[]`` = public-only
            - ``[...]`` = team-scoped access

    Yields:
        Resource event messages. In the unrestricted admin context every
        event is yielded; otherwise only events accepted by
        ``_event_visible_to_subscriber``.
    """
    # Both arguments are fixed for the lifetime of the subscription, so the
    # admin check only needs to be evaluated once.
    unrestricted = user_email is None and token_teams is None

    async for message in self._event_service.subscribe_events():
        if unrestricted or await self._event_visible_to_subscriber(message, user_email, token_teams):
            yield message

3576 

3577 def _detect_mime_type_from_uri(self, uri: str) -> Optional[str]: 

3578 """Detect MIME type from URI only (no fallback). 

3579 

3580 Args: 

3581 uri: Resource URI 

3582 

3583 Returns: 

3584 Detected MIME type from URI, or None if cannot be determined 

3585 

3586 Examples: 

3587 >>> service = ResourceService() 

3588 >>> service._detect_mime_type_from_uri("https://example.com/file.md") 

3589 'text/markdown' 

3590 >>> service._detect_mime_type_from_uri("https://example.com/unknown") 

3591 

3592 """ 

3593 mime_type, _ = mimetypes.guess_type(uri) 

3594 return mime_type 

3595 

3596 def _detect_mime_type(self, uri: str, content: Union[str, bytes]) -> str: 

3597 """Detect mime type from URI and content with fallback. 

3598 

3599 Priority: 

3600 1. Try to detect from URI extension 

3601 2. Fallback to content-based detection 

3602 

3603 Args: 

3604 uri: Resource URI 

3605 content: Resource content 

3606 

3607 Returns: 

3608 Detected mime type (always returns a value) 

3609 

3610 Examples: 

3611 >>> service = ResourceService() 

3612 >>> service._detect_mime_type("file.txt", "content") 

3613 'text/plain' 

3614 >>> service._detect_mime_type("unknown", "text content") 

3615 'text/plain' 

3616 """ 

3617 # Try from URI first 

3618 mime_type = self._detect_mime_type_from_uri(uri) 

3619 if mime_type: 

3620 return mime_type 

3621 

3622 # Fallback based on content type 

3623 if isinstance(content, str): 

3624 return "text/plain" 

3625 

3626 return "application/octet-stream" 

3627 

async def _read_template_resource(self, db: Session, uri: str, include_inactive: Optional[bool] = False) -> ResourceContent:
    """
    Resolve a concrete URI against the known resource templates.

    If the in-memory template cache is empty it is first filled from
    ``list_resource_templates`` (keyed by template name). The first cached
    template whose ``uri_template`` matches the URI is used. For templates
    with a ``text/*`` MIME type, the parameters extracted from the URI are
    substituted back into the template string and returned as text content.

    Args:
        db: Database session.
        uri: Template URI with parameters.
        include_inactive: Whether to include inactive resources in DB lookups.

    Returns:
        ResourceContent: The resolved content from the matching template.

    Raises:
        ResourceNotFoundError: If no template matches, or the matching
            template's resource exists but is inactive.
        ResourceError: For any other failure while extracting parameters or
            rendering the content. This includes non-text (binary)
            templates: the ``NotImplementedError`` raised for them is
            caught by the generic handler and wrapped.
    """
    # Populate the cache on first use.
    if not self._template_cache:
        logger.info("_template_cache is empty, fetching exisitng resource templates")
        for entry in await self.list_resource_templates(db=db, include_inactive=include_inactive):
            self._template_cache[entry.name] = entry

    # First cached template whose pattern matches the requested URI.
    template = next(
        (candidate for candidate in self._template_cache.values() if self._uri_matches_template(uri, candidate.uri_template)),
        None,
    )
    if not template:
        raise ResourceNotFoundError(f"No template matches URI: {uri}")

    # The cache may hold a template whose backing resource is disabled.
    inactive_lookup = select(DbResource).where(DbResource.id == str(template.id)).where(not_(DbResource.enabled))
    if db.execute(inactive_lookup).scalar_one_or_none():
        raise ResourceNotFoundError(f"Resource '{template.id}' exists but is inactive")

    try:
        params = self._extract_template_params(uri, template.uri_template)

        if not (template.mime_type and template.mime_type.startswith("text/")):
            # Binary templates are not handled yet.
            raise NotImplementedError("Binary resource templates not yet supported")

        rendered = template.uri_template.format(**params)
        return ResourceContent(
            type="resource",
            id=str(template.id) or None,
            uri=template.uri_template or None,
            mime_type=template.mime_type or None,
            text=rendered,
        )
    except ResourceNotFoundError:
        raise
    except Exception as e:
        raise ResourceError(f"Failed to process template: {str(e)}") from e

3678 

3679 @staticmethod 

3680 @lru_cache(maxsize=256) 

3681 def _build_regex(template: str) -> re.Pattern: 

3682 """ 

3683 Convert a URI template into a compiled regular expression. 

3684 

3685 This parser supports a subset of RFC 6570–style templates for path 

3686 matching. It extracts path parameters and converts them into named 

3687 regex groups. 

3688 

3689 Supported template features: 

3690 - `{var}` 

3691 A simple path parameter. Matches a single URI segment 

3692 (i.e., any characters except `/`). 

3693 → Translates to `(?P<var>[^/]+)` 

3694 - `{var*}` 

3695 A wildcard parameter. Matches one or more URI segments, 

3696 including `/`. 

3697 → Translates to `(?P<var>.+)` 

3698 - `{?var1,var2}` 

3699 Query-parameter expressions. These are ignored when building 

3700 the regex for path matching and are stripped from the template. 

3701 

3702 Example: 

3703 Template: "files://root/{path*}/meta/{id}{?expand,debug}" 

3704 Regex: r"^files://root/(?P<path>.+)/meta/(?P<id>[^/]+)$" 

3705 

3706 Args: 

3707 template: The URI template string containing parameter expressions. 

3708 

3709 Returns: 

3710 A compiled regular expression (re.Pattern) that can be used to 

3711 match URIs and extract parameter values. 

3712 

3713 Note: 

3714 Results are cached using LRU cache (maxsize=256) to avoid 

3715 recompiling the same template pattern repeatedly. 

3716 """ 

3717 # Remove query parameter syntax for path matching 

3718 template_without_query = re.sub(r"\{\?[^}]+\}", "", template) 

3719 

3720 parts = re.split(r"(\{[^}]+\})", template_without_query) 

3721 pattern = "" 

3722 for part in parts: 

3723 if part.startswith("{") and part.endswith("}"): 

3724 name = part[1:-1] 

3725 if name.endswith("*"): 

3726 name = name[:-1] 

3727 pattern += f"(?P<{name}>.+)" 

3728 else: 

3729 pattern += f"(?P<{name}>[^/]+)" 

3730 else: 

3731 pattern += re.escape(part) 

3732 return re.compile(f"^{pattern}$") 

3733 

@staticmethod
@lru_cache(maxsize=256)
def _compile_parse_pattern(template: str) -> parse.Parser:
    """
    Build a reusable ``parse`` parser for a URI template.

    Args:
        template: The template pattern (e.g. "file:///{name}/{id}").

    Returns:
        Compiled parse.Parser object.

    Note:
        The result is memoised per template string (LRU cache,
        maxsize=256), so each distinct template is compiled at most once
        while it stays in the cache.
    """
    compiled_parser = parse.compile(template)
    return compiled_parser

3751 

3752 def _extract_template_params(self, uri: str, template: str) -> Dict[str, str]: 

3753 """ 

3754 Extract parameters from a URI based on a template. 

3755 

3756 Args: 

3757 uri: The actual URI containing parameter values. 

3758 template: The template pattern (e.g. "file:///{name}/{id}"). 

3759 

3760 Returns: 

3761 Dict of parameter names and extracted values. 

3762 

3763 Note: 

3764 Uses cached compiled parse patterns for better performance. 

3765 """ 

3766 parser = self._compile_parse_pattern(template) 

3767 result = parser.parse(uri) 

3768 return result.named if result else {} 

3769 

def _uri_matches_template(self, uri: str, template: str) -> bool:
    """
    Report whether a URI fits a template pattern.

    Args:
        uri: The URI to check.
        template: The template pattern.

    Returns:
        True if the URI matches the template, otherwise False.

    Note:
        Anything from the first "?" onwards is dropped from ``uri`` before
        matching. The regex itself comes from ``_build_regex``.
    """
    path_only = uri.split("?", 1)[0]
    compiled = self._build_regex(template)
    return bool(compiled.match(path_only))

3787 

async def _notify_resource_added(self, resource: DbResource) -> None:
    """
    Publish a "resource_added" event describing a resource.

    Args:
        resource: The resource that was added. ``visibility``, ``team_id``
            and ``owner_email`` are optional on the object and fall back to
            "public", None and None respectively.
    """
    details: Dict[str, Any] = {
        "id": resource.id,
        "uri": resource.uri,
        "name": resource.name,
        "description": resource.description,
        "enabled": resource.enabled,
    }
    # Access-control fields are read defensively so objects without them still work.
    for field, fallback in (("visibility", "public"), ("team_id", None), ("owner_email", None)):
        details[field] = getattr(resource, field, fallback)

    await self._publish_event(
        {
            "type": "resource_added",
            "data": details,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

3810 

async def _notify_resource_updated(self, resource: DbResource) -> None:
    """
    Publish a "resource_updated" event describing a resource.

    Args:
        resource: The resource that was updated. ``visibility``, ``team_id``
            and ``owner_email`` are optional on the object and fall back to
            "public", None and None respectively.
    """
    details: Dict[str, Any] = {
        "id": resource.id,
        "uri": resource.uri,
        "enabled": resource.enabled,
    }
    # Access-control fields are read defensively so objects without them still work.
    for field, fallback in (("visibility", "public"), ("team_id", None), ("owner_email", None)):
        details[field] = getattr(resource, field, fallback)

    await self._publish_event(
        {
            "type": "resource_updated",
            "data": details,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

3831 

async def _publish_event(self, event: Dict[str, Any]) -> None:
    """
    Hand an event to the service's EventService for publication.

    Args:
        event: Event to publish
    """
    event_service = self._event_service
    await event_service.publish_event(event)

3840 

3841 # --- Resource templates --- 

async def list_resource_templates(
    self,
    db: Session,
    include_inactive: bool = False,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    visibility: Optional[str] = None,
    server_id: Optional[str] = None,
) -> List[ResourceTemplate]:
    """
    Return the resource templates visible to the caller.

    A template is any resource row whose ``uri_template`` is set. Optional
    filters narrow the result by server, enabled state, caller access,
    visibility and tags.

    Args:
        db: Database session
        include_inactive: Whether to include inactive templates
        user_email: Email of requesting user (for private visibility check)
        token_teams: Teams from JWT. None = admin (no filtering),
            [] = public-only (no owner access), [...] = team-scoped
        tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        server_id (Optional[str]): Filter by server ID. If provided, only templates associated with this server will be returned.

    Returns:
        List of ResourceTemplate objects the user has access to

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, patch
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> template_obj = MagicMock()
        >>> db.execute.return_value.scalars.return_value.all.return_value = [template_obj]
        >>> with patch('mcpgateway.services.resource_service.ResourceTemplate') as MockResourceTemplate:
        ...     MockResourceTemplate.model_validate.return_value = 'resource_template'
        ...     import asyncio
        ...     result = asyncio.run(service.list_resource_templates(db))
        ...     result == ['resource_template']
        True
    """
    span_attributes = {
        "include_inactive": include_inactive,
        "server_id": server_id,
        "tags.count": len(tags) if tags else 0,
        "visibility": visibility,
    }
    with create_span("resource_template.list", span_attributes):
        stmt = select(DbResource).where(DbResource.uri_template.isnot(None))

        # Restrict to one server through the association table (same pattern as list_server_resources).
        if server_id:
            link = server_resource_association
            stmt = stmt.join(link, DbResource.id == link.c.resource_id).where(link.c.server_id == server_id)

        if not include_inactive:
            stmt = stmt.where(DbResource.enabled)

        # token_teams=None means admin access: no visibility filtering at all.
        if token_teams is not None:
            # Public templates are always visible to a scoped token.
            access_filters = [DbResource.visibility == "public"]

            # An empty team list is a public-only token: it gets neither
            # owner access nor team access, so both are added only here.
            if token_teams:
                if user_email:
                    access_filters.append(DbResource.owner_email == user_email)
                access_filters.append(and_(DbResource.team_id.in_(token_teams), DbResource.visibility.in_(["team", "public"])))

            stmt = stmt.where(or_(*access_filters))

        # Cursor-based pagination logic can be implemented here in the future.
        if visibility:
            stmt = stmt.where(DbResource.visibility == visibility)

        if tags:
            stmt = stmt.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))

        rows = db.execute(stmt).scalars().all()
        return [ResourceTemplate.model_validate(row) for row in rows]

3927 

3928 # --- Metrics --- 

async def aggregate_metrics(self, db: Session) -> ResourceMetrics:
    """
    Aggregate invocation metrics across all resources.

    When the metrics cache is enabled, a cached entry under the
    "resources" key is returned directly. Otherwise the totals are
    computed by ``aggregate_metrics_combined`` (raw metrics plus hourly
    rollups) and, if caching is enabled, stored back under the same key.

    Args:
        db: Database session

    Returns:
        ResourceMetrics: Aggregated metrics from raw + hourly rollup tables.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> service = ResourceService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    # Fast path: serve the cached snapshot when one exists.
    if is_cache_enabled():
        cached_payload = metrics_cache.get("resources")
        if cached_payload is not None:
            return ResourceMetrics(**cached_payload)

    # Imported only on a cache miss, where the combined query is actually needed.
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "resource")

    copied_fields = (
        "total_executions",
        "successful_executions",
        "failed_executions",
        "failure_rate",
        "min_response_time",
        "max_response_time",
        "avg_response_time",
        "last_execution_time",
    )
    metrics = ResourceMetrics(**{field: getattr(combined, field) for field in copied_fields})

    # Stored as a plain dict so the cached value stays serialization-friendly.
    if is_cache_enabled():
        metrics_cache.set("resources", metrics.model_dump())

    return metrics

3981 

async def reset_metrics(self, db: Session) -> None:
    """
    Delete every raw and hourly-rollup resource metric record.

    After the deletes are committed, the "resources" cache entry and all
    entries under the "top_resources:" prefix are invalidated.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    # Raw records first, then the hourly rollups; one commit covers both.
    for metrics_model in (ResourceMetric, ResourceMetricsHourly):
        db.execute(delete(metrics_model))
    db.commit()

    # Drop cached aggregates so they are not served after the reset.
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("resources")
    metrics_cache.invalidate_prefix("top_resources:")

4009 

4010 

# Lazy singleton: built on first access rather than at module import time,
# so importing only the exception classes never instantiates the service.
_resource_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Module-level __getattr__ that creates the service singleton on demand.

    Args:
        name: The attribute name being accessed.

    Returns:
        The resource_service singleton instance if name is "resource_service".

    Raises:
        AttributeError: If the attribute name is not "resource_service".
    """
    global _resource_service_instance  # pylint: disable=global-statement
    # Any other name is a genuine missing attribute.
    if name != "resource_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    if _resource_service_instance is None:
        _resource_service_instance = ResourceService()
    return _resource_service_instance