Coverage for mcpgateway / services / resource_service.py: 100%

1154 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/resource_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Resource Service Implementation. 

8This module implements resource management according to the MCP specification. 

9It handles: 

10- Resource registration and retrieval 

11- Resource templates and URI handling 

12- Resource subscriptions and updates 

13- Content type management 

14- Active/inactive resource management 

15 

16Examples: 

17 >>> from mcpgateway.services.resource_service import ResourceService, ResourceError 

18 >>> service = ResourceService() 

19 >>> isinstance(service._event_service, EventService) 

20 True 

21""" 

22 

23# Standard 

24import binascii 

25from datetime import datetime, timezone 

26from functools import lru_cache 

27import mimetypes 

28import os 

29import re 

30import ssl 

31import time 

32from typing import Any, AsyncGenerator, Dict, List, Optional, Union 

33import uuid 

34 

35# Third-Party 

36import httpx 

37from mcp import ClientSession 

38from mcp.client.sse import sse_client 

39from mcp.client.streamable_http import streamablehttp_client 

40import parse 

41from pydantic import ValidationError 

42from sqlalchemy import and_, delete, desc, not_, or_, select 

43from sqlalchemy.exc import IntegrityError, OperationalError 

44from sqlalchemy.orm import joinedload, Session 

45 

46# First-Party 

47from mcpgateway.common.models import ResourceContent, ResourceTemplate, TextContent 

48from mcpgateway.common.validators import SecurityValidator 

49from mcpgateway.config import settings 

50from mcpgateway.db import EmailTeam, fresh_db_session 

51from mcpgateway.db import Gateway as DbGateway 

52from mcpgateway.db import get_for_update 

53from mcpgateway.db import Resource as DbResource 

54from mcpgateway.db import ResourceMetric, ResourceMetricsHourly 

55from mcpgateway.db import ResourceSubscription as DbSubscription 

56from mcpgateway.db import server_resource_association 

57from mcpgateway.observability import create_span 

58from mcpgateway.schemas import ResourceCreate, ResourceMetrics, ResourceRead, ResourceSubscription, ResourceUpdate, TopPerformer 

59from mcpgateway.services.audit_trail_service import get_audit_trail_service 

60from mcpgateway.services.event_service import EventService 

61from mcpgateway.services.logging_service import LoggingService 

62from mcpgateway.services.mcp_session_pool import get_mcp_session_pool, TransportType 

63from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

64from mcpgateway.services.oauth_manager import OAuthManager 

65from mcpgateway.services.observability_service import current_trace_id, ObservabilityService 

66from mcpgateway.services.structured_logger import get_structured_logger 

67from mcpgateway.utils.metrics_common import build_top_performers 

68from mcpgateway.utils.pagination import unified_paginate 

69from mcpgateway.utils.services_auth import decode_auth 

70from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

71from mcpgateway.utils.ssl_context_cache import get_cached_ssl_context 

72from mcpgateway.utils.url_auth import apply_query_param_auth, sanitize_exception_message 

73from mcpgateway.utils.validate_signature import validate_signature 

74 

75# Plugin support imports (conditional) 

76try: 

77 # First-Party 

78 from mcpgateway.plugins.framework import GlobalContext, PluginContextTable, PluginManager, ResourceHookType, ResourcePostFetchPayload, ResourcePreFetchPayload 

79 

80 PLUGINS_AVAILABLE = True 

81except ImportError: 

82 PLUGINS_AVAILABLE = False 

83 

84# Cache import (lazy to avoid circular dependencies) 

85_REGISTRY_CACHE = None 

86 

87 

88def _get_registry_cache(): 

89 """Get registry cache singleton lazily. 

90 

91 Returns: 

92 RegistryCache instance. 

93 """ 

94 global _REGISTRY_CACHE # pylint: disable=global-statement 

95 if _REGISTRY_CACHE is None: 

96 # First-Party 

97 from mcpgateway.cache.registry_cache import registry_cache # pylint: disable=import-outside-toplevel 

98 

99 _REGISTRY_CACHE = registry_cache 

100 return _REGISTRY_CACHE 

101 

102 

103# Initialize logging service first 

104logging_service = LoggingService() 

105logger = logging_service.get_logger(__name__) 

106 

107# Initialize structured logger and audit trail for resource operations 

108structured_logger = get_structured_logger("resource_service") 

109audit_trail = get_audit_trail_service() 

110 

111 

112class ResourceError(Exception): 

113 """Base class for resource-related errors.""" 

114 

115 

116class ResourceNotFoundError(ResourceError): 

117 """Raised when a requested resource is not found.""" 

118 

119 

120class ResourceURIConflictError(ResourceError): 

121 """Raised when a resource URI conflicts with existing (active or inactive) resource.""" 

122 

123 def __init__(self, uri: str, enabled: bool = True, resource_id: Optional[int] = None, visibility: str = "public") -> None: 

124 """Initialize the error with resource information. 

125 

126 Args: 

127 uri: The conflicting resource URI 

128 enabled: Whether the existing resource is active 

129 resource_id: ID of the existing resource if available 

130 visibility: Visibility status of the resource 

131 """ 

132 self.uri = uri 

133 self.enabled = enabled 

134 self.resource_id = resource_id 

135 message = f"{visibility.capitalize()} Resource already exists with URI: {uri}" 

136 logger.info(f"ResourceURIConflictError: {message}") 

137 if not enabled: 

138 message += f" (currently inactive, ID: {resource_id})" 

139 super().__init__(message) 

140 

141 

142class ResourceValidationError(ResourceError): 

143 """Raised when resource validation fails.""" 

144 

145 

146class ResourceLockConflictError(ResourceError): 

147 """Raised when a resource row is locked by another transaction. 

148 

149 Raises: 

150 ResourceLockConflictError: When attempting to modify a resource that is 

151 currently locked by another concurrent request. 

152 """ 

153 

154 

155class ResourceService: 

156 """Service for managing resources. 

157 

158 Handles: 

159 - Resource registration and retrieval 

160 - Resource templates and URIs 

161 - Resource subscriptions 

162 - Content type detection 

163 - Active/inactive status management 

164 """ 

165 

166 def __init__(self) -> None: 

167 """Initialize the resource service.""" 

168 self._event_service = EventService(channel_name="mcpgateway:resource_events") 

169 self._template_cache: Dict[str, ResourceTemplate] = {} 

170 self.oauth_manager = OAuthManager(request_timeout=int(os.getenv("OAUTH_REQUEST_TIMEOUT", "30")), max_retries=int(os.getenv("OAUTH_MAX_RETRIES", "3"))) 

171 

172 # Initialize plugin manager if plugins are enabled in settings 

173 self._plugin_manager = None 

174 if PLUGINS_AVAILABLE: 

175 try: 

176 # Support env overrides for testability without reloading settings 

177 env_flag = os.getenv("PLUGINS_ENABLED") 

178 if env_flag is not None: 

179 env_enabled = env_flag.strip().lower() in {"1", "true", "yes", "on"} 

180 plugins_enabled = env_enabled 

181 else: 

182 plugins_enabled = settings.plugins_enabled 

183 

184 config_file = os.getenv("PLUGIN_CONFIG_FILE", settings.plugin_config_file) 

185 

186 if plugins_enabled: 

187 self._plugin_manager = PluginManager(config_file) 

188 logger.info(f"Plugin manager initialized for ResourceService with config: {config_file}") 

189 except Exception as e: 

190 logger.warning(f"Plugin manager initialization failed in ResourceService: {e}") 

191 self._plugin_manager = None 

192 

193 # Initialize mime types 

194 mimetypes.init() 

195 

196 async def initialize(self) -> None: 

197 """Initialize the service.""" 

198 logger.info("Initializing resource service") 

199 await self._event_service.initialize() 

200 

201 async def shutdown(self) -> None: 

202 """Shutdown the service.""" 

203 # Clear subscriptions 

204 await self._event_service.shutdown() 

205 logger.info("Resource service shutdown complete") 

206 

207 async def get_top_resources(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]: 

208 """Retrieve the top-performing resources based on execution count. 

209 

210 Queries the database to get resources with their metrics, ordered by the number of executions 

211 in descending order. Combines recent raw metrics with historical hourly rollups for complete 

212 historical coverage. Uses the resource URI as the name field for TopPerformer objects. 

213 Returns a list of TopPerformer objects containing resource details and performance metrics. 

214 Results are cached for performance. 

215 

216 Args: 

217 db (Session): Database session for querying resource metrics. 

218 limit (Optional[int]): Maximum number of resources to return. Defaults to 5. 

219 include_deleted (bool): Whether to include deleted resources from rollups. 

220 

221 Returns: 

222 List[TopPerformer]: A list of TopPerformer objects, each containing: 

223 - id: Resource ID. 

224 - name: Resource URI (used as the name field). 

225 - execution_count: Total number of executions. 

226 - avg_response_time: Average response time in seconds, or None if no metrics. 

227 - success_rate: Success rate percentage, or None if no metrics. 

228 - last_execution: Timestamp of the last execution, or None if no metrics. 

229 """ 

230 # Check cache first (if enabled) 

231 # First-Party 

232 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel 

233 

234 effective_limit = limit or 5 

235 cache_key = f"top_resources:{effective_limit}:include_deleted={include_deleted}" 

236 

237 if is_cache_enabled(): 

238 cached = metrics_cache.get(cache_key) 

239 if cached is not None: 

240 return cached 

241 

242 # Use combined query that includes both raw metrics and rollup data 

243 # Use name_column="uri" to maintain backward compatibility (resources show URI as name) 

244 # First-Party 

245 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel 

246 

247 results = get_top_performers_combined( 

248 db=db, 

249 metric_type="resource", 

250 entity_model=DbResource, 

251 limit=effective_limit, 

252 name_column="uri", # Resources use URI as display name 

253 include_deleted=include_deleted, 

254 ) 

255 top_performers = build_top_performers(results) 

256 

257 # Cache the result (if enabled) 

258 if is_cache_enabled(): 

259 metrics_cache.set(cache_key, top_performers) 

260 

261 return top_performers 

262 

263 def convert_resource_to_read(self, resource: DbResource, include_metrics: bool = False) -> ResourceRead: 

264 """ 

265 Converts a DbResource instance into a ResourceRead model, optionally including aggregated metrics. 

266 

267 Args: 

268 resource (DbResource): The ORM instance of the resource. 

269 include_metrics (bool): Whether to include metrics in the result. Defaults to False. 

270 Set to False for list operations to avoid N+1 query issues. 

271 

272 Returns: 

273 ResourceRead: The Pydantic model representing the resource, optionally including aggregated metrics. 

274 

275 Examples: 

276 >>> from types import SimpleNamespace 

277 >>> from datetime import datetime, timezone 

278 >>> svc = ResourceService() 

279 >>> now = datetime.now(timezone.utc) 

280 >>> # Fake metrics 

281 >>> m1 = SimpleNamespace(is_success=True, response_time=0.1, timestamp=now) 

282 >>> m2 = SimpleNamespace(is_success=False, response_time=0.3, timestamp=now) 

283 >>> r = SimpleNamespace( 

284 ... id="ca627760127d409080fdefc309147e08", uri='res://x', name='R', description=None, mime_type='text/plain', size=123, 

285 ... created_at=now, updated_at=now, enabled=True, tags=[{"id": "t", "label": "T"}], metrics=[m1, m2] 

286 ... ) 

287 >>> out = svc.convert_resource_to_read(r, include_metrics=True) 

288 >>> out.metrics.total_executions 

289 2 

290 >>> out.metrics.successful_executions 

291 1 

292 """ 

293 resource_dict = resource.__dict__.copy() 

294 # Remove SQLAlchemy state and any pre-existing 'metrics' attribute 

295 resource_dict.pop("_sa_instance_state", None) 

296 resource_dict.pop("metrics", None) 

297 

298 # Ensure required base fields are present even if SQLAlchemy hasn't loaded them into __dict__ yet 

299 resource_dict["id"] = getattr(resource, "id", resource_dict.get("id")) 

300 resource_dict["uri"] = getattr(resource, "uri", resource_dict.get("uri")) 

301 resource_dict["name"] = getattr(resource, "name", resource_dict.get("name")) 

302 resource_dict["description"] = getattr(resource, "description", resource_dict.get("description")) 

303 resource_dict["mime_type"] = getattr(resource, "mime_type", resource_dict.get("mime_type")) 

304 resource_dict["size"] = getattr(resource, "size", resource_dict.get("size")) 

305 resource_dict["created_at"] = getattr(resource, "created_at", resource_dict.get("created_at")) 

306 resource_dict["updated_at"] = getattr(resource, "updated_at", resource_dict.get("updated_at")) 

307 resource_dict["is_active"] = getattr(resource, "is_active", resource_dict.get("is_active")) 

308 resource_dict["enabled"] = getattr(resource, "enabled", resource_dict.get("enabled")) 

309 

310 # Compute aggregated metrics from the resource's metrics list (only if requested) 

311 if include_metrics: 

312 total = len(resource.metrics) if hasattr(resource, "metrics") and resource.metrics is not None else 0 

313 successful = sum(1 for m in resource.metrics if m.is_success) if total > 0 else 0 

314 failed = sum(1 for m in resource.metrics if not m.is_success) if total > 0 else 0 

315 failure_rate = (failed / total) if total > 0 else 0.0 

316 min_rt = min((m.response_time for m in resource.metrics), default=None) if total > 0 else None 

317 max_rt = max((m.response_time for m in resource.metrics), default=None) if total > 0 else None 

318 avg_rt = (sum(m.response_time for m in resource.metrics) / total) if total > 0 else None 

319 last_time = max((m.timestamp for m in resource.metrics), default=None) if total > 0 else None 

320 

321 resource_dict["metrics"] = { 

322 "total_executions": total, 

323 "successful_executions": successful, 

324 "failed_executions": failed, 

325 "failure_rate": failure_rate, 

326 "min_response_time": min_rt, 

327 "max_response_time": max_rt, 

328 "avg_response_time": avg_rt, 

329 "last_execution_time": last_time, 

330 } 

331 else: 

332 resource_dict["metrics"] = None 

333 

334 raw_tags = resource.tags or [] 

335 normalized_tags = [] 

336 for tag in raw_tags: 

337 if isinstance(tag, str): 

338 normalized_tags.append(tag) 

339 continue 

340 if isinstance(tag, dict): 

341 label = tag.get("label") or tag.get("name") 

342 if label: 

343 normalized_tags.append(label) 

344 continue 

345 label = getattr(tag, "label", None) or getattr(tag, "name", None) 

346 if label: 

347 normalized_tags.append(label) 

348 resource_dict["tags"] = normalized_tags 

349 resource_dict["team"] = getattr(resource, "team", None) 

350 

351 # Include metadata fields for proper API response 

352 resource_dict["created_by"] = getattr(resource, "created_by", None) 

353 resource_dict["modified_by"] = getattr(resource, "modified_by", None) 

354 resource_dict["created_at"] = getattr(resource, "created_at", None) 

355 resource_dict["updated_at"] = getattr(resource, "updated_at", None) 

356 resource_dict["version"] = getattr(resource, "version", None) 

357 return ResourceRead.model_validate(resource_dict) 

358 

359 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]: 

360 """Retrieve the team name given a team ID. 

361 

362 Args: 

363 db (Session): Database session for querying teams. 

364 team_id (Optional[str]): The ID of the team. 

365 

366 Returns: 

367 Optional[str]: The name of the team if found, otherwise None. 

368 """ 

369 if not team_id: 

370 return None 

371 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first() 

372 db.commit() # Release transaction to avoid idle-in-transaction 

373 return team.name if team else None 

374 

375 async def register_resource( 

376 self, 

377 db: Session, 

378 resource: ResourceCreate, 

379 created_by: Optional[str] = None, 

380 created_from_ip: Optional[str] = None, 

381 created_via: Optional[str] = None, 

382 created_user_agent: Optional[str] = None, 

383 import_batch_id: Optional[str] = None, 

384 federation_source: Optional[str] = None, 

385 team_id: Optional[str] = None, 

386 owner_email: Optional[str] = None, 

387 visibility: Optional[str] = "public", 

388 ) -> ResourceRead: 

389 """Register a new resource. 

390 

391 Args: 

392 db: Database session 

393 resource: Resource creation schema 

394 created_by: User who created the resource 

395 created_from_ip: IP address of the creator 

396 created_via: Method used to create the resource (e.g., API, UI) 

397 created_user_agent: User agent of the creator 

398 import_batch_id: Optional batch ID for bulk imports 

399 federation_source: Optional source of the resource if federated 

400 team_id (Optional[str]): Team ID to assign the resource to. 

401 owner_email (Optional[str]): Email of the user who owns this resource. 

402 visibility (str): Resource visibility level (private, team, public). 

403 

404 Returns: 

405 Created resource information 

406 

407 Raises: 

408 IntegrityError: If a database integrity error occurs. 

409 ResourceURIConflictError: If a resource with the same URI already exists. 

410 ResourceError: For other resource registration errors 

411 

412 Examples: 

413 >>> from mcpgateway.services.resource_service import ResourceService 

414 >>> from unittest.mock import MagicMock, AsyncMock 

415 >>> from mcpgateway.schemas import ResourceRead 

416 >>> service = ResourceService() 

417 >>> db = MagicMock() 

418 >>> resource = MagicMock() 

419 >>> db.execute.return_value.scalar_one_or_none.return_value = None 

420 >>> db.add = MagicMock() 

421 >>> db.commit = MagicMock() 

422 >>> db.refresh = MagicMock() 

423 >>> service._notify_resource_added = AsyncMock() 

424 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read') 

425 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read') 

426 >>> import asyncio 

427 >>> asyncio.run(service.register_resource(db, resource)) 

428 'resource_read' 

429 """ 

430 try: 

431 logger.info(f"Registering resource: {resource.uri}") 

432 

433 # Extract gateway_id from resource if present 

434 gateway_id = getattr(resource, "gateway_id", None) 

435 

436 # Check for existing server with the same uri 

437 if visibility.lower() == "public": 

438 logger.info(f"visibility:: {visibility}") 

439 # Check for existing public resource with the same uri and gateway_id 

440 existing_resource = db.execute(select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "public", DbResource.gateway_id == gateway_id)).scalar_one_or_none() 

441 if existing_resource: 

442 raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility) 

443 elif visibility.lower() == "team" and team_id: 

444 # Check for existing team resource with the same uri and gateway_id 

445 existing_resource = db.execute( 

446 select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.gateway_id == gateway_id) 

447 ).scalar_one_or_none() 

448 if existing_resource: 

449 raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility) 

450 

451 # Detect mime type if not provided 

452 mime_type = resource.mime_type 

453 if not mime_type: 

454 mime_type = self._detect_mime_type(resource.uri, resource.content) 

455 

456 # Determine content storage 

457 is_text = mime_type and mime_type.startswith("text/") or isinstance(resource.content, str) 

458 

459 # Create DB model 

460 db_resource = DbResource( 

461 uri=resource.uri, 

462 name=resource.name, 

463 description=resource.description, 

464 mime_type=mime_type, 

465 uri_template=resource.uri_template, 

466 text_content=resource.content if is_text else None, 

467 binary_content=(resource.content.encode() if is_text and isinstance(resource.content, str) else resource.content if isinstance(resource.content, bytes) else None), 

468 size=len(resource.content) if resource.content else 0, 

469 tags=resource.tags or [], 

470 created_by=created_by, 

471 created_from_ip=created_from_ip, 

472 created_via=created_via, 

473 created_user_agent=created_user_agent, 

474 import_batch_id=import_batch_id, 

475 federation_source=federation_source, 

476 version=1, 

477 # Team scoping fields - use schema values if provided, otherwise fallback to parameters 

478 team_id=getattr(resource, "team_id", None) or team_id, 

479 owner_email=getattr(resource, "owner_email", None) or owner_email or created_by, 

480 # Endpoint visibility parameter takes precedence over schema default 

481 visibility=visibility if visibility is not None else getattr(resource, "visibility", "public"), 

482 gateway_id=gateway_id, 

483 ) 

484 

485 # Add to DB 

486 db.add(db_resource) 

487 db.commit() 

488 db.refresh(db_resource) 

489 

490 # Notify subscribers 

491 await self._notify_resource_added(db_resource) 

492 

493 logger.info(f"Registered resource: {resource.uri}") 

494 

495 # Structured logging: Audit trail for resource creation 

496 audit_trail.log_action( 

497 user_id=created_by or "system", 

498 action="create_resource", 

499 resource_type="resource", 

500 resource_id=str(db_resource.id), 

501 resource_name=db_resource.name, 

502 user_email=owner_email, 

503 team_id=team_id, 

504 client_ip=created_from_ip, 

505 user_agent=created_user_agent, 

506 new_values={ 

507 "uri": db_resource.uri, 

508 "name": db_resource.name, 

509 "visibility": visibility, 

510 "mime_type": db_resource.mime_type, 

511 }, 

512 context={ 

513 "created_via": created_via, 

514 "import_batch_id": import_batch_id, 

515 "federation_source": federation_source, 

516 }, 

517 db=db, 

518 ) 

519 

520 # Structured logging: Log successful resource creation 

521 structured_logger.log( 

522 level="INFO", 

523 message="Resource created successfully", 

524 event_type="resource_created", 

525 component="resource_service", 

526 user_id=created_by, 

527 user_email=owner_email, 

528 team_id=team_id, 

529 resource_type="resource", 

530 resource_id=str(db_resource.id), 

531 custom_fields={ 

532 "resource_uri": db_resource.uri, 

533 "resource_name": db_resource.name, 

534 "visibility": visibility, 

535 }, 

536 db=db, 

537 ) 

538 

539 db_resource.team = self._get_team_name(db, db_resource.team_id) 

540 return self.convert_resource_to_read(db_resource) 

541 except IntegrityError as ie: 

542 logger.error(f"IntegrityErrors in group: {ie}") 

543 

544 # Structured logging: Log database integrity error 

545 structured_logger.log( 

546 level="ERROR", 

547 message="Resource creation failed due to database integrity error", 

548 event_type="resource_creation_failed", 

549 component="resource_service", 

550 user_id=created_by, 

551 user_email=owner_email, 

552 error=ie, 

553 custom_fields={ 

554 "resource_uri": resource.uri, 

555 }, 

556 db=db, 

557 ) 

558 raise ie 

559 except ResourceURIConflictError as rce: 

560 logger.error(f"ResourceURIConflictError in group: {resource.uri}") 

561 

562 # Structured logging: Log URI conflict error 

563 structured_logger.log( 

564 level="WARNING", 

565 message="Resource creation failed due to URI conflict", 

566 event_type="resource_uri_conflict", 

567 component="resource_service", 

568 user_id=created_by, 

569 user_email=owner_email, 

570 custom_fields={ 

571 "resource_uri": resource.uri, 

572 "visibility": visibility, 

573 }, 

574 db=db, 

575 ) 

576 raise rce 

577 except Exception as e: 

578 db.rollback() 

579 

580 # Structured logging: Log generic resource creation failure 

581 structured_logger.log( 

582 level="ERROR", 

583 message="Resource creation failed", 

584 event_type="resource_creation_failed", 

585 component="resource_service", 

586 user_id=created_by, 

587 user_email=owner_email, 

588 error=e, 

589 custom_fields={ 

590 "resource_uri": resource.uri, 

591 }, 

592 db=db, 

593 ) 

594 raise ResourceError(f"Failed to register resource: {str(e)}") 

595 

596 async def register_resources_bulk( 

597 self, 

598 db: Session, 

599 resources: List[ResourceCreate], 

600 created_by: Optional[str] = None, 

601 created_from_ip: Optional[str] = None, 

602 created_via: Optional[str] = None, 

603 created_user_agent: Optional[str] = None, 

604 import_batch_id: Optional[str] = None, 

605 federation_source: Optional[str] = None, 

606 team_id: Optional[str] = None, 

607 owner_email: Optional[str] = None, 

608 visibility: Optional[str] = "public", 

609 conflict_strategy: str = "skip", 

610 ) -> Dict[str, Any]: 

611 """Register multiple resources in bulk with a single commit. 

612 

613 This method provides significant performance improvements over individual 

614 resource registration by: 

615 - Using db.add_all() instead of individual db.add() calls 

616 - Performing a single commit for all resources 

617 - Batch conflict detection 

618 - Chunking for very large imports (>500 items) 

619 

620 Args: 

621 db: Database session 

622 resources: List of resource creation schemas 

623 created_by: Username who created these resources 

624 created_from_ip: IP address of creator 

625 created_via: Creation method (ui, api, import, federation) 

626 created_user_agent: User agent of creation request 

627 import_batch_id: UUID for bulk import operations 

628 federation_source: Source gateway for federated resources 

629 team_id: Team ID to assign the resources to 

630 owner_email: Email of the user who owns these resources 

631 visibility: Resource visibility level (private, team, public) 

632 conflict_strategy: How to handle conflicts (skip, update, rename, fail) 

633 

634 Returns: 

635 Dict with statistics: 

636 - created: Number of resources created 

637 - updated: Number of resources updated 

638 - skipped: Number of resources skipped 

639 - failed: Number of resources that failed 

640 - errors: List of error messages 

641 

642 Raises: 

643 ResourceError: If bulk registration fails critically 

644 

645 Examples: 

646 >>> from mcpgateway.services.resource_service import ResourceService 

647 >>> from unittest.mock import MagicMock 

648 >>> service = ResourceService() 

649 >>> db = MagicMock() 

650 >>> resources = [MagicMock(), MagicMock()] 

651 >>> import asyncio 

652 >>> try: 

653 ... result = asyncio.run(service.register_resources_bulk(db, resources)) 

654 ... except Exception: 

655 ... pass 

656 """ 

657 if not resources: 

658 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

659 

660 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

661 

662 # Process in chunks to avoid memory issues and SQLite parameter limits 

663 chunk_size = 500 

664 

665 for chunk_start in range(0, len(resources), chunk_size): 

666 chunk = resources[chunk_start : chunk_start + chunk_size] 

667 

668 try: 

669 # Batch check for existing resources to detect conflicts 

670 resource_uris = [resource.uri for resource in chunk] 

671 

672 # Build base query conditions 

673 if visibility.lower() == "public": 

674 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "public"] 

675 elif visibility.lower() == "team" and team_id: 

676 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "team", DbResource.team_id == team_id] 

677 else: 

678 # Private resources - check by owner 

679 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "private", DbResource.owner_email == (owner_email or created_by)] 

680 

681 existing_resources_query = select(DbResource).where(*base_conditions) 

682 existing_resources = db.execute(existing_resources_query).scalars().all() 

683 # Use (uri, gateway_id) tuple as key for proper conflict detection with gateway_id scoping 

684 existing_resources_map = {(r.uri, r.gateway_id): r for r in existing_resources} 

685 

686 resources_to_add = [] 

687 resources_to_update = [] 

688 

689 for resource in chunk: 

690 try: 

691 # Use provided parameters or schema values 

692 resource_team_id = team_id if team_id is not None else getattr(resource, "team_id", None) 

693 resource_owner_email = owner_email or getattr(resource, "owner_email", None) or created_by 

694 resource_visibility = visibility if visibility is not None else getattr(resource, "visibility", "public") 

695 resource_gateway_id = getattr(resource, "gateway_id", None) 

696 

697 # Look up existing resource by (uri, gateway_id) tuple 

698 existing_resource = existing_resources_map.get((resource.uri, resource_gateway_id)) 

699 

700 if existing_resource: 

701 # Handle conflict based on strategy 

702 if conflict_strategy == "skip": 

703 stats["skipped"] += 1 

704 continue 

705 if conflict_strategy == "update": 

706 # Update existing resource 

707 existing_resource.name = resource.name 

708 existing_resource.description = resource.description 

709 existing_resource.mime_type = resource.mime_type 

710 existing_resource.size = getattr(resource, "size", None) 

711 existing_resource.uri_template = resource.uri_template 

712 existing_resource.tags = resource.tags or [] 

713 existing_resource.modified_by = created_by 

714 existing_resource.modified_from_ip = created_from_ip 

715 existing_resource.modified_via = created_via 

716 existing_resource.modified_user_agent = created_user_agent 

717 existing_resource.updated_at = datetime.now(timezone.utc) 

718 existing_resource.version = (existing_resource.version or 1) + 1 

719 

720 resources_to_update.append(existing_resource) 

721 stats["updated"] += 1 

722 elif conflict_strategy == "rename": 

723 # Create with renamed resource 

724 new_uri = f"{resource.uri}_imported_{int(datetime.now().timestamp())}" 

725 db_resource = DbResource( 

726 uri=new_uri, 

727 name=resource.name, 

728 description=resource.description, 

729 mime_type=resource.mime_type, 

730 size=getattr(resource, "size", None), 

731 uri_template=resource.uri_template, 

732 gateway_id=getattr(resource, "gateway_id", None), 

733 tags=resource.tags or [], 

734 created_by=created_by, 

735 created_from_ip=created_from_ip, 

736 created_via=created_via, 

737 created_user_agent=created_user_agent, 

738 import_batch_id=import_batch_id, 

739 federation_source=federation_source, 

740 version=1, 

741 team_id=resource_team_id, 

742 owner_email=resource_owner_email, 

743 visibility=resource_visibility, 

744 ) 

745 resources_to_add.append(db_resource) 

746 stats["created"] += 1 

747 elif conflict_strategy == "fail": 

748 stats["failed"] += 1 

749 stats["errors"].append(f"Resource URI conflict: {resource.uri}") 

750 continue 

751 else: 

752 # Create new resource 

753 db_resource = DbResource( 

754 uri=resource.uri, 

755 name=resource.name, 

756 description=resource.description, 

757 mime_type=resource.mime_type, 

758 size=getattr(resource, "size", None), 

759 uri_template=resource.uri_template, 

760 gateway_id=getattr(resource, "gateway_id", None), 

761 tags=resource.tags or [], 

762 created_by=created_by, 

763 created_from_ip=created_from_ip, 

764 created_via=created_via, 

765 created_user_agent=created_user_agent, 

766 import_batch_id=import_batch_id, 

767 federation_source=federation_source, 

768 version=1, 

769 team_id=resource_team_id, 

770 owner_email=resource_owner_email, 

771 visibility=resource_visibility, 

772 ) 

773 resources_to_add.append(db_resource) 

774 stats["created"] += 1 

775 

776 except Exception as e: 

777 stats["failed"] += 1 

778 stats["errors"].append(f"Failed to process resource {resource.uri}: {str(e)}") 

779 logger.warning(f"Failed to process resource {resource.uri} in bulk operation: {str(e)}") 

780 continue 

781 

782 # Bulk add new resources 

783 if resources_to_add: 

784 db.add_all(resources_to_add) 

785 

786 # Commit the chunk 

787 db.commit() 

788 

789 # Refresh resources for notifications and audit trail 

790 for db_resource in resources_to_add: 

791 db.refresh(db_resource) 

792 # Notify subscribers 

793 await self._notify_resource_added(db_resource) 

794 

795 # Log bulk audit trail entry 

796 if resources_to_add or resources_to_update: 

797 audit_trail.log_action( 

798 user_id=created_by or "system", 

799 action="bulk_create_resources" if resources_to_add else "bulk_update_resources", 

800 resource_type="resource", 

801 resource_id=import_batch_id or "bulk_operation", 

802 resource_name=f"Bulk operation: {len(resources_to_add)} created, {len(resources_to_update)} updated", 

803 user_email=owner_email, 

804 team_id=team_id, 

805 client_ip=created_from_ip, 

806 user_agent=created_user_agent, 

807 new_values={ 

808 "resources_created": len(resources_to_add), 

809 "resources_updated": len(resources_to_update), 

810 "visibility": visibility, 

811 }, 

812 context={ 

813 "created_via": created_via, 

814 "import_batch_id": import_batch_id, 

815 "federation_source": federation_source, 

816 "conflict_strategy": conflict_strategy, 

817 }, 

818 db=db, 

819 ) 

820 

821 logger.info(f"Bulk registered {len(resources_to_add)} resources, updated {len(resources_to_update)} resources in chunk") 

822 

823 except Exception as e: 

824 db.rollback() 

825 logger.error(f"Failed to process chunk in bulk resource registration: {str(e)}") 

826 stats["failed"] += len(chunk) 

827 stats["errors"].append(f"Chunk processing failed: {str(e)}") 

828 continue 

829 

830 # Final structured logging 

831 structured_logger.log( 

832 level="INFO", 

833 message="Bulk resource registration completed", 

834 event_type="resources_bulk_created", 

835 component="resource_service", 

836 user_id=created_by, 

837 user_email=owner_email, 

838 team_id=team_id, 

839 resource_type="resource", 

840 custom_fields={ 

841 "resources_created": stats["created"], 

842 "resources_updated": stats["updated"], 

843 "resources_skipped": stats["skipped"], 

844 "resources_failed": stats["failed"], 

845 "total_resources": len(resources), 

846 "visibility": visibility, 

847 "conflict_strategy": conflict_strategy, 

848 }, 

849 db=db, 

850 ) 

851 

852 return stats 

853 

854 async def _check_resource_access( 

855 self, 

856 db: Session, 

857 resource: DbResource, 

858 user_email: Optional[str], 

859 token_teams: Optional[List[str]], 

860 ) -> bool: 

861 """Check if user has access to a resource based on visibility rules. 

862 

863 Implements the same access control logic as list_resources() for consistency. 

864 

865 Args: 

866 db: Database session for team membership lookup if needed. 

867 resource: Resource ORM object with visibility, team_id, owner_email. 

868 user_email: Email of the requesting user (None = unauthenticated). 

869 token_teams: List of team IDs from token. 

870 - None = unrestricted admin access 

871 - [] = public-only token 

872 - [...] = team-scoped token 

873 

874 Returns: 

875 True if access is allowed, False otherwise. 

876 """ 

877 visibility = getattr(resource, "visibility", "public") 

878 resource_team_id = getattr(resource, "team_id", None) 

879 resource_owner_email = getattr(resource, "owner_email", None) 

880 

881 # Public resources are accessible by everyone 

882 if visibility == "public": 

883 return True 

884 

885 # Admin bypass: token_teams=None AND user_email=None means unrestricted admin 

886 # This happens when is_admin=True and no team scoping in token 

887 if token_teams is None and user_email is None: 

888 return True 

889 

890 # No user context (but not admin) = deny access to non-public resources 

891 if not user_email: 

892 return False 

893 

894 # Public-only tokens (empty teams array) can ONLY access public resources 

895 is_public_only_token = token_teams is not None and len(token_teams) == 0 

896 if is_public_only_token: 

897 return False # Already checked public above 

898 

899 # Owner can always access their own resources 

900 if resource_owner_email and resource_owner_email == user_email: 

901 return True 

902 

903 # Team resources: check team membership (matches list_resources behavior) 

904 if resource_team_id: 

905 # Use token_teams if provided, otherwise look up from DB 

906 if token_teams is not None: 

907 team_ids = token_teams 

908 else: 

909 # First-Party 

910 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel 

911 

912 team_service = TeamManagementService(db) 

913 user_teams = await team_service.get_user_teams(user_email) 

914 team_ids = [team.id for team in user_teams] 

915 

916 # Team/public visibility allows access if user is in the team 

917 if visibility in ["team", "public"] and resource_team_id in team_ids: 

918 return True 

919 

920 return False 

921 

922 async def list_resources( 

923 self, 

924 db: Session, 

925 include_inactive: bool = False, 

926 cursor: Optional[str] = None, 

927 tags: Optional[List[str]] = None, 

928 limit: Optional[int] = None, 

929 page: Optional[int] = None, 

930 per_page: Optional[int] = None, 

931 user_email: Optional[str] = None, 

932 team_id: Optional[str] = None, 

933 visibility: Optional[str] = None, 

934 token_teams: Optional[List[str]] = None, 

935 ) -> Union[tuple[List[ResourceRead], Optional[str]], Dict[str, Any]]: 

936 """ 

937 Retrieve a list of registered resources from the database with pagination support. 

938 

939 This method retrieves resources from the database and converts them into a list 

940 of ResourceRead objects. It supports filtering out inactive resources based on the 

941 include_inactive parameter and cursor-based pagination. 

942 

943 Args: 

944 db (Session): The SQLAlchemy database session. 

945 include_inactive (bool): If True, include inactive resources in the result. 

946 Defaults to False. 

947 cursor (Optional[str], optional): An opaque cursor token for pagination. 

948 Opaque base64-encoded string containing last item's ID and created_at. 

949 tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned. 

950 limit (Optional[int]): Maximum number of resources to return. Use 0 for all resources (no limit). 

951 If not specified, uses pagination_default_page_size. 

952 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor. 

953 per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size. 

954 user_email (Optional[str]): User email for team-based access control. If None, no access control is applied. 

955 team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation. 

956 visibility (Optional[str]): Filter by visibility (private, team, public). 

957 token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access 

958 where the token scope should be respected instead of the user's full team memberships. 

959 

960 Returns: 

961 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}} 

962 If cursor is provided or neither: tuple of (list of ResourceRead objects, next_cursor). 

963 

964 Examples: 

965 >>> from mcpgateway.services.resource_service import ResourceService 

966 >>> from unittest.mock import MagicMock 

967 >>> service = ResourceService() 

968 >>> db = MagicMock() 

969 >>> resource_read = MagicMock() 

970 >>> service.convert_resource_to_read = MagicMock(return_value=resource_read) 

971 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()] 

972 >>> import asyncio 

973 >>> resources, next_cursor = asyncio.run(service.list_resources(db)) 

974 >>> isinstance(resources, list) 

975 True 

976 

977 With tags filter: 

978 >>> db2 = MagicMock() 

979 >>> bind = MagicMock() 

980 >>> bind.dialect = MagicMock() 

981 >>> bind.dialect.name = "sqlite" # or "postgresql" / "mysql" 

982 >>> db2.get_bind.return_value = bind 

983 >>> db2.execute.return_value.scalars.return_value.all.return_value = [MagicMock()] 

984 >>> result2, _ = asyncio.run(service.list_resources(db2, tags=['api'])) 

985 >>> isinstance(result2, list) 

986 True 

987 """ 

988 # Check cache for first page only (cursor=None) 

989 # Skip caching when: 

990 # - user_email is provided (team-filtered results are user-specific) 

991 # - token_teams is set (scoped access, e.g., public-only or team-scoped tokens) 

992 # - page-based pagination is used 

993 # This prevents cache poisoning where admin results could leak to public-only requests 

994 cache = _get_registry_cache() 

995 if cursor is None and user_email is None and token_teams is None and page is None: 

996 filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, limit=limit) 

997 cached = await cache.get("resources", filters_hash) 

998 if cached is not None: 

999 # Reconstruct ResourceRead objects from cached dicts 

1000 cached_resources = [ResourceRead.model_validate(r) for r in cached["resources"]] 

1001 return (cached_resources, cached.get("next_cursor")) 

1002 

1003 # Build base query with ordering 

1004 query = select(DbResource).where(DbResource.uri_template.is_(None)).order_by(desc(DbResource.created_at), desc(DbResource.id)) 

1005 

1006 # Apply active/inactive filter 

1007 if not include_inactive: 

1008 query = query.where(DbResource.enabled) 

1009 

1010 # Apply team-based access control if user_email is provided OR token_teams is explicitly set 

1011 # This ensures unauthenticated requests with token_teams=[] only see public resources 

1012 if user_email is not None or token_teams is not None: # empty-string user_email -> public-only filtering (secure default) 

1013 # Use token_teams if provided (for MCP/API token access), otherwise look up from DB 

1014 if token_teams is not None: 

1015 team_ids = token_teams 

1016 elif user_email: 

1017 # First-Party 

1018 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel 

1019 

1020 team_service = TeamManagementService(db) 

1021 user_teams = await team_service.get_user_teams(user_email) 

1022 team_ids = [team.id for team in user_teams] 

1023 else: 

1024 team_ids = [] 

1025 

1026 # Check if this is a public-only token (empty teams array) 

1027 # Public-only tokens can ONLY see public resources - no owner access 

1028 is_public_only_token = token_teams is not None and len(token_teams) == 0 

1029 

1030 if team_id: 

1031 # User requesting specific team - verify access 

1032 if team_id not in team_ids: 

1033 return ([], None) # No access to this team 

1034 

1035 access_conditions = [ 

1036 and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"])), 

1037 ] 

1038 # Only include owner access for non-public-only tokens with user_email 

1039 if not is_public_only_token and user_email: 

1040 access_conditions.append(and_(DbResource.team_id == team_id, DbResource.owner_email == user_email)) 

1041 query = query.where(or_(*access_conditions)) 

1042 else: 

1043 # General access: public resources + team resources (+ owner resources if not public-only token) 

1044 access_conditions = [ 

1045 DbResource.visibility == "public", 

1046 ] 

1047 # Only include owner access for non-public-only tokens with user_email 

1048 if not is_public_only_token and user_email: 

1049 access_conditions.append(DbResource.owner_email == user_email) 

1050 if team_ids: 

1051 access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"]))) 

1052 

1053 query = query.where(or_(*access_conditions)) 

1054 

1055 # Apply visibility filter if specified 

1056 if visibility: 

1057 query = query.where(DbResource.visibility == visibility) 

1058 

1059 # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats) 

1060 if tags: 

1061 query = query.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True)) 

1062 

1063 # Use unified pagination helper - handles both page and cursor pagination 

1064 pag_result = await unified_paginate( 

1065 db=db, 

1066 query=query, 

1067 page=page, 

1068 per_page=per_page, 

1069 cursor=cursor, 

1070 limit=limit, 

1071 base_url="/admin/resources", # Used for page-based links 

1072 query_params={"include_inactive": include_inactive} if include_inactive else {}, 

1073 ) 

1074 

1075 next_cursor = None 

1076 # Extract servers based on pagination type 

1077 if page is not None: 

1078 # Page-based: pag_result is a dict 

1079 resources_db = pag_result["data"] 

1080 else: 

1081 # Cursor-based: pag_result is a tuple 

1082 resources_db, next_cursor = pag_result 

1083 

1084 # Fetch team names for the resources (common for both pagination types) 

1085 team_ids_set = {s.team_id for s in resources_db if s.team_id} 

1086 team_map = {} 

1087 if team_ids_set: 

1088 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all() 

1089 team_map = {team.id: team.name for team in teams} 

1090 

1091 db.commit() # Release transaction to avoid idle-in-transaction 

1092 

1093 # Convert to ResourceRead (common for both pagination types) 

1094 result = [] 

1095 for s in resources_db: 

1096 try: 

1097 s.team = team_map.get(s.team_id) if s.team_id else None 

1098 result.append(self.convert_resource_to_read(s, include_metrics=False)) 

1099 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

1100 logger.exception(f"Failed to convert resource {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}") 

1101 # Continue with remaining resources instead of failing completely 

1102 # Return appropriate format based on pagination type 

1103 if page is not None: 

1104 # Page-based format 

1105 return { 

1106 "data": result, 

1107 "pagination": pag_result["pagination"], 

1108 "links": pag_result["links"], 

1109 } 

1110 

1111 # Cursor-based format 

1112 

1113 # Cache first page results - only for non-user-specific/non-scoped queries 

1114 # Must match the same conditions as cache lookup to prevent cache poisoning 

1115 if cursor is None and user_email is None and token_teams is None: 

1116 try: 

1117 cache_data = {"resources": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor} 

1118 await cache.set("resources", cache_data, filters_hash) 

1119 except AttributeError: 

1120 pass # Skip caching if result objects don't support model_dump (e.g., in doctests) 

1121 

1122 return (result, next_cursor) 

1123 

1124 async def list_resources_for_user( 

1125 self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100 

1126 ) -> List[ResourceRead]: 

1127 """ 

1128 DEPRECATED: Use list_resources() with user_email parameter instead. 

1129 

1130 List resources user has access to with team filtering. 

1131 

1132 This method is maintained for backward compatibility but is no longer used. 

1133 New code should call list_resources() with user_email, team_id, and visibility parameters. 

1134 

1135 Args: 

1136 db: Database session 

1137 user_email: Email of the user requesting resources 

1138 team_id: Optional team ID to filter by specific team 

1139 visibility: Optional visibility filter (private, team, public) 

1140 include_inactive: Whether to include inactive resources 

1141 skip: Number of resources to skip for pagination 

1142 limit: Maximum number of resources to return 

1143 

1144 Returns: 

1145 List[ResourceRead]: Resources the user has access to 

1146 

1147 Examples: 

1148 >>> from unittest.mock import MagicMock 

1149 >>> import asyncio 

1150 >>> service = ResourceService() 

1151 >>> db = MagicMock() 

1152 >>> # Patch out TeamManagementService so it doesn't run real logic 

1153 >>> import mcpgateway.services.resource_service as _rs 

1154 >>> class FakeTeamService: 

1155 ... def __init__(self, db): pass 

1156 ... async def get_user_teams(self, email): return [] 

1157 >>> _rs.TeamManagementService = FakeTeamService 

1158 >>> # Force DB to return one fake row with a 'team' attribute 

1159 >>> class FakeResource: 

1160 ... team_id = None 

1161 >>> fake_resource = FakeResource() 

1162 >>> db.execute.return_value.scalars.return_value.all.return_value = [fake_resource] 

1163 >>> service.convert_resource_to_read = MagicMock(return_value="converted") 

1164 >>> asyncio.run(service.list_resources_for_user(db, "user@example.com")) 

1165 ['converted'] 

1166 

1167 Without team_id (default/public access): 

1168 >>> db2 = MagicMock() 

1169 >>> class FakeResource2: 

1170 ... team_id = None 

1171 >>> fake_resource2 = FakeResource2() 

1172 >>> db2.execute.return_value.scalars.return_value.all.return_value = [fake_resource2] 

1173 >>> service.convert_resource_to_read = MagicMock(return_value="converted2") 

1174 >>> out2 = asyncio.run(service.list_resources_for_user(db2, "user@example.com")) 

1175 >>> out2 

1176 ['converted2'] 

1177 """ 

1178 # First-Party 

1179 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel 

1180 

1181 # Build query following existing patterns from list_resources() 

1182 team_service = TeamManagementService(db) 

1183 user_teams = await team_service.get_user_teams(user_email) 

1184 team_ids = [team.id for team in user_teams] 

1185 

1186 # Build query following existing patterns from list_resources() 

1187 query = select(DbResource) 

1188 

1189 # Apply active/inactive filter 

1190 if not include_inactive: 

1191 query = query.where(DbResource.enabled) 

1192 

1193 if team_id: 

1194 if team_id not in team_ids: 

1195 return [] # No access to team 

1196 

1197 access_conditions = [] 

1198 # Filter by specific team 

1199 access_conditions.append(and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"]))) 

1200 

1201 access_conditions.append(and_(DbResource.team_id == team_id, DbResource.owner_email == user_email)) 

1202 

1203 query = query.where(or_(*access_conditions)) 

1204 else: 

1205 # Get user's accessible teams 

1206 # Build access conditions following existing patterns 

1207 access_conditions = [] 

1208 # 1. User's personal resources (owner_email matches) 

1209 access_conditions.append(DbResource.owner_email == user_email) 

1210 # 2. Team resources where user is member 

1211 if team_ids: 

1212 access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"]))) 

1213 # 3. Public resources (if visibility allows) 

1214 access_conditions.append(DbResource.visibility == "public") 

1215 

1216 query = query.where(or_(*access_conditions)) 

1217 

1218 # Apply visibility filter if specified 

1219 if visibility: 

1220 query = query.where(DbResource.visibility == visibility) 

1221 

1222 # Apply pagination following existing patterns 

1223 query = query.offset(skip).limit(limit) 

1224 

1225 resources = db.execute(query).scalars().all() 

1226 

1227 # Batch fetch team names to avoid N+1 queries 

1228 resource_team_ids = {r.team_id for r in resources if r.team_id} 

1229 team_map = {} 

1230 if resource_team_ids: 

1231 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all() 

1232 team_map = {str(team.id): team.name for team in teams} 

1233 

1234 db.commit() # Release transaction to avoid idle-in-transaction 

1235 

1236 result = [] 

1237 for t in resources: 

1238 try: 

1239 t.team = team_map.get(str(t.team_id)) if t.team_id else None 

1240 result.append(self.convert_resource_to_read(t, include_metrics=False)) 

1241 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

1242 logger.exception(f"Failed to convert resource {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}") 

1243 # Continue with remaining resources instead of failing completely 

1244 return result 

1245 

1246 async def list_server_resources( 

1247 self, 

1248 db: Session, 

1249 server_id: str, 

1250 include_inactive: bool = False, 

1251 user_email: Optional[str] = None, 

1252 token_teams: Optional[List[str]] = None, 

1253 ) -> List[ResourceRead]: 

1254 """ 

1255 Retrieve a list of registered resources from the database. 

1256 

1257 This method retrieves resources from the database and converts them into a list 

1258 of ResourceRead objects. It supports filtering out inactive resources based on the 

1259 include_inactive parameter. The cursor parameter is reserved for future pagination support 

1260 but is currently not implemented. 

1261 

1262 Args: 

1263 db (Session): The SQLAlchemy database session. 

1264 server_id (str): Server ID 

1265 include_inactive (bool): If True, include inactive resources in the result. 

1266 Defaults to False. 

1267 user_email (Optional[str]): User email for visibility filtering. If None, no filtering applied. 

1268 token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API 

1269 token access where the token scope should be respected. 

1270 

1271 Returns: 

1272 List[ResourceRead]: A list of resources represented as ResourceRead objects. 

1273 

1274 Examples: 

1275 >>> from mcpgateway.services.resource_service import ResourceService 

1276 >>> from unittest.mock import MagicMock 

1277 >>> service = ResourceService() 

1278 >>> db = MagicMock() 

1279 >>> resource_read = MagicMock() 

1280 >>> service.convert_resource_to_read = MagicMock(return_value=resource_read) 

1281 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()] 

1282 >>> import asyncio 

1283 >>> result = asyncio.run(service.list_server_resources(db, 'server1')) 

1284 >>> isinstance(result, list) 

1285 True 

1286 >>> # Include inactive branch 

1287 >>> result = asyncio.run(service.list_server_resources(db, 'server1', include_inactive=True)) 

1288 >>> isinstance(result, list) 

1289 True 

1290 """ 

1291 logger.debug(f"Listing resources for server_id: {server_id}, include_inactive: {include_inactive}") 

1292 query = ( 

1293 select(DbResource) 

1294 .join(server_resource_association, DbResource.id == server_resource_association.c.resource_id) 

1295 .where(DbResource.uri_template.is_(None)) 

1296 .where(server_resource_association.c.server_id == server_id) 

1297 ) 

1298 if not include_inactive: 

1299 query = query.where(DbResource.enabled) 

1300 

1301 # Add visibility filtering if user context OR token_teams provided 

1302 # This ensures unauthenticated requests with token_teams=[] only see public resources 

1303 if user_email is not None or token_teams is not None: # empty-string user_email -> public-only filtering (secure default) 

1304 # Use token_teams if provided (for MCP/API token access), otherwise look up from DB 

1305 if token_teams is not None: 

1306 team_ids = token_teams 

1307 elif user_email: 

1308 # First-Party 

1309 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel 

1310 

1311 team_service = TeamManagementService(db) 

1312 user_teams = await team_service.get_user_teams(user_email) 

1313 team_ids = [team.id for team in user_teams] 

1314 else: 

1315 team_ids = [] 

1316 

1317 # Check if this is a public-only token (empty teams array) 

1318 # Public-only tokens can ONLY see public resources - no owner access 

1319 is_public_only_token = token_teams is not None and len(token_teams) == 0 

1320 

1321 access_conditions = [ 

1322 DbResource.visibility == "public", 

1323 ] 

1324 # Only include owner access for non-public-only tokens with user_email 

1325 if not is_public_only_token and user_email: 

1326 access_conditions.append(DbResource.owner_email == user_email) 

1327 if team_ids: 

1328 access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"]))) 

1329 query = query.where(or_(*access_conditions)) 

1330 

1331 # Cursor-based pagination logic can be implemented here in the future. 

1332 resources = db.execute(query).scalars().all() 

1333 

1334 # Batch fetch team names to avoid N+1 queries 

1335 resource_team_ids = {r.team_id for r in resources if r.team_id} 

1336 team_map = {} 

1337 if resource_team_ids: 

1338 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all() 

1339 team_map = {str(team.id): team.name for team in teams} 

1340 

1341 db.commit() # Release transaction to avoid idle-in-transaction 

1342 

1343 result = [] 

1344 for t in resources: 

1345 try: 

1346 t.team = team_map.get(str(t.team_id)) if t.team_id else None 

1347 result.append(self.convert_resource_to_read(t, include_metrics=False)) 

1348 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

1349 logger.exception(f"Failed to convert resource {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}") 

1350 # Continue with remaining resources instead of failing completely 

1351 return result 

1352 

1353 async def _record_resource_metric(self, db: Session, resource: DbResource, start_time: float, success: bool, error_message: Optional[str]) -> None: 

1354 """ 

1355 Records a metric for a resource access. 

1356 

1357 Args: 

1358 db: Database session 

1359 resource: The resource that was accessed 

1360 start_time: Monotonic start time of the access 

1361 success: True if successful, False otherwise 

1362 error_message: Error message if failed, None otherwise 

1363 """ 

1364 end_time = time.monotonic() 

1365 response_time = end_time - start_time 

1366 

1367 metric = ResourceMetric( 

1368 resource_id=resource.id, 

1369 response_time=response_time, 

1370 is_success=success, 

1371 error_message=error_message, 

1372 ) 

1373 db.add(metric) 

1374 db.commit() 

1375 

1376 async def _record_invoke_resource_metric(self, db: Session, resource_id: str, start_time: float, success: bool, error_message: Optional[str]) -> None: 

1377 """ 

1378 Records a metric for invoking resource. 

1379 

1380 Args: 

1381 db: Database Session 

1382 resource_id: unique identifier to access & invoke resource 

1383 start_time: Monotonic start time of the access 

1384 success: True if successful, False otherwise 

1385 error_message: Error message if failed, None otherwise 

1386 """ 

1387 end_time = time.monotonic() 

1388 response_time = end_time - start_time 

1389 

1390 metric = ResourceMetric( 

1391 resource_id=resource_id, 

1392 response_time=response_time, 

1393 is_success=success, 

1394 error_message=error_message, 

1395 ) 

1396 db.add(metric) 

1397 db.commit() 

1398 

1399 def create_ssl_context(self, ca_certificate: str) -> ssl.SSLContext: 

1400 """Create an SSL context with the provided CA certificate. 

1401 

1402 Uses caching to avoid repeated SSL context creation for the same certificate. 

1403 

1404 Args: 

1405 ca_certificate: CA certificate in PEM format 

1406 

1407 Returns: 

1408 ssl.SSLContext: Configured SSL context 

1409 """ 

1410 return get_cached_ssl_context(ca_certificate) 

1411 

1412 async def invoke_resource( # pylint: disable=unused-argument 

1413 self, 

1414 db: Session, 

1415 resource_id: str, 

1416 resource_uri: str, 

1417 resource_template_uri: Optional[str] = None, 

1418 user_identity: Optional[Union[str, Dict[str, Any]]] = None, 

1419 meta_data: Optional[Dict[str, Any]] = None, # Reserved for future MCP SDK support 

1420 resource_obj: Optional[Any] = None, 

1421 gateway_obj: Optional[Any] = None, 

1422 ) -> Any: 

1423 """ 

1424 Invoke a resource via its configured gateway using SSE or StreamableHTTP transport. 

1425 

1426 This method determines the correct URI to invoke, loads the associated resource 

1427 and gateway from the database, validates certificates if applicable, prepares 

1428 authentication headers (OAuth, header-based, or none), and then connects to 

1429 the gateway to read the resource using the appropriate transport. 

1430 

1431 The function supports: 

1432 - CA certificate validation / SSL context creation 

1433 - OAuth client-credentials and authorization-code flow 

1434 - Header-based auth 

1435 - SSE transport gateways 

1436 - StreamableHTTP transport gateways 

1437 

1438 Args: 

1439 db (Session): 

1440 SQLAlchemy session for retrieving resource and gateway information. 

1441 resource_id (str): 

1442 ID of the resource to invoke. 

1443 resource_uri (str): 

1444 Direct resource URI configured for the resource. 

1445 resource_template_uri (Optional[str]): 

1446 URI from the template. Overrides `resource_uri` when provided. 

1447 user_identity (Optional[Union[str, Dict[str, Any]]]): 

1448 Identity of the user making the request, used for session pool isolation. 

1449 Can be a string (email) or a dict with an 'email' key. 

1450 Defaults to "anonymous" for pool isolation if not provided. 

1451 OAuth token lookup always uses platform_admin_email (service account). 

1452 meta_data (Optional[Dict[str, Any]]): 

1453 Additional metadata to pass to the gateway during invocation. 

1454 resource_obj (Optional[Any]): 

1455 Pre-fetched DbResource object to skip the internal resource lookup query. 

1456 gateway_obj (Optional[Any]): 

1457 Pre-fetched DbGateway object to skip the internal gateway lookup query. 

1458 

1459 Returns: 

1460 Any: The text content returned by the remote resource, or ``None`` if the 

1461 gateway could not be contacted or an error occurred. 

1462 

1463 Raises: 

1464 Exception: Any unhandled internal errors (e.g., DB issues). 

1465 

1466 --- 

1467 Doctest Examples 

1468 ---------------- 

1469 

1470 >>> class FakeDB: 

1471 ... "Simple DB stub returning fake resource and gateway rows." 

1472 ... def execute(self, query): 

1473 ... class Result: 

1474 ... def scalar_one_or_none(self): 

1475 ... # Return fake objects with the needed attributes 

1476 ... class FakeResource: 

1477 ... id = "res123" 

1478 ... name = "Demo Resource" 

1479 ... gateway_id = "gw1" 

1480 ... return FakeResource() 

1481 ... return Result() 

1482 

1483 >>> class FakeGateway: 

1484 ... id = "gw1" 

1485 ... name = "Fake Gateway" 

1486 ... url = "https://fake.gateway" 

1487 ... ca_certificate = None 

1488 ... ca_certificate_sig = None 

1489 ... transport = "sse" 

1490 ... auth_type = None 

1491 ... auth_value = {} 

1492 

1493 >>> # Monkeypatch the DB lookup for gateway 

1494 >>> def fake_execute_gateway(self, query): 

1495 ... class Result: 

1496 ... def scalar_one_or_none(self_inner): 

1497 ... return FakeGateway() 

1498 ... return Result() 

1499 

1500 >>> FakeDB.execute_gateway = fake_execute_gateway 

1501 

1502 >>> class FakeService: 

1503 ... "Service stub replacing network calls with predictable outputs." 

1504 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None): 

1505 ... # Represent the behavior of a successful SSE response. 

1506 ... return "hello from gateway" 

1507 

1508 >>> svc = FakeService() 

1509 >>> import asyncio 

1510 >>> asyncio.run(svc.invoke_resource(FakeDB(), "res123", "/test")) 

1511 'hello from gateway' 

1512 

1513 --- 

1514 Example: Template URI overrides resource URI 

1515 -------------------------------------------- 

1516 

1517 >>> class FakeService2(FakeService): 

1518 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None): 

1519 ... if resource_template_uri: 

1520 ... return f"using template: {resource_template_uri}" 

1521 ... return f"using direct: {resource_uri}" 

1522 

1523 >>> svc2 = FakeService2() 

1524 >>> asyncio.run(svc2.invoke_resource(FakeDB(), "res123", "/direct", "/template")) 

1525 'using template: /template' 

1526 

1527 """ 

1528 uri = None 

1529 if resource_uri and resource_template_uri: 

1530 uri = resource_template_uri 

1531 elif resource_uri: 

1532 uri = resource_uri 

1533 

1534 logger.info(f"Invoking the resource: {uri}") 

1535 gateway_id = None 

1536 # Use pre-fetched resource if provided (avoids Q5 re-fetch) 

1537 resource_info = resource_obj 

1538 if resource_info is None: 

1539 resource_info = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none() 

1540 

1541 # Release transaction immediately after resource lookup to prevent idle-in-transaction 

1542 # This is especially important when resource isn't found - we don't want to hold the transaction 

1543 db.commit() 

1544 

1545 # Normalize user_identity to string for session pool isolation 

1546 # Use authenticated user for pool isolation, but keep platform_admin for OAuth token lookup 

1547 if isinstance(user_identity, dict): 

1548 pool_user_identity = user_identity.get("email") or "anonymous" 

1549 elif isinstance(user_identity, str): 

1550 pool_user_identity = user_identity 

1551 else: 

1552 pool_user_identity = "anonymous" 

1553 

1554 # OAuth token lookup uses platform admin (service account) - not changed 

1555 oauth_user_email = settings.platform_admin_email 

1556 

1557 if resource_info: 

1558 gateway_id = getattr(resource_info, "gateway_id", None) 

1559 resource_name = getattr(resource_info, "name", None) 

1560 # Use pre-fetched gateway if provided (avoids Q6 re-fetch) 

1561 gateway = gateway_obj 

1562 if gateway is None and gateway_id: 

1563 gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none() 

1564 

1565 # ═══════════════════════════════════════════════════════════════════════════ 

1566 # CRITICAL: Release DB transaction immediately after fetching resource/gateway 

1567 # This ensures the transaction doesn't stay open if there's no gateway 

1568 # or if the function returns early for any reason. 

1569 # ═══════════════════════════════════════════════════════════════════════════ 

1570 db.commit() 

1571 

1572 if gateway: 

1573 

1574 start_time = time.monotonic() 

1575 success = False 

1576 error_message = None 

1577 

1578 # Create database span for observability dashboard 

1579 trace_id = current_trace_id.get() 

1580 db_span_id = None 

1581 db_span_ended = False 

1582 observability_service = ObservabilityService() if trace_id else None 

1583 

1584 if trace_id and observability_service: 

1585 try: 

1586 db_span_id = observability_service.start_span( 

1587 db=db, 

1588 trace_id=trace_id, 

1589 name="invoke.resource", 

1590 attributes={ 

1591 "resource.name": resource_name if resource_name else "unknown", 

1592 "resource.id": str(resource_id) if resource_id else "unknown", 

1593 "resource.uri": str(uri) or "unknown", 

1594 "gateway.transport": getattr(gateway, "transport") or "uknown", 

1595 "gateway.url": getattr(gateway, "url") or "unknown", 

1596 }, 

1597 ) 

1598 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {resource_id} & {uri}") 

1599 except Exception as e: 

1600 logger.warning(f"Failed to start the observability span for invoking resource: {e}") 

1601 db_span_id = None 

1602 

1603 with create_span( 

1604 "invoke.resource", 

1605 { 

1606 "resource.name": resource_name if resource_name else "unknown", 

1607 "resource.id": str(resource_id) if resource_id else "unknown", 

1608 "resource.uri": str(uri) or "unknown", 

1609 "gateway.transport": getattr(gateway, "transport") or "uknown", 

1610 "gateway.url": getattr(gateway, "url") or "unknown", 

1611 }, 

1612 ) as span: 

1613 valid = False 

1614 if gateway.ca_certificate: 

1615 if settings.enable_ed25519_signing: 

1616 public_key_pem = settings.ed25519_public_key 

1617 valid = validate_signature(gateway.ca_certificate.encode(), gateway.ca_certificate_sig, public_key_pem) 

1618 else: 

1619 valid = True 

1620 

1621 if valid: 

1622 ssl_context = self.create_ssl_context(gateway.ca_certificate) 

1623 else: 

1624 ssl_context = None 

1625 

1626 def _get_httpx_client_factory( 

1627 headers: dict[str, str] | None = None, 

1628 timeout: httpx.Timeout | None = None, 

1629 auth: httpx.Auth | None = None, 

1630 ) -> httpx.AsyncClient: 

1631 """Factory function to create httpx.AsyncClient with optional CA certificate. 

1632 

1633 Args: 

1634 headers: Optional headers for the client 

1635 timeout: Optional timeout for the client 

1636 auth: Optional auth for the client 

1637 

1638 Returns: 

1639 httpx.AsyncClient: Configured HTTPX async client 

1640 """ 

1641 # First-Party 

1642 from mcpgateway.services.http_client_service import get_default_verify, get_http_timeout # pylint: disable=import-outside-toplevel 

1643 

1644 return httpx.AsyncClient( 

1645 verify=ssl_context if ssl_context else get_default_verify(), # pylint: disable=cell-var-from-loop 

1646 follow_redirects=True, 

1647 headers=headers, 

1648 timeout=timeout if timeout else get_http_timeout(), 

1649 auth=auth, 

1650 limits=httpx.Limits( 

1651 max_connections=settings.httpx_max_connections, 

1652 max_keepalive_connections=settings.httpx_max_keepalive_connections, 

1653 keepalive_expiry=settings.httpx_keepalive_expiry, 

1654 ), 

1655 ) 

1656 

1657 try: 

1658 # ═══════════════════════════════════════════════════════════════════════════ 

1659 # Extract gateway data to local variables BEFORE OAuth handling 

1660 # OAuth client_credentials flow makes network calls, so we must release 

1661 # the DB transaction first to prevent idle-in-transaction during network I/O 

1662 # ═══════════════════════════════════════════════════════════════════════════ 

1663 gateway_url = gateway.url 

1664 gateway_transport = gateway.transport 

1665 gateway_auth_type = gateway.auth_type 

1666 gateway_auth_value = gateway.auth_value 

1667 gateway_oauth_config = gateway.oauth_config 

1668 gateway_name = gateway.name 

1669 gateway_auth_query_params = getattr(gateway, "auth_query_params", None) 

1670 

1671 # Apply query param auth to URL if applicable 

1672 auth_query_params_decrypted: Optional[Dict[str, str]] = None 

1673 if gateway_auth_type == "query_param" and gateway_auth_query_params: 

1674 auth_query_params_decrypted = {} 

1675 for param_key, encrypted_value in gateway_auth_query_params.items(): 

1676 if encrypted_value: 

1677 try: 

1678 decrypted = decode_auth(encrypted_value) 

1679 auth_query_params_decrypted[param_key] = decrypted.get(param_key, "") 

1680 except Exception: # noqa: S110 - intentionally skip failed decryptions 

1681 # Silently skip params that fail decryption (corrupted or old key) 

1682 logger.debug(f"Failed to decrypt query param '{param_key}' for resource") 

1683 if auth_query_params_decrypted: 

1684 gateway_url = apply_query_param_auth(gateway_url, auth_query_params_decrypted) 

1685 

1686 # ═══════════════════════════════════════════════════════════════════════════ 

1687 # CRITICAL: Release DB connection back to pool BEFORE making HTTP/OAuth calls 

1688 # This prevents connection pool exhaustion during slow upstream requests. 

1689 # OAuth client_credentials flow makes network calls to token endpoints. 

1690 # All needed data has been extracted to local variables above. 

1691 # ═══════════════════════════════════════════════════════════════════════════ 

1692 db.commit() # End read-only transaction cleanly 

1693 db.close() 

1694 

1695 # Handle different authentication types (AFTER DB release) 

1696 headers = {} 

1697 if gateway_auth_type == "oauth" and gateway_oauth_config: 

1698 grant_type = gateway_oauth_config.get("grant_type", "client_credentials") 

1699 

1700 if grant_type == "authorization_code": 

1701 # For Authorization Code flow, try to get stored tokens 

1702 try: 

1703 # First-Party 

1704 from mcpgateway.services.token_storage_service import TokenStorageService # pylint: disable=import-outside-toplevel 

1705 

1706 # Use fresh DB session for token lookup (original db was closed) 

1707 with fresh_db_session() as token_db: 

1708 token_storage = TokenStorageService(token_db) 

1709 access_token: str = await token_storage.get_user_token(gateway_id, oauth_user_email) 

1710 

1711 if access_token: 

1712 headers["Authorization"] = f"Bearer {access_token}" 

1713 else: 

1714 if span: 

1715 span.set_attribute("health.status", "unhealthy") 

1716 span.set_attribute("error.message", "No valid OAuth token for user") 

1717 

1718 except Exception as e: 

1719 logger.error(f"Failed to obtain stored OAuth token for gateway {gateway_name}: {e}") 

1720 if span: 

1721 span.set_attribute("health.status", "unhealthy") 

1722 span.set_attribute("error.message", "Failed to obtain stored OAuth token") 

1723 else: 

1724 # For Client Credentials flow, get token directly (makes network calls) 

1725 try: 

1726 access_token: str = await self.oauth_manager.get_access_token(gateway_oauth_config) 

1727 headers["Authorization"] = f"Bearer {access_token}" 

1728 except Exception as e: 

1729 if span: 

1730 span.set_attribute("health.status", "unhealthy") 

1731 span.set_attribute("error.message", str(e)) 

1732 else: 

1733 # Handle non-OAuth authentication (existing logic) 

1734 auth_data = gateway_auth_value or {} 

1735 if isinstance(auth_data, str): 

1736 headers = decode_auth(auth_data) 

1737 elif isinstance(auth_data, dict): 

1738 headers = {str(k): str(v) for k, v in auth_data.items()} 

1739 else: 

1740 headers = {} 

1741 

1742 async def connect_to_sse_session(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None: 

1743 """ 

1744 Connect to an SSE-based gateway and retrieve the text content of a resource. 

1745 

1746 This helper establishes an SSE (Server-Sent Events) session with the remote 

1747 gateway, initializes a `ClientSession`, invokes `read_resource()` for the 

1748 given URI, and returns the textual content from the first item in the 

1749 response's `contents` list. 

1750 

1751 If any error occurs (network failure, unexpected response format, session 

1752 initialization failure, etc.), the method logs the exception and returns 

1753 ``None`` instead of raising. 

1754 

1755 Note: 

1756 MCP SDK 1.25.0 read_resource() does not support meta parameter. 

1757 When the SDK adds support, meta_data can be added back here. 

1758 

1759 Args: 

1760 server_url (str): 

1761 The base URL of the SSE gateway to connect to. 

1762 uri (str): 

1763 The resource URI that should be requested from the gateway. 

1764 authentication (Optional[Dict[str, str]]): 

1765 Optional dictionary of headers (e.g., OAuth Bearer tokens) to 

1766 include in the SSE connection request. Defaults to an empty 

1767 dictionary when not provided. 

1768 

1769 Returns: 

1770 str | None: 

1771 The text content returned by the remote resource, or ``None`` if the 

1772 SSE connection fails or the response is invalid. 

1773 

1774 Notes: 

1775 - This function assumes the SSE client context manager yields: 

1776 ``(read_stream, write_stream, get_session_id)``. 

1777 - The expected response object from `session.read_resource()` must have a 

1778 `contents` attribute containing a list, where the first element has a 

1779 `text` attribute. 

1780 """ 

1781 if authentication is None: 

1782 authentication = {} 

1783 try: 

1784 # Use session pool if enabled for 10-20x latency improvement 

1785 use_pool = False 

1786 pool = None 

1787 if settings.mcp_session_pool_enabled: 

1788 try: 

1789 pool = get_mcp_session_pool() 

1790 use_pool = True 

1791 except RuntimeError: 

1792 # Pool not initialized (e.g., in tests), fall back to per-call sessions 

1793 pass 

1794 

1795 if use_pool and pool is not None: 

1796 async with pool.session( 

1797 url=server_url, 

1798 headers=authentication, 

1799 transport_type=TransportType.SSE, 

1800 httpx_client_factory=_get_httpx_client_factory, 

1801 user_identity=pool_user_identity, 

1802 gateway_id=gateway_id, 

1803 ) as pooled: 

1804 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter 

1805 resource_response = await pooled.session.read_resource(uri=uri) 

1806 return getattr(getattr(resource_response, "contents")[0], "text") 

1807 else: 

1808 # Fallback to per-call sessions when pool disabled or not initialized 

1809 async with sse_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as ( 

1810 read_stream, 

1811 write_stream, 

1812 _get_session_id, 

1813 ): 

1814 async with ClientSession(read_stream, write_stream) as session: 

1815 _ = await session.initialize() 

1816 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter 

1817 resource_response = await session.read_resource(uri=uri) 

1818 return getattr(getattr(resource_response, "contents")[0], "text") 

1819 except Exception as e: 

1820 # Sanitize error message to prevent URL secrets from leaking in logs 

1821 sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted) 

1822 logger.debug(f"Exception while connecting to sse gateway: {sanitized_error}") 

1823 return None 

1824 

1825 async def connect_to_streamablehttp_server(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None: 

1826 """ 

1827 Connect to a StreamableHTTP gateway and retrieve the text content of a resource. 

1828 

1829 This helper establishes a StreamableHTTP client session with the specified 

1830 gateway, initializes a `ClientSession`, invokes `read_resource()` for the 

1831 given URI, and returns the textual content from the first element in the 

1832 response's `contents` list. 

1833 

1834 If any exception occurs during connection, session initialization, or 

1835 resource reading, the function logs the error and returns ``None`` instead 

1836 of propagating the exception. 

1837 

1838 Note: 

1839 MCP SDK 1.25.0 read_resource() does not support meta parameter. 

1840 When the SDK adds support, meta_data can be added back here. 

1841 

1842 Args: 

1843 server_url (str): 

1844 The endpoint URL of the StreamableHTTP gateway. 

1845 uri (str): 

1846 The resource URI to request from the gateway. 

1847 authentication (Optional[Dict[str, str]]): 

1848 Optional dictionary of authentication headers (e.g., API keys or 

1849 Bearer tokens). Defaults to an empty dictionary when not provided. 

1850 

1851 Returns: 

1852 str | None: 

1853 The text content returned by the StreamableHTTP resource, or ``None`` 

1854 if the connection fails or the response format is invalid. 

1855 

1856 Notes: 

1857 - The `streamablehttp_client` context manager must yield a tuple: 

1858 ``(read_stream, write_stream, get_session_id)``. 

1859 - The expected `resource_response` returned by ``session.read_resource()`` 

1860 must contain a `contents` list, whose first element exposes a `text` 

1861 attribute. 

1862 """ 

1863 if authentication is None: 

1864 authentication = {} 

1865 try: 

1866 # Use session pool if enabled for 10-20x latency improvement 

1867 use_pool = False 

1868 pool = None 

1869 if settings.mcp_session_pool_enabled: 

1870 try: 

1871 pool = get_mcp_session_pool() 

1872 use_pool = True 

1873 except RuntimeError: 

1874 # Pool not initialized (e.g., in tests), fall back to per-call sessions 

1875 pass 

1876 

1877 if use_pool and pool is not None: 

1878 async with pool.session( 

1879 url=server_url, 

1880 headers=authentication, 

1881 transport_type=TransportType.STREAMABLE_HTTP, 

1882 httpx_client_factory=_get_httpx_client_factory, 

1883 user_identity=pool_user_identity, 

1884 gateway_id=gateway_id, 

1885 ) as pooled: 

1886 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter 

1887 resource_response = await pooled.session.read_resource(uri=uri) 

1888 return getattr(getattr(resource_response, "contents")[0], "text") 

1889 else: 

1890 # Fallback to per-call sessions when pool disabled or not initialized 

1891 async with streamablehttp_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as ( 

1892 read_stream, 

1893 write_stream, 

1894 _get_session_id, 

1895 ): 

1896 async with ClientSession(read_stream, write_stream) as session: 

1897 _ = await session.initialize() 

1898 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter 

1899 resource_response = await session.read_resource(uri=uri) 

1900 return getattr(getattr(resource_response, "contents")[0], "text") 

1901 except Exception as e: 

1902 # Sanitize error message to prevent URL secrets from leaking in logs 

1903 sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted) 

1904 logger.debug(f"Exception while connecting to streamablehttp gateway: {sanitized_error}") 

1905 return None 

1906 

1907 if span: 

1908 span.set_attribute("success", True) 

1909 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000) 

1910 

1911 resource_text = "" 

1912 if (gateway_transport).lower() == "sse": 

1913 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it 

1914 resource_text = await connect_to_sse_session(server_url=gateway_url, authentication=headers, uri=uri) 

1915 else: 

1916 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it 

1917 resource_text = await connect_to_streamablehttp_server(server_url=gateway_url, authentication=headers, uri=uri) 

1918 success = True # Mark as successful before returning 

1919 return resource_text 

1920 except Exception as e: 

1921 success = False 

1922 error_message = str(e) 

1923 raise 

1924 finally: 

1925 if resource_text: 

1926 try: 

1927 # First-Party 

1928 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel 

1929 

1930 metrics_buffer = get_metrics_buffer_service() 

1931 metrics_buffer.record_resource_metric( 

1932 resource_id=resource_id, 

1933 start_time=start_time, 

1934 success=success, 

1935 error_message=error_message, 

1936 ) 

1937 except Exception as metrics_error: 

1938 logger.warning(f"Failed to invoke resource metric: {metrics_error}") 

1939 

1940 # End Invoke resource span for Observability dashboard 

1941 # NOTE: Use fresh_db_session() since the original db was released 

1942 # before making HTTP calls to prevent connection pool exhaustion 

1943 if db_span_id and observability_service and not db_span_ended: 

1944 try: 

1945 with fresh_db_session() as fresh_db: 

1946 observability_service.end_span( 

1947 db=fresh_db, 

1948 span_id=db_span_id, 

1949 status="ok" if success else "error", 

1950 status_message=error_message if error_message else None, 

1951 ) 

1952 db_span_ended = True 

1953 logger.debug(f"✓ Ended invoke.resource span: {db_span_id}") 

1954 except Exception as e: 

1955 logger.warning(f"Failed to end observability span for invoking resource: {e}") 

1956 

1957 async def read_resource( 

1958 self, 

1959 db: Session, 

1960 resource_id: Optional[Union[int, str]] = None, 

1961 resource_uri: Optional[str] = None, 

1962 request_id: Optional[str] = None, 

1963 user: Optional[str] = None, 

1964 server_id: Optional[str] = None, 

1965 include_inactive: bool = False, 

1966 token_teams: Optional[List[str]] = None, 

1967 plugin_context_table: Optional[PluginContextTable] = None, 

1968 plugin_global_context: Optional[GlobalContext] = None, 

1969 meta_data: Optional[Dict[str, Any]] = None, 

1970 ) -> ResourceContent: 

1971 """Read a resource's content with plugin hook support. 

1972 

1973 Args: 

1974 db: Database session. 

1975 resource_id: Optional ID of the resource to read. 

1976 resource_uri: Optional URI of the resource to read. 

1977 request_id: Optional request ID for tracing. 

1978 user: Optional user email for authorization checks. 

1979 server_id: Optional server ID for server scoping enforcement. 

1980 include_inactive: Whether to include inactive resources. Defaults to False. 

1981 token_teams: Optional list of team IDs from token for authorization. 

1982 None = unrestricted admin, [] = public-only, [...] = team-scoped. 

1983 plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing. 

1984 plugin_global_context: Optional global context from middleware for consistency across hooks. 

1985 meta_data: Optional metadata dictionary to pass to the gateway during resource reading. 

1986 

1987 Returns: 

1988 Resource content object 

1989 

1990 Raises: 

1991 ResourceNotFoundError: If resource not found or access denied 

1992 ResourceError: If blocked by plugin 

1993 PluginError: If encounters issue with plugin 

1994 PluginViolationError: If plugin violated the request. Example - In case of OPA plugin, if the request is denied by policy. 

1995 ValueError: If neither resource_id nor resource_uri is provided 

1996 

1997 Examples: 

1998 >>> from mcpgateway.common.models import ResourceContent 

1999 >>> from mcpgateway.services.resource_service import ResourceService 

2000 >>> from unittest.mock import MagicMock, PropertyMock 

2001 >>> service = ResourceService() 

2002 >>> db = MagicMock() 

2003 >>> uri = 'http://example.com/resource.txt' 

2004 >>> mock_resource = MagicMock() 

2005 >>> mock_resource.id = 123 

2006 >>> mock_resource.uri = uri 

2007 >>> type(mock_resource).content = PropertyMock(return_value='test') 

2008 >>> db.execute.return_value.scalar_one_or_none.return_value = mock_resource 

2009 >>> db.get.return_value = mock_resource 

2010 >>> import asyncio 

2011 >>> result = asyncio.run(service.read_resource(db, resource_uri=uri)) 

2012 >>> result.__class__.__name__ == 'ResourceContent' 

2013 True 

2014 

2015 Not found case returns ResourceNotFoundError: 

2016 

2017 >>> db2 = MagicMock() 

2018 >>> db2.execute.return_value.scalar_one_or_none.return_value = None 

2019 >>> db2.get.return_value = None 

2020 >>> import asyncio 

2021 >>> # Disable path validation for doctest 

2022 >>> import mcpgateway.config 

2023 >>> old_val = getattr(mcpgateway.config.settings, 'experimental_validate_io', False) 

2024 >>> mcpgateway.config.settings.experimental_validate_io = False 

2025 >>> def _nf(): 

2026 ... try: 

2027 ... asyncio.run(service.read_resource(db2, resource_uri='abc')) 

2028 ... except ResourceNotFoundError: 

2029 ... return True 

2030 >>> result = _nf() 

2031 >>> mcpgateway.config.settings.experimental_validate_io = old_val 

2032 >>> result 

2033 True 

2034 """ 

2035 start_time = time.monotonic() 

2036 success = False 

2037 error_message = None 

2038 resource_db = None 

2039 resource_db_gateway = None # Only set when eager-loaded via Q2's joinedload 

2040 content = None 

2041 uri = resource_uri or "unknown" 

2042 if resource_id: 

2043 resource_db = db.get(DbResource, resource_id, options=[joinedload(DbResource.gateway)]) 

2044 if resource_db: 

2045 uri = resource_db.uri 

2046 resource_db_gateway = resource_db.gateway # Eager-loaded, safe to access 

2047 # Check enabled status in Python (avoids redundant Q3/Q4 re-fetches) 

2048 if not include_inactive and not resource_db.enabled: 

2049 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive") 

2050 content = resource_db.content 

2051 else: 

2052 uri = None 

2053 

2054 # Create database span for observability dashboard 

2055 trace_id = current_trace_id.get() 

2056 db_span_id = None 

2057 db_span_ended = False 

2058 observability_service = ObservabilityService() if trace_id else None 

2059 

2060 if trace_id and observability_service: 

2061 try: 

2062 db_span_id = observability_service.start_span( 

2063 db=db, 

2064 trace_id=trace_id, 

2065 name="resource.read", 

2066 attributes={ 

2067 "resource.uri": str(resource_uri) if resource_uri else "unknown", 

2068 "user": user or "anonymous", 

2069 "server_id": server_id, 

2070 "request_id": request_id, 

2071 "http.url": uri if uri is not None and uri.startswith("http") else None, 

2072 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static", 

2073 }, 

2074 ) 

2075 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {uri}") 

2076 except Exception as e: 

2077 logger.warning(f"Failed to start observability span for resource reading: {e}") 

2078 db_span_id = None 

2079 

2080 with create_span( 

2081 "resource.read", 

2082 { 

2083 "resource.uri": resource_uri or "unknown", 

2084 "user": user or "anonymous", 

2085 "server_id": server_id, 

2086 "request_id": request_id, 

2087 "http.url": uri if uri is not None and uri.startswith("http") else None, 

2088 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static", 

2089 }, 

2090 ) as span: 

2091 try: 

2092 # Generate request ID if not provided 

2093 if not request_id: 

2094 request_id = str(uuid.uuid4()) 

2095 

2096 original_uri = uri 

2097 contexts = None 

2098 

2099 # Check if plugin manager is available and eligible for this request 

2100 plugin_eligible = bool(self._plugin_manager and PLUGINS_AVAILABLE and uri and ("://" in uri)) 

2101 

2102 # Initialize plugin manager if needed (lazy init must happen before has_hooks_for check) 

2103 # pylint: disable=protected-access 

2104 if plugin_eligible and not self._plugin_manager._initialized: 

2105 await self._plugin_manager.initialize() 

2106 # pylint: enable=protected-access 

2107 

2108 # Check if any resource hooks are registered to avoid unnecessary context creation 

2109 has_pre_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_PRE_FETCH) 

2110 has_post_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_POST_FETCH) 

2111 

2112 # Initialize plugin context variables only if hooks are registered 

2113 global_context = None 

2114 if has_pre_fetch or has_post_fetch: 

2115 # Create plugin context 

2116 # Normalize user to an identifier string if provided 

2117 user_id = None 

2118 if user is not None: 

2119 if isinstance(user, dict) and "email" in user: 

2120 user_id = user.get("email") 

2121 elif isinstance(user, str): 

2122 user_id = user 

2123 else: 

2124 # Attempt to fallback to attribute access 

2125 user_id = getattr(user, "email", None) 

2126 

2127 # Use existing global_context from middleware or create new one 

2128 if plugin_global_context: 

2129 global_context = plugin_global_context 

2130 # Update fields with resource-specific information 

2131 if user_id: 

2132 global_context.user = user_id 

2133 if server_id: 

2134 global_context.server_id = server_id 

2135 else: 

2136 # Create new context (fallback when middleware didn't run) 

2137 global_context = GlobalContext(request_id=request_id, user=user_id, server_id=server_id) 

2138 

2139 # Call pre-fetch hooks if registered 

2140 if has_pre_fetch: 

2141 # Create pre-fetch payload 

2142 pre_payload = ResourcePreFetchPayload(uri=uri, metadata={}) 

2143 

2144 # Execute pre-fetch hooks with context from previous hooks 

2145 pre_result, contexts = await self._plugin_manager.invoke_hook( 

2146 ResourceHookType.RESOURCE_PRE_FETCH, 

2147 pre_payload, 

2148 global_context, 

2149 local_contexts=plugin_context_table, # Pass context from previous hooks 

2150 violations_as_exceptions=True, 

2151 ) 

2152 # Use modified URI if plugin changed it 

2153 if pre_result.modified_payload: 

2154 uri = pre_result.modified_payload.uri 

2155 logger.debug(f"Resource URI modified by plugin: {original_uri} -> {uri}") 

2156 

2157 # Validate resource path if experimental validation is enabled 

2158 if getattr(settings, "experimental_validate_io", False) and uri and isinstance(uri, str): 

2159 try: 

2160 SecurityValidator.validate_path(uri, getattr(settings, "allowed_roots", None)) 

2161 except ValueError as e: 

2162 raise ResourceError(f"Path validation failed: {e}") 

2163 

2164 # Original resource fetching logic 

2165 logger.info(f"Fetching resource: {resource_id} (URI: {uri})") 

2166 # Check for template 

2167 

2168 if resource_db is None and uri is not None: # and "{" in uri and "}" in uri: 

2169 # Matches uri (modified value from pluggins if applicable) 

2170 # with uri from resource DB 

2171 # if uri is of type resource template then resource is retreived from DB 

2172 query = select(DbResource).where(DbResource.uri == str(uri)).where(DbResource.enabled) 

2173 if include_inactive: 

2174 query = select(DbResource).where(DbResource.uri == str(uri)) 

2175 resource_db = db.execute(query).scalar_one_or_none() 

2176 if resource_db: 

2177 # resource_id = resource_db.id 

2178 content = resource_db.content 

2179 else: 

2180 # Check the inactivity first 

2181 check_inactivity = db.execute(select(DbResource).where(DbResource.uri == str(resource_uri)).where(not_(DbResource.enabled))).scalar_one_or_none() 

2182 if check_inactivity: 

2183 raise ResourceNotFoundError(f"Resource '{resource_uri}' exists but is inactive") 

2184 

2185 if resource_db is None: 

2186 if resource_uri: 

2187 # if resource_uri is provided 

2188 # modified uri have templatized resource with prefilled value 

2189 # triggers _read_template_resource 

2190 # it internally checks which uri matches the pattern of modified uri and fetches 

2191 # the one which matches else raises ResourceNotFoundError 

2192 try: 

2193 content = await self._read_template_resource(db, uri) or None 

2194 # ═══════════════════════════════════════════════════════════════════════════ 

2195 # SECURITY: Fetch the template's DbResource record for access checking 

2196 # _read_template_resource returns ResourceContent with the template's ID 

2197 # ═══════════════════════════════════════════════════════════════════════════ 

2198 if content is not None and hasattr(content, "id") and content.id: 

2199 template_query = select(DbResource).where(DbResource.id == str(content.id)) 

2200 if not include_inactive: 

2201 template_query = template_query.where(DbResource.enabled) 

2202 resource_db = db.execute(template_query).scalar_one_or_none() 

2203 except Exception as e: 

2204 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") from e 

2205 

2206 if resource_uri: 

2207 if content is None and resource_db is None: 

2208 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") 

2209 

2210 if resource_id and resource_db is None: 

2211 # if resource_id provided but not found by Q2 (shouldn't normally happen, 

2212 # but handles race conditions where resource is deleted between requests) 

2213 query = select(DbResource).where(DbResource.id == str(resource_id)).where(DbResource.enabled) 

2214 if include_inactive: 

2215 query = select(DbResource).where(DbResource.id == str(resource_id)) 

2216 resource_db = db.execute(query).scalar_one_or_none() 

2217 if resource_db: 

2218 original_uri = resource_db.uri or None 

2219 content = resource_db.content 

2220 else: 

2221 check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(resource_id)).where(not_(DbResource.enabled))).scalar_one_or_none() 

2222 if check_inactivity: 

2223 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive") 

2224 raise ResourceNotFoundError(f"Resource not found for the resource id: {resource_id}") 

2225 

2226 # ═══════════════════════════════════════════════════════════════════════════ 

2227 # SECURITY: Check resource access based on visibility and team membership 

2228 # ═══════════════════════════════════════════════════════════════════════════ 

2229 if resource_db: 

2230 if not await self._check_resource_access(db, resource_db, user, token_teams): 

2231 # Don't reveal resource existence - return generic "not found" 

2232 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}") 

2233 

2234 # ═══════════════════════════════════════════════════════════════════════════ 

2235 # SECURITY: Enforce server scoping if server_id is provided 

2236 # Resource must be attached to the specified virtual server 

2237 # ═══════════════════════════════════════════════════════════════════════════ 

2238 if server_id: 

2239 server_match = db.execute( 

2240 select(server_resource_association.c.resource_id).where( 

2241 server_resource_association.c.server_id == server_id, 

2242 server_resource_association.c.resource_id == resource_db.id, 

2243 ) 

2244 ).first() 

2245 if not server_match: 

2246 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}") 

2247 

2248 # Set success attributes on span 

2249 if span: 

2250 span.set_attribute("success", True) 

2251 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000) 

2252 if content: 

2253 span.set_attribute("content.size", len(str(content))) 

2254 

2255 success = True 

2256 # Return standardized content without breaking callers that expect passthrough 

2257 # Prefer returning first-class content models or objects with content-like attributes. 

2258 # ResourceContent and TextContent already imported at top level 

2259 

2260 # Release transaction before network calls to avoid idle-in-transaction during invoke_resource 

2261 db.commit() 

2262 

2263 # ═══════════════════════════════════════════════════════════════════════════ 

2264 # RESOLVE CONTENT: Fetch actual content from gateway if needed 

2265 # ═══════════════════════════════════════════════════════════════════════════ 

2266 # If content is a Pydantic content model, invoke gateway 

2267 

2268 if isinstance(content, (ResourceContent, TextContent)): 

2269 resource_response = await self.invoke_resource( 

2270 db=db, 

2271 resource_id=getattr(content, "id"), 

2272 resource_uri=getattr(content, "uri") or None, 

2273 resource_template_uri=getattr(content, "text") or None, 

2274 user_identity=user, 

2275 meta_data=meta_data, 

2276 resource_obj=resource_db, 

2277 gateway_obj=resource_db_gateway, 

2278 ) 

2279 if resource_response: 

2280 setattr(content, "text", resource_response) 

2281 # If content is any object that quacks like content 

2282 elif hasattr(content, "text") or hasattr(content, "blob"): 

2283 if hasattr(content, "blob"): 

2284 resource_response = await self.invoke_resource( 

2285 db=db, 

2286 resource_id=getattr(content, "id"), 

2287 resource_uri=getattr(content, "uri") or None, 

2288 resource_template_uri=getattr(content, "blob") or None, 

2289 user_identity=user, 

2290 meta_data=meta_data, 

2291 resource_obj=resource_db, 

2292 gateway_obj=resource_db_gateway, 

2293 ) 

2294 if resource_response: 

2295 setattr(content, "blob", resource_response) 

2296 elif hasattr(content, "text"): 

2297 resource_response = await self.invoke_resource( 

2298 db=db, 

2299 resource_id=getattr(content, "id"), 

2300 resource_uri=getattr(content, "uri") or None, 

2301 resource_template_uri=getattr(content, "text") or None, 

2302 user_identity=user, 

2303 meta_data=meta_data, 

2304 resource_obj=resource_db, 

2305 gateway_obj=resource_db_gateway, 

2306 ) 

2307 if resource_response: 

2308 setattr(content, "text", resource_response) 

2309 # Normalize primitive types to ResourceContent 

2310 elif isinstance(content, bytes): 

2311 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, blob=content) 

2312 elif isinstance(content, str): 

2313 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, text=content) 

2314 else: 

2315 # Fallback to stringified content 

2316 content = ResourceContent(type="resource", id=str(resource_id) or str(content.id), uri=original_uri or content.uri, text=str(content)) 

2317 

2318 # ═══════════════════════════════════════════════════════════════════════════ 

2319 # POST-FETCH HOOKS: Now called AFTER content is resolved from gateway 

2320 # ═══════════════════════════════════════════════════════════════════════════ 

2321 if has_post_fetch: 

2322 post_payload = ResourcePostFetchPayload(uri=original_uri, content=content) 

2323 post_result, _ = await self._plugin_manager.invoke_hook(ResourceHookType.RESOURCE_POST_FETCH, post_payload, global_context, contexts, violations_as_exceptions=True) 

2324 if post_result.modified_payload: 

2325 content = post_result.modified_payload.content 

2326 

2327 return content 

2328 except Exception as e: 

2329 success = False 

2330 error_message = str(e) 

2331 raise 

2332 finally: 

2333 # Record metrics only if we found a resource (not for templates) 

2334 if resource_db: 

2335 try: 

2336 # First-Party 

2337 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel 

2338 

2339 metrics_buffer = get_metrics_buffer_service() 

2340 metrics_buffer.record_resource_metric( 

2341 resource_id=resource_db.id, 

2342 start_time=start_time, 

2343 success=success, 

2344 error_message=error_message, 

2345 ) 

2346 except Exception as metrics_error: 

2347 logger.warning(f"Failed to record resource metric: {metrics_error}") 

2348 

2349 # End database span for observability dashboard 

2350 # NOTE: Use fresh_db_session() since db may have been closed by invoke_resource 

2351 if db_span_id and observability_service and not db_span_ended: 

2352 try: 

2353 with fresh_db_session() as fresh_db: 

2354 observability_service.end_span( 

2355 db=fresh_db, 

2356 span_id=db_span_id, 

2357 status="ok" if success else "error", 

2358 status_message=error_message if error_message else None, 

2359 ) 

2360 db_span_ended = True 

2361 logger.debug(f"✓ Ended resource.read span: {db_span_id}") 

2362 except Exception as e: 

2363 logger.warning(f"Failed to end observability span for resource reading: {e}") 

2364 

2365 async def set_resource_state(self, db: Session, resource_id: int, activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> ResourceRead: 

2366 """ 

2367 Set the activation status of a resource. 

2368 

2369 Args: 

2370 db: Database session 

2371 resource_id: Resource ID 

2372 activate: True to activate, False to deactivate 

2373 user_email: Optional[str] The email of the user to check if the user has permission to modify. 

2374 skip_cache_invalidation: If True, skip cache invalidation (used for batch operations). 

2375 

2376 Returns: 

2377 The updated ResourceRead object 

2378 

2379 Raises: 

2380 ResourceNotFoundError: If the resource is not found. 

2381 ResourceLockConflictError: If the resource is locked by another transaction. 

2382 ResourceError: For other errors. 

2383 PermissionError: If user doesn't own the resource. 

2384 

2385 Examples: 

2386 >>> from mcpgateway.services.resource_service import ResourceService 

2387 >>> from unittest.mock import MagicMock, AsyncMock 

2388 >>> from mcpgateway.schemas import ResourceRead 

2389 >>> service = ResourceService() 

2390 >>> db = MagicMock() 

2391 >>> resource = MagicMock() 

2392 >>> db.get.return_value = resource 

2393 >>> db.commit = MagicMock() 

2394 >>> db.refresh = MagicMock() 

2395 >>> service._notify_resource_activated = AsyncMock() 

2396 >>> service._notify_resource_deactivated = AsyncMock() 

2397 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read') 

2398 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read') 

2399 >>> import asyncio 

2400 >>> asyncio.run(service.set_resource_state(db, 1, True)) 

2401 'resource_read' 

2402 """ 

2403 try: 

2404 # Use nowait=True to fail fast if row is locked, preventing lock contention under high load 

2405 try: 

2406 resource = get_for_update(db, DbResource, resource_id, nowait=True) 

2407 except OperationalError as lock_err: 

2408 # Row is locked by another transaction - fail fast with 409 

2409 db.rollback() 

2410 raise ResourceLockConflictError(f"Resource {resource_id} is currently being modified by another request") from lock_err 

2411 if not resource: 

2412 raise ResourceNotFoundError(f"Resource not found: {resource_id}") 

2413 

2414 if user_email: 

2415 # First-Party 

2416 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

2417 

2418 permission_service = PermissionService(db) 

2419 if not await permission_service.check_resource_ownership(user_email, resource): 

2420 raise PermissionError("Only the owner can activate the Resource" if activate else "Only the owner can deactivate the Resource") 

2421 

2422 # Update status if it's different 

2423 if resource.enabled != activate: 

2424 resource.enabled = activate 

2425 resource.updated_at = datetime.now(timezone.utc) 

2426 db.commit() 

2427 db.refresh(resource) 

2428 

2429 # Invalidate cache after status change (skip for batch operations) 

2430 if not skip_cache_invalidation: 

2431 cache = _get_registry_cache() 

2432 await cache.invalidate_resources() 

2433 

2434 # Notify subscribers 

2435 if activate: 

2436 await self._notify_resource_activated(resource) 

2437 else: 

2438 await self._notify_resource_deactivated(resource) 

2439 

2440 logger.info(f"Resource {resource.uri} {'activated' if activate else 'deactivated'}") 

2441 

2442 # Structured logging: Audit trail for resource state change 

2443 audit_trail.log_action( 

2444 user_id=user_email or "system", 

2445 action="set_resource_state", 

2446 resource_type="resource", 

2447 resource_id=str(resource.id), 

2448 resource_name=resource.name, 

2449 user_email=user_email, 

2450 team_id=resource.team_id, 

2451 new_values={ 

2452 "enabled": resource.enabled, 

2453 }, 

2454 context={ 

2455 "action": "activate" if activate else "deactivate", 

2456 }, 

2457 db=db, 

2458 ) 

2459 

2460 # Structured logging: Log successful resource state change 

2461 structured_logger.log( 

2462 level="INFO", 

2463 message=f"Resource {'activated' if activate else 'deactivated'} successfully", 

2464 event_type="resource_state_changed", 

2465 component="resource_service", 

2466 user_email=user_email, 

2467 team_id=resource.team_id, 

2468 resource_type="resource", 

2469 resource_id=str(resource.id), 

2470 custom_fields={ 

2471 "resource_uri": resource.uri, 

2472 "enabled": resource.enabled, 

2473 }, 

2474 db=db, 

2475 ) 

2476 

2477 resource.team = self._get_team_name(db, resource.team_id) 

2478 return self.convert_resource_to_read(resource) 

2479 except PermissionError as e: 

2480 # Structured logging: Log permission error 

2481 structured_logger.log( 

2482 level="WARNING", 

2483 message="Resource state change failed due to permission error", 

2484 event_type="resource_state_change_permission_denied", 

2485 component="resource_service", 

2486 user_email=user_email, 

2487 resource_type="resource", 

2488 resource_id=str(resource_id), 

2489 error=e, 

2490 db=db, 

2491 ) 

2492 raise e 

2493 except ResourceLockConflictError: 

2494 # Re-raise lock conflicts without wrapping - allows 409 response 

2495 raise 

2496 except ResourceNotFoundError: 

2497 # Re-raise not found without wrapping - allows 404 response 

2498 raise 

2499 except Exception as e: 

2500 db.rollback() 

2501 

2502 # Structured logging: Log generic resource state change failure 

2503 structured_logger.log( 

2504 level="ERROR", 

2505 message="Resource state change failed", 

2506 event_type="resource_state_change_failed", 

2507 component="resource_service", 

2508 user_email=user_email, 

2509 resource_type="resource", 

2510 resource_id=str(resource_id), 

2511 error=e, 

2512 db=db, 

2513 ) 

2514 raise ResourceError(f"Failed to set resource state: {str(e)}") 

2515 

2516 async def subscribe_resource(self, db: Session, subscription: ResourceSubscription) -> None: 

2517 """ 

2518 Subscribe to a resource. 

2519 

2520 Args: 

2521 db: Database session 

2522 subscription: Resource subscription object 

2523 

2524 Raises: 

2525 ResourceNotFoundError: If the resource is not found or is inactive 

2526 ResourceError: For other subscription errors 

2527 

2528 Examples: 

2529 >>> from mcpgateway.services.resource_service import ResourceService 

2530 >>> from unittest.mock import MagicMock 

2531 >>> service = ResourceService() 

2532 >>> db = MagicMock() 

2533 >>> subscription = MagicMock() 

2534 >>> import asyncio 

2535 >>> asyncio.run(service.subscribe_resource(db, subscription)) 

2536 """ 

2537 try: 

2538 # Verify resource exists (single query to avoid TOCTOU between active/inactive checks) 

2539 resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none() 

2540 

2541 if not resource: 

2542 raise ResourceNotFoundError(f"Resource not found: {subscription.uri}") 

2543 

2544 if not resource.enabled: 

2545 raise ResourceNotFoundError(f"Resource '{subscription.uri}' exists but is inactive") 

2546 

2547 # Create subscription 

2548 db_sub = DbSubscription(resource_id=resource.id, subscriber_id=subscription.subscriber_id) 

2549 db.add(db_sub) 

2550 db.commit() 

2551 

2552 logger.info(f"Added subscription for {subscription.uri} by {subscription.subscriber_id}") 

2553 

2554 except Exception as e: 

2555 db.rollback() 

2556 raise ResourceError(f"Failed to subscribe: {str(e)}") 

2557 

2558 async def unsubscribe_resource(self, db: Session, subscription: ResourceSubscription) -> None: 

2559 """ 

2560 Unsubscribe from a resource. 

2561 

2562 Args: 

2563 db: Database session 

2564 subscription: Resource subscription object 

2565 

2566 Raises: 

2567 

2568 Examples: 

2569 >>> from mcpgateway.services.resource_service import ResourceService 

2570 >>> from unittest.mock import MagicMock 

2571 >>> service = ResourceService() 

2572 >>> db = MagicMock() 

2573 >>> subscription = MagicMock() 

2574 >>> import asyncio 

2575 >>> asyncio.run(service.unsubscribe_resource(db, subscription)) 

2576 """ 

2577 try: 

2578 # Find resource 

2579 resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none() 

2580 

2581 if not resource: 

2582 return 

2583 

2584 # Remove subscription 

2585 db.execute(select(DbSubscription).where(DbSubscription.resource_id == resource.id).where(DbSubscription.subscriber_id == subscription.subscriber_id)).delete() 

2586 db.commit() 

2587 

2588 logger.info(f"Removed subscription for {subscription.uri} by {subscription.subscriber_id}") 

2589 

2590 except Exception as e: 

2591 db.rollback() 

2592 logger.error(f"Failed to unsubscribe: {str(e)}") 

2593 

2594 async def update_resource( 

2595 self, 

2596 db: Session, 

2597 resource_id: Union[int, str], 

2598 resource_update: ResourceUpdate, 

2599 modified_by: Optional[str] = None, 

2600 modified_from_ip: Optional[str] = None, 

2601 modified_via: Optional[str] = None, 

2602 modified_user_agent: Optional[str] = None, 

2603 user_email: Optional[str] = None, 

2604 ) -> ResourceRead: 

2605 """ 

2606 Update a resource. 

2607 

2608 Args: 

2609 db: Database session 

2610 resource_id: Resource ID 

2611 resource_update: Resource update object 

2612 modified_by: Username of the person modifying the resource 

2613 modified_from_ip: IP address where the modification request originated 

2614 modified_via: Source of modification (ui/api/import) 

2615 modified_user_agent: User agent string from the modification request 

2616 user_email: Email of user performing update (for ownership check) 

2617 

2618 Returns: 

2619 The updated ResourceRead object 

2620 

2621 Raises: 

2622 ResourceNotFoundError: If the resource is not found 

2623 ResourceURIConflictError: If a resource with the same URI already exists. 

2624 PermissionError: If user doesn't own the resource 

2625 ResourceError: For other update errors 

2626 IntegrityError: If a database integrity error occurs. 

2627 Exception: For unexpected errors 

2628 

2629 Example: 

2630 >>> from mcpgateway.services.resource_service import ResourceService 

2631 >>> from unittest.mock import MagicMock, AsyncMock 

2632 >>> from mcpgateway.schemas import ResourceRead 

2633 >>> service = ResourceService() 

2634 >>> db = MagicMock() 

2635 >>> resource = MagicMock() 

2636 >>> db.get.return_value = resource 

2637 >>> db.commit = MagicMock() 

2638 >>> db.refresh = MagicMock() 

2639 >>> service._notify_resource_updated = AsyncMock() 

2640 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read') 

2641 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read') 

2642 >>> import asyncio 

2643 >>> asyncio.run(service.update_resource(db, 'resource_id', MagicMock())) 

2644 'resource_read' 

2645 """ 

2646 try: 

2647 logger.info(f"Updating resource: {resource_id}") 

2648 resource = get_for_update(db, DbResource, resource_id) 

2649 if not resource: 

2650 raise ResourceNotFoundError(f"Resource not found: {resource_id}") 

2651 

2652 # # Check for uri conflict if uri is being changed and visibility is public 

2653 if resource_update.uri and resource_update.uri != resource.uri: 

2654 visibility = resource_update.visibility or resource.visibility 

2655 team_id = resource_update.team_id or resource.team_id 

2656 if visibility.lower() == "public": 

2657 # Check for existing public resources with the same uri 

2658 existing_resource = get_for_update(db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "public", DbResource.id != resource_id)) 

2659 if existing_resource: 

2660 raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility) 

2661 elif visibility.lower() == "team" and team_id: 

2662 # Check for existing team resource with the same uri 

2663 existing_resource = get_for_update( 

2664 db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.id != resource_id) 

2665 ) 

2666 if existing_resource: 

2667 raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility) 

2668 

2669 # Check ownership if user_email provided 

2670 if user_email: 

2671 # First-Party 

2672 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

2673 

2674 permission_service = PermissionService(db) 

2675 if not await permission_service.check_resource_ownership(user_email, resource): 

2676 raise PermissionError("Only the owner can update this resource") 

2677 

2678 # Update fields if provided 

2679 if resource_update.uri is not None: 

2680 resource.uri = resource_update.uri 

2681 if resource_update.name is not None: 

2682 resource.name = resource_update.name 

2683 if resource_update.description is not None: 

2684 resource.description = resource_update.description 

2685 if resource_update.mime_type is not None: 

2686 resource.mime_type = resource_update.mime_type 

2687 if resource_update.uri_template is not None: 

2688 resource.uri_template = resource_update.uri_template 

2689 if resource_update.visibility is not None: 

2690 resource.visibility = resource_update.visibility 

2691 

2692 # Update content if provided 

2693 if resource_update.content is not None: 

2694 # Determine content storage 

2695 is_text = resource.mime_type and resource.mime_type.startswith("text/") or isinstance(resource_update.content, str) 

2696 

2697 resource.text_content = resource_update.content if is_text else None 

2698 resource.binary_content = ( 

2699 resource_update.content.encode() if is_text and isinstance(resource_update.content, str) else resource_update.content if isinstance(resource_update.content, bytes) else None 

2700 ) 

2701 resource.size = len(resource_update.content) 

2702 

2703 # Update tags if provided 

2704 if resource_update.tags is not None: 

2705 resource.tags = resource_update.tags 

2706 

2707 # Update metadata fields 

2708 resource.updated_at = datetime.now(timezone.utc) 

2709 if modified_by: 

2710 resource.modified_by = modified_by 

2711 if modified_from_ip: 

2712 resource.modified_from_ip = modified_from_ip 

2713 if modified_via: 

2714 resource.modified_via = modified_via 

2715 if modified_user_agent: 

2716 resource.modified_user_agent = modified_user_agent 

2717 if hasattr(resource, "version") and resource.version is not None: 

2718 resource.version = resource.version + 1 

2719 else: 

2720 resource.version = 1 

2721 db.commit() 

2722 db.refresh(resource) 

2723 

2724 # Invalidate cache after successful update 

2725 cache = _get_registry_cache() 

2726 await cache.invalidate_resources() 

2727 # Also invalidate tags cache since resource tags may have changed 

2728 # First-Party 

2729 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

2730 

2731 await admin_stats_cache.invalidate_tags() 

2732 # First-Party 

2733 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

2734 

2735 metrics_cache.invalidate_prefix("top_resources:") 

2736 metrics_cache.invalidate("resources") 

2737 

2738 # Notify subscribers 

2739 await self._notify_resource_updated(resource) 

2740 

2741 logger.info(f"Updated resource: {resource.uri}") 

2742 

2743 # Structured logging: Audit trail for resource update 

2744 changes = [] 

2745 if resource_update.uri: 

2746 changes.append(f"uri: {resource_update.uri}") 

2747 if resource_update.visibility: 

2748 changes.append(f"visibility: {resource_update.visibility}") 

2749 if resource_update.description: 

2750 changes.append("description updated") 

2751 

2752 audit_trail.log_action( 

2753 user_id=user_email or modified_by or "system", 

2754 action="update_resource", 

2755 resource_type="resource", 

2756 resource_id=str(resource.id), 

2757 resource_name=resource.name, 

2758 user_email=user_email, 

2759 team_id=resource.team_id, 

2760 client_ip=modified_from_ip, 

2761 user_agent=modified_user_agent, 

2762 new_values={ 

2763 "uri": resource.uri, 

2764 "name": resource.name, 

2765 "version": resource.version, 

2766 }, 

2767 context={ 

2768 "modified_via": modified_via, 

2769 "changes": ", ".join(changes) if changes else "metadata only", 

2770 }, 

2771 db=db, 

2772 ) 

2773 

2774 # Structured logging: Log successful resource update 

2775 structured_logger.log( 

2776 level="INFO", 

2777 message="Resource updated successfully", 

2778 event_type="resource_updated", 

2779 component="resource_service", 

2780 user_id=modified_by, 

2781 user_email=user_email, 

2782 team_id=resource.team_id, 

2783 resource_type="resource", 

2784 resource_id=str(resource.id), 

2785 custom_fields={ 

2786 "resource_uri": resource.uri, 

2787 "version": resource.version, 

2788 }, 

2789 db=db, 

2790 ) 

2791 

2792 return self.convert_resource_to_read(resource) 

2793 except PermissionError as pe: 

2794 db.rollback() 

2795 

2796 # Structured logging: Log permission error 

2797 structured_logger.log( 

2798 level="WARNING", 

2799 message="Resource update failed due to permission error", 

2800 event_type="resource_update_permission_denied", 

2801 component="resource_service", 

2802 user_email=user_email, 

2803 resource_type="resource", 

2804 resource_id=str(resource_id), 

2805 error=pe, 

2806 db=db, 

2807 ) 

2808 raise 

2809 except IntegrityError as ie: 

2810 db.rollback() 

2811 logger.error(f"IntegrityErrors in group: {ie}") 

2812 

2813 # Structured logging: Log database integrity error 

2814 structured_logger.log( 

2815 level="ERROR", 

2816 message="Resource update failed due to database integrity error", 

2817 event_type="resource_update_failed", 

2818 component="resource_service", 

2819 user_id=modified_by, 

2820 user_email=user_email, 

2821 resource_type="resource", 

2822 resource_id=str(resource_id), 

2823 error=ie, 

2824 db=db, 

2825 ) 

2826 raise ie 

2827 except ResourceURIConflictError as pe: 

2828 logger.error(f"Resource URI conflict: {pe}") 

2829 

2830 # Structured logging: Log URI conflict error 

2831 structured_logger.log( 

2832 level="WARNING", 

2833 message="Resource update failed due to URI conflict", 

2834 event_type="resource_uri_conflict", 

2835 component="resource_service", 

2836 user_id=modified_by, 

2837 user_email=user_email, 

2838 resource_type="resource", 

2839 resource_id=str(resource_id), 

2840 error=pe, 

2841 db=db, 

2842 ) 

2843 raise pe 

2844 except Exception as e: 

2845 db.rollback() 

2846 if isinstance(e, ResourceNotFoundError): 

2847 # Structured logging: Log not found error 

2848 structured_logger.log( 

2849 level="ERROR", 

2850 message="Resource update failed - resource not found", 

2851 event_type="resource_not_found", 

2852 component="resource_service", 

2853 user_email=user_email, 

2854 resource_type="resource", 

2855 resource_id=str(resource_id), 

2856 error=e, 

2857 db=db, 

2858 ) 

2859 raise e 

2860 

2861 # Structured logging: Log generic resource update failure 

2862 structured_logger.log( 

2863 level="ERROR", 

2864 message="Resource update failed", 

2865 event_type="resource_update_failed", 

2866 component="resource_service", 

2867 user_id=modified_by, 

2868 user_email=user_email, 

2869 resource_type="resource", 

2870 resource_id=str(resource_id), 

2871 error=e, 

2872 db=db, 

2873 ) 

2874 raise ResourceError(f"Failed to update resource: {str(e)}") 

2875 

2876 async def delete_resource(self, db: Session, resource_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None: 

2877 """ 

2878 Delete a resource. 

2879 

2880 Args: 

2881 db: Database session 

2882 resource_id: Resource ID 

2883 user_email: Email of user performing delete (for ownership check) 

2884 purge_metrics: If True, delete raw + rollup metrics for this resource 

2885 

2886 Raises: 

2887 ResourceNotFoundError: If the resource is not found 

2888 PermissionError: If user doesn't own the resource 

2889 ResourceError: For other deletion errors 

2890 

2891 Example: 

2892 >>> from mcpgateway.services.resource_service import ResourceService 

2893 >>> from unittest.mock import MagicMock, AsyncMock 

2894 >>> service = ResourceService() 

2895 >>> db = MagicMock() 

2896 >>> resource = MagicMock() 

2897 >>> db.get.return_value = resource 

2898 >>> db.delete = MagicMock() 

2899 >>> db.commit = MagicMock() 

2900 >>> service._notify_resource_deleted = AsyncMock() 

2901 >>> import asyncio 

2902 >>> asyncio.run(service.delete_resource(db, 'resource_id')) 

2903 """ 

2904 try: 

2905 # Find resource by its URI. 

2906 resource = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none() 

2907 

2908 if not resource: 

2909 # If resource doesn't exist, rollback and re-raise a ResourceNotFoundError. 

2910 db.rollback() 

2911 raise ResourceNotFoundError(f"Resource not found: {resource_id}") 

2912 

2913 # Check ownership if user_email provided 

2914 if user_email: 

2915 # First-Party 

2916 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

2917 

2918 permission_service = PermissionService(db) 

2919 if not await permission_service.check_resource_ownership(user_email, resource): 

2920 raise PermissionError("Only the owner can delete this resource") 

2921 

2922 # Store resource info for notification before deletion. 

2923 resource_info = { 

2924 "id": resource.id, 

2925 "uri": resource.uri, 

2926 "name": resource.name, 

2927 } 

2928 

2929 # Remove subscriptions using SQLAlchemy's delete() expression. 

2930 db.execute(delete(DbSubscription).where(DbSubscription.resource_id == resource.id)) 

2931 

2932 if purge_metrics: 

2933 with pause_rollup_during_purge(reason=f"purge_resource:{resource.id}"): 

2934 delete_metrics_in_batches(db, ResourceMetric, ResourceMetric.resource_id, resource.id) 

2935 delete_metrics_in_batches(db, ResourceMetricsHourly, ResourceMetricsHourly.resource_id, resource.id) 

2936 

2937 # Hard delete the resource. 

2938 resource_uri = resource.uri 

2939 resource_name = resource.name 

2940 resource_team_id = resource.team_id 

2941 

2942 db.delete(resource) 

2943 db.commit() 

2944 

2945 # Invalidate cache after successful deletion 

2946 cache = _get_registry_cache() 

2947 await cache.invalidate_resources() 

2948 # Also invalidate tags cache since resource tags may have changed 

2949 # First-Party 

2950 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

2951 

2952 await admin_stats_cache.invalidate_tags() 

2953 

2954 # Notify subscribers. 

2955 await self._notify_resource_deleted(resource_info) 

2956 

2957 logger.info(f"Permanently deleted resource: {resource.uri}") 

2958 

2959 # Structured logging: Audit trail for resource deletion 

2960 audit_trail.log_action( 

2961 user_id=user_email or "system", 

2962 action="delete_resource", 

2963 resource_type="resource", 

2964 resource_id=str(resource_info["id"]), 

2965 resource_name=resource_name, 

2966 user_email=user_email, 

2967 team_id=resource_team_id, 

2968 old_values={ 

2969 "uri": resource_uri, 

2970 "name": resource_name, 

2971 }, 

2972 db=db, 

2973 ) 

2974 

2975 # Structured logging: Log successful resource deletion 

2976 structured_logger.log( 

2977 level="INFO", 

2978 message="Resource deleted successfully", 

2979 event_type="resource_deleted", 

2980 component="resource_service", 

2981 user_email=user_email, 

2982 team_id=resource_team_id, 

2983 resource_type="resource", 

2984 resource_id=str(resource_info["id"]), 

2985 custom_fields={ 

2986 "resource_uri": resource_uri, 

2987 "purge_metrics": purge_metrics, 

2988 }, 

2989 db=db, 

2990 ) 

2991 

2992 except PermissionError as pe: 

2993 db.rollback() 

2994 

2995 # Structured logging: Log permission error 

2996 structured_logger.log( 

2997 level="WARNING", 

2998 message="Resource deletion failed due to permission error", 

2999 event_type="resource_delete_permission_denied", 

3000 component="resource_service", 

3001 user_email=user_email, 

3002 resource_type="resource", 

3003 resource_id=str(resource_id), 

3004 error=pe, 

3005 db=db, 

3006 ) 

3007 raise 

3008 except ResourceNotFoundError as rnfe: 

3009 # ResourceNotFoundError is re-raised to be handled in the endpoint. 

3010 # Structured logging: Log not found error 

3011 structured_logger.log( 

3012 level="ERROR", 

3013 message="Resource deletion failed - resource not found", 

3014 event_type="resource_not_found", 

3015 component="resource_service", 

3016 user_email=user_email, 

3017 resource_type="resource", 

3018 resource_id=str(resource_id), 

3019 error=rnfe, 

3020 db=db, 

3021 ) 

3022 raise 

3023 except Exception as e: 

3024 db.rollback() 

3025 

3026 # Structured logging: Log generic resource deletion failure 

3027 structured_logger.log( 

3028 level="ERROR", 

3029 message="Resource deletion failed", 

3030 event_type="resource_deletion_failed", 

3031 component="resource_service", 

3032 user_email=user_email, 

3033 resource_type="resource", 

3034 resource_id=str(resource_id), 

3035 error=e, 

3036 db=db, 

3037 ) 

3038 raise ResourceError(f"Failed to delete resource: {str(e)}") 

3039 

3040 async def get_resource_by_id(self, db: Session, resource_id: str, include_inactive: bool = False) -> ResourceRead: 

3041 """ 

3042 Get a resource by ID. 

3043 

3044 Args: 

3045 db: Database session 

3046 resource_id: Resource ID 

3047 include_inactive: Whether to include inactive resources 

3048 

3049 Returns: 

3050 ResourceRead: The resource object 

3051 

3052 Raises: 

3053 ResourceNotFoundError: If the resource is not found 

3054 

3055 Example: 

3056 >>> from mcpgateway.services.resource_service import ResourceService 

3057 >>> from unittest.mock import MagicMock 

3058 >>> service = ResourceService() 

3059 >>> db = MagicMock() 

3060 >>> resource = MagicMock() 

3061 >>> db.execute.return_value.scalar_one_or_none.return_value = resource 

3062 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read') 

3063 >>> import asyncio 

3064 >>> asyncio.run(service.get_resource_by_id(db, "39334ce0ed2644d79ede8913a66930c9")) 

3065 'resource_read' 

3066 """ 

3067 query = select(DbResource).where(DbResource.id == resource_id) 

3068 

3069 if not include_inactive: 

3070 query = query.where(DbResource.enabled) 

3071 

3072 resource = db.execute(query).scalar_one_or_none() 

3073 

3074 if not resource: 

3075 if not include_inactive: 

3076 # Check if inactive resource exists 

3077 inactive_resource = db.execute(select(DbResource).where(DbResource.id == resource_id).where(not_(DbResource.enabled))).scalar_one_or_none() 

3078 

3079 if inactive_resource: 

3080 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive") 

3081 

3082 raise ResourceNotFoundError(f"Resource not found: {resource_id}") 

3083 

3084 resource_read = self.convert_resource_to_read(resource) 

3085 

3086 structured_logger.log( 

3087 level="INFO", 

3088 message="Resource retrieved successfully", 

3089 event_type="resource_viewed", 

3090 component="resource_service", 

3091 team_id=getattr(resource, "team_id", None), 

3092 resource_type="resource", 

3093 resource_id=str(resource.id), 

3094 custom_fields={ 

3095 "resource_uri": resource.uri, 

3096 "include_inactive": include_inactive, 

3097 }, 

3098 db=db, 

3099 ) 

3100 

3101 return resource_read 

3102 

3103 async def _notify_resource_activated(self, resource: DbResource) -> None: 

3104 """ 

3105 Notify subscribers of resource activation. 

3106 

3107 Args: 

3108 resource: Resource to activate 

3109 """ 

3110 event = { 

3111 "type": "resource_activated", 

3112 "data": { 

3113 "id": resource.id, 

3114 "uri": resource.uri, 

3115 "name": resource.name, 

3116 "enabled": True, 

3117 }, 

3118 "timestamp": datetime.now(timezone.utc).isoformat(), 

3119 } 

3120 await self._publish_event(event) 

3121 

3122 async def _notify_resource_deactivated(self, resource: DbResource) -> None: 

3123 """ 

3124 Notify subscribers of resource deactivation. 

3125 

3126 Args: 

3127 resource: Resource to deactivate 

3128 """ 

3129 event = { 

3130 "type": "resource_deactivated", 

3131 "data": { 

3132 "id": resource.id, 

3133 "uri": resource.uri, 

3134 "name": resource.name, 

3135 "enabled": False, 

3136 }, 

3137 "timestamp": datetime.now(timezone.utc).isoformat(), 

3138 } 

3139 await self._publish_event(event) 

3140 

3141 async def _notify_resource_deleted(self, resource_info: Dict[str, Any]) -> None: 

3142 """ 

3143 Notify subscribers of resource deletion. 

3144 

3145 Args: 

3146 resource_info: Dictionary of resource to delete 

3147 """ 

3148 event = { 

3149 "type": "resource_deleted", 

3150 "data": resource_info, 

3151 "timestamp": datetime.now(timezone.utc).isoformat(), 

3152 } 

3153 await self._publish_event(event) 

3154 

3155 async def _notify_resource_removed(self, resource: DbResource) -> None: 

3156 """ 

3157 Notify subscribers of resource removal. 

3158 

3159 Args: 

3160 resource: Resource to remove 

3161 """ 

3162 event = { 

3163 "type": "resource_removed", 

3164 "data": { 

3165 "id": resource.id, 

3166 "uri": resource.uri, 

3167 "name": resource.name, 

3168 "enabled": False, 

3169 }, 

3170 "timestamp": datetime.now(timezone.utc).isoformat(), 

3171 } 

3172 await self._publish_event(event) 

3173 

3174 async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]: 

3175 """Subscribe to Resource events via the EventService. 

3176 

3177 Yields: 

3178 Resource event messages. 

3179 """ 

3180 async for event in self._event_service.subscribe_events(): 

3181 yield event 

3182 

3183 def _detect_mime_type(self, uri: str, content: Union[str, bytes]) -> str: 

3184 """Detect mime type from URI and content. 

3185 

3186 Args: 

3187 uri: Resource URI 

3188 content: Resource content 

3189 

3190 Returns: 

3191 Detected mime type 

3192 """ 

3193 # Try from URI first 

3194 mime_type, _ = mimetypes.guess_type(uri) 

3195 if mime_type: 

3196 return mime_type 

3197 

3198 # Check content type 

3199 if isinstance(content, str): 

3200 return "text/plain" 

3201 

3202 return "application/octet-stream" 

3203 

3204 async def _read_template_resource(self, db: Session, uri: str, include_inactive: Optional[bool] = False) -> ResourceContent: 

3205 """ 

3206 Read a templated resource. 

3207 

3208 Args: 

3209 db: Database session. 

3210 uri: Template URI with parameters. 

3211 include_inactive: Whether to include inactive resources in DB lookups. 

3212 

3213 Returns: 

3214 ResourceContent: The resolved content from the matching template. 

3215 

3216 Raises: 

3217 ResourceNotFoundError: If no matching template is found. 

3218 ResourceError: For other template resolution errors. 

3219 NotImplementedError: If a binary template resource is encountered. 

3220 """ 

3221 # Find matching template # DRT BREAKPOINT 

3222 template = None 

3223 if not self._template_cache: 

3224 logger.info("_template_cache is empty, fetching exisitng resource templates") 

3225 resource_templates = await self.list_resource_templates(db=db, include_inactive=include_inactive) 

3226 for i in resource_templates: 

3227 self._template_cache[i.name] = i 

3228 for cached in self._template_cache.values(): 

3229 if self._uri_matches_template(uri, cached.uri_template): 

3230 template = cached 

3231 break 

3232 

3233 if template: 

3234 check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(template.id)).where(not_(DbResource.enabled))).scalar_one_or_none() 

3235 if check_inactivity: 

3236 raise ResourceNotFoundError(f"Resource '{template.id}' exists but is inactive") 

3237 else: 

3238 raise ResourceNotFoundError(f"No template matches URI: {uri}") 

3239 

3240 try: 

3241 # Extract parameters 

3242 params = self._extract_template_params(uri, template.uri_template) 

3243 # Generate content 

3244 if template.mime_type and template.mime_type.startswith("text/"): 

3245 content = template.uri_template.format(**params) 

3246 return ResourceContent(type="resource", id=str(template.id) or None, uri=template.uri_template or None, mime_type=template.mime_type or None, text=content) 

3247 # # Handle binary template 

3248 raise NotImplementedError("Binary resource templates not yet supported") 

3249 

3250 except ResourceNotFoundError: 

3251 raise 

3252 except Exception as e: 

3253 raise ResourceError(f"Failed to process template: {str(e)}") from e 

3254 

3255 @staticmethod 

3256 @lru_cache(maxsize=256) 

3257 def _build_regex(template: str) -> re.Pattern: 

3258 """ 

3259 Convert a URI template into a compiled regular expression. 

3260 

3261 This parser supports a subset of RFC 6570–style templates for path 

3262 matching. It extracts path parameters and converts them into named 

3263 regex groups. 

3264 

3265 Supported template features: 

3266 - `{var}` 

3267 A simple path parameter. Matches a single URI segment 

3268 (i.e., any characters except `/`). 

3269 → Translates to `(?P<var>[^/]+)` 

3270 - `{var*}` 

3271 A wildcard parameter. Matches one or more URI segments, 

3272 including `/`. 

3273 → Translates to `(?P<var>.+)` 

3274 - `{?var1,var2}` 

3275 Query-parameter expressions. These are ignored when building 

3276 the regex for path matching and are stripped from the template. 

3277 

3278 Example: 

3279 Template: "files://root/{path*}/meta/{id}{?expand,debug}" 

3280 Regex: r"^files://root/(?P<path>.+)/meta/(?P<id>[^/]+)$" 

3281 

3282 Args: 

3283 template: The URI template string containing parameter expressions. 

3284 

3285 Returns: 

3286 A compiled regular expression (re.Pattern) that can be used to 

3287 match URIs and extract parameter values. 

3288 

3289 Note: 

3290 Results are cached using LRU cache (maxsize=256) to avoid 

3291 recompiling the same template pattern repeatedly. 

3292 """ 

3293 # Remove query parameter syntax for path matching 

3294 template_without_query = re.sub(r"\{\?[^}]+\}", "", template) 

3295 

3296 parts = re.split(r"(\{[^}]+\})", template_without_query) 

3297 pattern = "" 

3298 for part in parts: 

3299 if part.startswith("{") and part.endswith("}"): 

3300 name = part[1:-1] 

3301 if name.endswith("*"): 

3302 name = name[:-1] 

3303 pattern += f"(?P<{name}>.+)" 

3304 else: 

3305 pattern += f"(?P<{name}>[^/]+)" 

3306 else: 

3307 pattern += re.escape(part) 

3308 return re.compile(f"^{pattern}$") 

3309 

3310 @staticmethod 

3311 @lru_cache(maxsize=256) 

3312 def _compile_parse_pattern(template: str) -> parse.Parser: 

3313 """ 

3314 Compile a parse pattern for URI template parameter extraction. 

3315 

3316 Args: 

3317 template: The template pattern (e.g. "file:///{name}/{id}"). 

3318 

3319 Returns: 

3320 Compiled parse.Parser object. 

3321 

3322 Note: 

3323 Results are cached using LRU cache (maxsize=256) to avoid 

3324 recompiling the same template pattern repeatedly. 

3325 """ 

3326 return parse.compile(template) 

3327 

3328 def _extract_template_params(self, uri: str, template: str) -> Dict[str, str]: 

3329 """ 

3330 Extract parameters from a URI based on a template. 

3331 

3332 Args: 

3333 uri: The actual URI containing parameter values. 

3334 template: The template pattern (e.g. "file:///{name}/{id}"). 

3335 

3336 Returns: 

3337 Dict of parameter names and extracted values. 

3338 

3339 Note: 

3340 Uses cached compiled parse patterns for better performance. 

3341 """ 

3342 parser = self._compile_parse_pattern(template) 

3343 result = parser.parse(uri) 

3344 return result.named if result else {} 

3345 

3346 def _uri_matches_template(self, uri: str, template: str) -> bool: 

3347 """ 

3348 Check whether a URI matches a given template pattern. 

3349 

3350 Args: 

3351 uri: The URI to check. 

3352 template: The template pattern. 

3353 

3354 Returns: 

3355 True if the URI matches the template, otherwise False. 

3356 

3357 Note: 

3358 Uses cached compiled regex patterns for better performance. 

3359 """ 

3360 uri_path, _, _ = uri.partition("?") 

3361 regex = self._build_regex(template) 

3362 return bool(regex.match(uri_path)) 

3363 

3364 async def _notify_resource_added(self, resource: DbResource) -> None: 

3365 """ 

3366 Notify subscribers of resource addition. 

3367 

3368 Args: 

3369 resource: Resource to add 

3370 """ 

3371 event = { 

3372 "type": "resource_added", 

3373 "data": { 

3374 "id": resource.id, 

3375 "uri": resource.uri, 

3376 "name": resource.name, 

3377 "description": resource.description, 

3378 "enabled": resource.enabled, 

3379 }, 

3380 "timestamp": datetime.now(timezone.utc).isoformat(), 

3381 } 

3382 await self._publish_event(event) 

3383 

3384 async def _notify_resource_updated(self, resource: DbResource) -> None: 

3385 """ 

3386 Notify subscribers of resource update. 

3387 

3388 Args: 

3389 resource: Resource to update 

3390 """ 

3391 event = { 

3392 "type": "resource_updated", 

3393 "data": { 

3394 "id": resource.id, 

3395 "uri": resource.uri, 

3396 "enabled": resource.enabled, 

3397 }, 

3398 "timestamp": datetime.now(timezone.utc).isoformat(), 

3399 } 

3400 await self._publish_event(event) 

3401 

3402 async def _publish_event(self, event: Dict[str, Any]) -> None: 

3403 """ 

3404 Publish event to all subscribers via the EventService. 

3405 

3406 Args: 

3407 event: Event to publish 

3408 """ 

3409 await self._event_service.publish_event(event) 

3410 

3411 # --- Resource templates --- 

3412 async def list_resource_templates( 

3413 self, 

3414 db: Session, 

3415 include_inactive: bool = False, 

3416 user_email: Optional[str] = None, 

3417 token_teams: Optional[List[str]] = None, 

3418 tags: Optional[List[str]] = None, 

3419 visibility: Optional[str] = None, 

3420 ) -> List[ResourceTemplate]: 

3421 """ 

3422 List resource templates with visibility-based access control. 

3423 

3424 Args: 

3425 db: Database session 

3426 include_inactive: Whether to include inactive templates 

3427 user_email: Email of requesting user (for private visibility check) 

3428 token_teams: Teams from JWT. None = admin (no filtering), 

3429 [] = public-only (no owner access), [...] = team-scoped 

3430 tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned. 

3431 visibility (Optional[str]): Filter by visibility (private, team, public). 

3432 

3433 Returns: 

3434 List of ResourceTemplate objects the user has access to 

3435 

3436 Examples: 

3437 >>> from mcpgateway.services.resource_service import ResourceService 

3438 >>> from unittest.mock import MagicMock, patch 

3439 >>> service = ResourceService() 

3440 >>> db = MagicMock() 

3441 >>> template_obj = MagicMock() 

3442 >>> db.execute.return_value.scalars.return_value.all.return_value = [template_obj] 

3443 >>> with patch('mcpgateway.services.resource_service.ResourceTemplate') as MockResourceTemplate: 

3444 ... MockResourceTemplate.model_validate.return_value = 'resource_template' 

3445 ... import asyncio 

3446 ... result = asyncio.run(service.list_resource_templates(db)) 

3447 ... result == ['resource_template'] 

3448 True 

3449 """ 

3450 query = select(DbResource).where(DbResource.uri_template.isnot(None)) 

3451 

3452 if not include_inactive: 

3453 query = query.where(DbResource.enabled) 

3454 

3455 # Apply visibility filtering when token_teams is set (non-admin access) 

3456 if token_teams is not None: 

3457 # Check if this is a public-only token (empty teams array) 

3458 # Public-only tokens can ONLY see public templates - no owner access 

3459 is_public_only_token = len(token_teams) == 0 

3460 

3461 conditions = [DbResource.visibility == "public"] 

3462 

3463 # Only include owner access for non-public-only tokens with user_email 

3464 if not is_public_only_token and user_email: 

3465 conditions.append(DbResource.owner_email == user_email) 

3466 

3467 if token_teams: 

3468 conditions.append(and_(DbResource.team_id.in_(token_teams), DbResource.visibility.in_(["team", "public"]))) 

3469 

3470 query = query.where(or_(*conditions)) 

3471 

3472 # Cursor-based pagination logic can be implemented here in the future. 

3473 if visibility: 

3474 query = query.where(DbResource.visibility == visibility) 

3475 

3476 if tags: 

3477 query = query.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True)) 

3478 

3479 templates = db.execute(query).scalars().all() 

3480 result = [ResourceTemplate.model_validate(t) for t in templates] 

3481 return result 

3482 

3483 # --- Metrics --- 

3484 async def aggregate_metrics(self, db: Session) -> ResourceMetrics: 

3485 """ 

3486 Aggregate metrics for all resource invocations across all resources. 

3487 

3488 Combines recent raw metrics (within retention period) with historical 

3489 hourly rollups for complete historical coverage. Uses in-memory caching 

3490 (10s TTL) to reduce database load under high request rates. 

3491 

3492 Args: 

3493 db: Database session 

3494 

3495 Returns: 

3496 ResourceMetrics: Aggregated metrics from raw + hourly rollup tables. 

3497 

3498 Examples: 

3499 >>> from mcpgateway.services.resource_service import ResourceService 

3500 >>> service = ResourceService() 

3501 >>> # Method exists and is callable 

3502 >>> callable(service.aggregate_metrics) 

3503 True 

3504 """ 

3505 # Check cache first (if enabled) 

3506 # First-Party 

3507 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel 

3508 

3509 if is_cache_enabled(): 

3510 cached = metrics_cache.get("resources") 

3511 if cached is not None: 

3512 return ResourceMetrics(**cached) 

3513 

3514 # Use combined raw + rollup query for full historical coverage 

3515 # First-Party 

3516 from mcpgateway.services.metrics_query_service import aggregate_metrics_combined # pylint: disable=import-outside-toplevel 

3517 

3518 result = aggregate_metrics_combined(db, "resource") 

3519 

3520 metrics = ResourceMetrics( 

3521 total_executions=result.total_executions, 

3522 successful_executions=result.successful_executions, 

3523 failed_executions=result.failed_executions, 

3524 failure_rate=result.failure_rate, 

3525 min_response_time=result.min_response_time, 

3526 max_response_time=result.max_response_time, 

3527 avg_response_time=result.avg_response_time, 

3528 last_execution_time=result.last_execution_time, 

3529 ) 

3530 

3531 # Cache the result as dict for serialization compatibility (if enabled) 

3532 if is_cache_enabled(): 

3533 metrics_cache.set("resources", metrics.model_dump()) 

3534 

3535 return metrics 

3536 

3537 async def reset_metrics(self, db: Session) -> None: 

3538 """ 

3539 Reset all resource metrics by deleting raw and hourly rollup records. 

3540 

3541 Args: 

3542 db: Database session 

3543 

3544 Examples: 

3545 >>> from mcpgateway.services.resource_service import ResourceService 

3546 >>> from unittest.mock import MagicMock 

3547 >>> service = ResourceService() 

3548 >>> db = MagicMock() 

3549 >>> db.execute = MagicMock() 

3550 >>> db.commit = MagicMock() 

3551 >>> import asyncio 

3552 >>> asyncio.run(service.reset_metrics(db)) 

3553 """ 

3554 db.execute(delete(ResourceMetric)) 

3555 db.execute(delete(ResourceMetricsHourly)) 

3556 db.commit() 

3557 

3558 # Invalidate metrics cache 

3559 # First-Party 

3560 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

3561 

3562 metrics_cache.invalidate("resources") 

3563 metrics_cache.invalidate_prefix("top_resources:") 

3564 

3565 

3566# Lazy singleton - created on first access, not at module import time. 

3567# This avoids instantiation when only exception classes are imported. 

3568_resource_service_instance = None # pylint: disable=invalid-name 

3569 

3570 

3571def __getattr__(name: str): 

3572 """Module-level __getattr__ for lazy singleton creation. 

3573 

3574 Args: 

3575 name: The attribute name being accessed. 

3576 

3577 Returns: 

3578 The resource_service singleton instance if name is "resource_service". 

3579 

3580 Raises: 

3581 AttributeError: If the attribute name is not "resource_service". 

3582 """ 

3583 global _resource_service_instance # pylint: disable=global-statement 

3584 if name == "resource_service": 

3585 if _resource_service_instance is None: 

3586 _resource_service_instance = ResourceService() 

3587 return _resource_service_instance 

3588 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")