Coverage for mcpgateway / services / resource_service.py: 100%
1154 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/resource_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Resource Service Implementation.
8This module implements resource management according to the MCP specification.
9It handles:
10- Resource registration and retrieval
11- Resource templates and URI handling
12- Resource subscriptions and updates
13- Content type management
14- Active/inactive resource management
16Examples:
17 >>> from mcpgateway.services.resource_service import ResourceService, ResourceError
18 >>> service = ResourceService()
19 >>> isinstance(service._event_service, EventService)
20 True
21"""
23# Standard
24import binascii
25from datetime import datetime, timezone
26from functools import lru_cache
27import mimetypes
28import os
29import re
30import ssl
31import time
32from typing import Any, AsyncGenerator, Dict, List, Optional, Union
33import uuid
35# Third-Party
36import httpx
37from mcp import ClientSession
38from mcp.client.sse import sse_client
39from mcp.client.streamable_http import streamablehttp_client
40import parse
41from pydantic import ValidationError
42from sqlalchemy import and_, delete, desc, not_, or_, select
43from sqlalchemy.exc import IntegrityError, OperationalError
44from sqlalchemy.orm import joinedload, Session
46# First-Party
47from mcpgateway.common.models import ResourceContent, ResourceTemplate, TextContent
48from mcpgateway.common.validators import SecurityValidator
49from mcpgateway.config import settings
50from mcpgateway.db import EmailTeam, fresh_db_session
51from mcpgateway.db import Gateway as DbGateway
52from mcpgateway.db import get_for_update
53from mcpgateway.db import Resource as DbResource
54from mcpgateway.db import ResourceMetric, ResourceMetricsHourly
55from mcpgateway.db import ResourceSubscription as DbSubscription
56from mcpgateway.db import server_resource_association
57from mcpgateway.observability import create_span
58from mcpgateway.schemas import ResourceCreate, ResourceMetrics, ResourceRead, ResourceSubscription, ResourceUpdate, TopPerformer
59from mcpgateway.services.audit_trail_service import get_audit_trail_service
60from mcpgateway.services.event_service import EventService
61from mcpgateway.services.logging_service import LoggingService
62from mcpgateway.services.mcp_session_pool import get_mcp_session_pool, TransportType
63from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
64from mcpgateway.services.oauth_manager import OAuthManager
65from mcpgateway.services.observability_service import current_trace_id, ObservabilityService
66from mcpgateway.services.structured_logger import get_structured_logger
67from mcpgateway.utils.metrics_common import build_top_performers
68from mcpgateway.utils.pagination import unified_paginate
69from mcpgateway.utils.services_auth import decode_auth
70from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
71from mcpgateway.utils.ssl_context_cache import get_cached_ssl_context
72from mcpgateway.utils.url_auth import apply_query_param_auth, sanitize_exception_message
73from mcpgateway.utils.validate_signature import validate_signature
75# Plugin support imports (conditional)
76try:
77 # First-Party
78 from mcpgateway.plugins.framework import GlobalContext, PluginContextTable, PluginManager, ResourceHookType, ResourcePostFetchPayload, ResourcePreFetchPayload
80 PLUGINS_AVAILABLE = True
81except ImportError:
82 PLUGINS_AVAILABLE = False
# Registry cache handle; resolved on first use so that importing this module
# does not pull in the cache package (avoids a circular import).
_REGISTRY_CACHE = None


def _get_registry_cache():
    """Return the shared registry cache, importing it on first access.

    Returns:
        RegistryCache instance.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = registry_cache
    return _REGISTRY_CACHE
103# Initialize logging service first
104logging_service = LoggingService()
105logger = logging_service.get_logger(__name__)
107# Initialize structured logger and audit trail for resource operations
108structured_logger = get_structured_logger("resource_service")
109audit_trail = get_audit_trail_service()
class ResourceError(Exception):
    """Root of the exception hierarchy raised by the resource service."""
class ResourceNotFoundError(ResourceError):
    """Signals that the requested resource could not be found."""
class ResourceURIConflictError(ResourceError):
    """Raised when a resource URI conflicts with existing (active or inactive) resource."""

    def __init__(self, uri: str, enabled: bool = True, resource_id: Optional[Union[int, str]] = None, visibility: str = "public") -> None:
        """Initialize the error with resource information.

        The conflict is also written to the module logger at INFO level.

        Args:
            uri: The conflicting resource URI
            enabled: Whether the existing resource is active
            resource_id: ID of the existing resource if available
            visibility: Visibility status of the resource; a falsy value is
                reported as "public"
        """
        self.uri = uri
        self.enabled = enabled
        self.resource_id = resource_id

        # Building an error must never fail itself, so tolerate visibility=None.
        scope_label = (visibility or "public").capitalize()
        message = f"{scope_label} Resource already exists with URI: {uri}"
        if not enabled:
            message += f" (currently inactive, ID: {resource_id})"

        # Log only once the message is complete so the inactive detail is recorded too.
        logger.info(f"ResourceURIConflictError: {message}")
        super().__init__(message)
class ResourceValidationError(ResourceError):
    """Signals that a resource did not pass validation."""
class ResourceLockConflictError(ResourceError):
    """Signals that a resource row is locked by another transaction.

    Raised when a modification is attempted on a resource that a concurrent
    request currently holds locked.
    """
155class ResourceService:
156 """Service for managing resources.
158 Handles:
159 - Resource registration and retrieval
160 - Resource templates and URIs
161 - Resource subscriptions
162 - Content type detection
163 - Active/inactive status management
164 """
def __init__(self) -> None:
    """Set up the event service, template cache, OAuth manager and optional plugin manager."""
    self._event_service = EventService(channel_name="mcpgateway:resource_events")
    self._template_cache: Dict[str, ResourceTemplate] = {}

    oauth_timeout = int(os.getenv("OAUTH_REQUEST_TIMEOUT", "30"))
    oauth_retries = int(os.getenv("OAUTH_MAX_RETRIES", "3"))
    self.oauth_manager = OAuthManager(request_timeout=oauth_timeout, max_retries=oauth_retries)

    # The plugin manager stays None unless the framework imported and plugins are switched on.
    self._plugin_manager = None
    if PLUGINS_AVAILABLE:
        try:
            # An explicit PLUGINS_ENABLED env var wins over settings (handy in tests,
            # since it avoids reloading settings).
            raw_flag = os.getenv("PLUGINS_ENABLED")
            if raw_flag is None:
                plugins_on = settings.plugins_enabled
            else:
                plugins_on = raw_flag.strip().lower() in {"1", "true", "yes", "on"}

            plugin_config = os.getenv("PLUGIN_CONFIG_FILE", settings.plugin_config_file)

            if plugins_on:
                self._plugin_manager = PluginManager(plugin_config)
                logger.info(f"Plugin manager initialized for ResourceService with config: {plugin_config}")
        except Exception as e:
            # Best effort: a broken plugin setup must not prevent the service from starting.
            logger.warning(f"Plugin manager initialization failed in ResourceService: {e}")
            self._plugin_manager = None

    # Load the system MIME type tables.
    mimetypes.init()
async def initialize(self) -> None:
    """Start the service by initializing its event service."""
    logger.info("Initializing resource service")
    await self._event_service.initialize()
async def shutdown(self) -> None:
    """Stop the service by shutting down its event service."""
    await self._event_service.shutdown()
    logger.info("Resource service shutdown complete")
async def get_top_resources(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
    """Return the resources with the highest execution counts.

    The ranking comes from ``get_top_performers_combined`` (called with
    ``metric_type="resource"``) and is converted with ``build_top_performers``.
    The resource URI is used as the ``name`` of each entry. When the metrics
    cache is enabled, the result is served from and stored in it.

    Args:
        db (Session): Database session for querying resource metrics.
        limit (Optional[int]): Maximum number of resources to return. A falsy
            value (None or 0) falls back to 5.
        include_deleted (bool): Whether to include deleted resources from rollups.

    Returns:
        List[TopPerformer]: Top performers as produced by ``build_top_performers``,
        or the cached value for the same limit/include_deleted combination.
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    row_cap = limit or 5
    key = f"top_resources:{row_cap}:include_deleted={include_deleted}"

    # Serve from cache when possible.
    if is_cache_enabled():
        hit = metrics_cache.get(key)
        if hit is not None:
            return hit

    # First-Party
    from mcpgateway.services.metrics_query_service import get_top_performers_combined  # pylint: disable=import-outside-toplevel

    # name_column="uri": resources are displayed by URI rather than by name.
    ranked = build_top_performers(
        get_top_performers_combined(
            db=db,
            metric_type="resource",
            entity_model=DbResource,
            limit=row_cap,
            name_column="uri",
            include_deleted=include_deleted,
        )
    )

    if is_cache_enabled():
        metrics_cache.set(key, ranked)

    return ranked
def convert_resource_to_read(self, resource: DbResource, include_metrics: bool = False) -> ResourceRead:
    """
    Build a ResourceRead model from a resource object, optionally with aggregated metrics.

    Args:
        resource (DbResource): The ORM instance of the resource.
        include_metrics (bool): Whether to aggregate ``resource.metrics`` into the result.
            Defaults to False; keep it False for list operations to avoid N+1 query issues.

    Returns:
        ResourceRead: The Pydantic model representing the resource. Its ``metrics``
        field is None unless ``include_metrics`` is True.

    Examples:
        >>> from types import SimpleNamespace
        >>> from datetime import datetime, timezone
        >>> svc = ResourceService()
        >>> now = datetime.now(timezone.utc)
        >>> # Fake metrics
        >>> m1 = SimpleNamespace(is_success=True, response_time=0.1, timestamp=now)
        >>> m2 = SimpleNamespace(is_success=False, response_time=0.3, timestamp=now)
        >>> r = SimpleNamespace(
        ...     id="ca627760127d409080fdefc309147e08", uri='res://x', name='R', description=None, mime_type='text/plain', size=123,
        ...     created_at=now, updated_at=now, enabled=True, tags=[{"id": "t", "label": "T"}], metrics=[m1, m2]
        ... )
        >>> out = svc.convert_resource_to_read(r, include_metrics=True)
        >>> out.metrics.total_executions
        2
        >>> out.metrics.successful_executions
        1
    """
    payload = resource.__dict__.copy()
    # Drop SQLAlchemy bookkeeping and any raw metrics list; metrics are rebuilt below.
    for transient in ("_sa_instance_state", "metrics"):
        payload.pop(transient, None)

    # Read the base fields through attribute access so values SQLAlchemy has not
    # yet loaded into __dict__ are still picked up.
    base_fields = ("id", "uri", "name", "description", "mime_type", "size", "created_at", "updated_at", "is_active", "enabled")
    for field in base_fields:
        payload[field] = getattr(resource, field, payload.get(field))

    if not include_metrics:
        payload["metrics"] = None
    else:
        samples = getattr(resource, "metrics", None)
        total = len(samples) if samples is not None else 0
        if total > 0:
            successful = sum(1 for sample in samples if sample.is_success)
            failed = sum(1 for sample in samples if not sample.is_success)
            durations = [sample.response_time for sample in samples]
            failure_rate = failed / total
            min_rt = min(durations)
            max_rt = max(durations)
            avg_rt = sum(durations) / total
            last_time = max(sample.timestamp for sample in samples)
        else:
            successful = failed = 0
            failure_rate = 0.0
            min_rt = max_rt = avg_rt = last_time = None

        payload["metrics"] = {
            "total_executions": total,
            "successful_executions": successful,
            "failed_executions": failed,
            "failure_rate": failure_rate,
            "min_response_time": min_rt,
            "max_response_time": max_rt,
            "avg_response_time": avg_rt,
            "last_execution_time": last_time,
        }

    # Tags may be plain strings, dicts or objects; reduce each to its label (or name).
    tag_labels = []
    for entry in resource.tags or []:
        if isinstance(entry, str):
            tag_labels.append(entry)
        elif isinstance(entry, dict):
            text = entry.get("label") or entry.get("name")
            if text:
                tag_labels.append(text)
        else:
            text = getattr(entry, "label", None) or getattr(entry, "name", None)
            if text:
                tag_labels.append(text)
    payload["tags"] = tag_labels

    # Team plus metadata fields needed for the API response; absent attributes become None.
    for field in ("team", "created_by", "modified_by", "created_at", "updated_at", "version"):
        payload[field] = getattr(resource, field, None)

    return ResourceRead.model_validate(payload)
def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]:
    """Look up the name of an active team by its ID.

    Args:
        db (Session): Database session for querying teams.
        team_id (Optional[str]): The ID of the team.

    Returns:
        Optional[str]: The team name, or None when no ID was given or no
        active team matches it.
    """
    if team_id:
        active_team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
        # Commit right away so the session does not sit idle inside a transaction.
        db.commit()
        if active_team:
            return active_team.name
    return None
async def register_resource(
    self,
    db: Session,
    resource: ResourceCreate,
    created_by: Optional[str] = None,
    created_from_ip: Optional[str] = None,
    created_via: Optional[str] = None,
    created_user_agent: Optional[str] = None,
    import_batch_id: Optional[str] = None,
    federation_source: Optional[str] = None,
    team_id: Optional[str] = None,
    owner_email: Optional[str] = None,
    visibility: Optional[str] = "public",
) -> ResourceRead:
    """Register a new resource.

    A conflict check is run for public resources (same URI and gateway) and
    for team resources (same URI, team and gateway); other visibilities are
    not checked here. On success the resource is committed, subscribers are
    notified, and audit/structured log entries are written.

    Args:
        db: Database session
        resource: Resource creation schema
        created_by: User who created the resource
        created_from_ip: IP address of the creator
        created_via: Method used to create the resource (e.g., API, UI)
        created_user_agent: User agent of the creator
        import_batch_id: Optional batch ID for bulk imports
        federation_source: Optional source of the resource if federated
        team_id (Optional[str]): Team ID to assign the resource to.
        owner_email (Optional[str]): Email of the user who owns this resource.
        visibility (str): Resource visibility level (private, team, public).
            When None, the visibility carried by ``resource`` is used
            (falling back to "public").

    Returns:
        Created resource information

    Raises:
        IntegrityError: If a database integrity error occurs.
        ResourceURIConflictError: If a resource with the same URI already exists.
        ResourceError: For other resource registration errors

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> from mcpgateway.schemas import ResourceRead
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> db.add = MagicMock()
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_resource_added = AsyncMock()
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.register_resource(db, resource))
        'resource_read'
    """
    # Resolve the visibility once so the conflict check, the stored row and the
    # log entries all agree, and so visibility=None no longer fails on .lower().
    effective_visibility = visibility if visibility is not None else getattr(resource, "visibility", "public")

    try:
        logger.info(f"Registering resource: {resource.uri}")

        # Extract gateway_id from resource if present
        gateway_id = getattr(resource, "gateway_id", None)

        # Pick the scope in which the URI must be unique.
        visibility_key = (effective_visibility or "public").lower()
        scope_conditions = None
        if visibility_key == "public":
            logger.info(f"visibility:: {effective_visibility}")
            scope_conditions = [DbResource.visibility == "public"]
        elif visibility_key == "team" and team_id:
            scope_conditions = [DbResource.visibility == "team", DbResource.team_id == team_id]

        # Check for an existing resource with the same uri and gateway_id in that scope
        if scope_conditions is not None:
            existing_resource = db.execute(select(DbResource).where(DbResource.uri == resource.uri, *scope_conditions, DbResource.gateway_id == gateway_id)).scalar_one_or_none()
            if existing_resource:
                raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)

        # Detect mime type if not provided
        mime_type = resource.mime_type
        if not mime_type:
            mime_type = self._detect_mime_type(resource.uri, resource.content)

        # Determine content storage: text when the mime type says so or the payload is a str
        content = resource.content
        is_text = bool(mime_type and mime_type.startswith("text/")) or isinstance(content, str)

        if is_text and isinstance(content, str):
            binary_content = content.encode()
        elif isinstance(content, bytes):
            binary_content = content
        else:
            binary_content = None

        # Create DB model
        db_resource = DbResource(
            uri=resource.uri,
            name=resource.name,
            description=resource.description,
            mime_type=mime_type,
            uri_template=resource.uri_template,
            text_content=content if is_text else None,
            binary_content=binary_content,
            size=len(content) if content else 0,
            tags=resource.tags or [],
            created_by=created_by,
            created_from_ip=created_from_ip,
            created_via=created_via,
            created_user_agent=created_user_agent,
            import_batch_id=import_batch_id,
            federation_source=federation_source,
            version=1,
            # Team scoping fields - use schema values if provided, otherwise fallback to parameters
            team_id=getattr(resource, "team_id", None) or team_id,
            owner_email=getattr(resource, "owner_email", None) or owner_email or created_by,
            # Endpoint visibility parameter takes precedence over schema default
            visibility=effective_visibility,
            gateway_id=gateway_id,
        )

        # Add to DB
        db.add(db_resource)
        db.commit()
        db.refresh(db_resource)

        # Notify subscribers
        await self._notify_resource_added(db_resource)

        logger.info(f"Registered resource: {resource.uri}")

        # Structured logging: Audit trail for resource creation
        audit_trail.log_action(
            user_id=created_by or "system",
            action="create_resource",
            resource_type="resource",
            resource_id=str(db_resource.id),
            resource_name=db_resource.name,
            user_email=owner_email,
            team_id=team_id,
            client_ip=created_from_ip,
            user_agent=created_user_agent,
            new_values={
                "uri": db_resource.uri,
                "name": db_resource.name,
                "visibility": effective_visibility,
                "mime_type": db_resource.mime_type,
            },
            context={
                "created_via": created_via,
                "import_batch_id": import_batch_id,
                "federation_source": federation_source,
            },
            db=db,
        )

        # Structured logging: Log successful resource creation
        structured_logger.log(
            level="INFO",
            message="Resource created successfully",
            event_type="resource_created",
            component="resource_service",
            user_id=created_by,
            user_email=owner_email,
            team_id=team_id,
            resource_type="resource",
            resource_id=str(db_resource.id),
            custom_fields={
                "resource_uri": db_resource.uri,
                "resource_name": db_resource.name,
                "visibility": effective_visibility,
            },
            db=db,
        )

        db_resource.team = self._get_team_name(db, db_resource.team_id)
        return self.convert_resource_to_read(db_resource)
    except IntegrityError as ie:
        # The failed flush leaves the session unusable until it is rolled back;
        # do that before the session is handed to the structured logger.
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        structured_logger.log(
            level="ERROR",
            message="Resource creation failed due to database integrity error",
            event_type="resource_creation_failed",
            component="resource_service",
            user_id=created_by,
            user_email=owner_email,
            error=ie,
            custom_fields={
                "resource_uri": resource.uri,
            },
            db=db,
        )
        raise
    except ResourceURIConflictError:
        logger.error(f"ResourceURIConflictError in group: {resource.uri}")

        # Structured logging: Log URI conflict error
        structured_logger.log(
            level="WARNING",
            message="Resource creation failed due to URI conflict",
            event_type="resource_uri_conflict",
            component="resource_service",
            user_id=created_by,
            user_email=owner_email,
            custom_fields={
                "resource_uri": resource.uri,
                "visibility": effective_visibility,
            },
            db=db,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource creation failure
        structured_logger.log(
            level="ERROR",
            message="Resource creation failed",
            event_type="resource_creation_failed",
            component="resource_service",
            user_id=created_by,
            user_email=owner_email,
            error=e,
            custom_fields={
                "resource_uri": resource.uri,
            },
            db=db,
        )
        # Chain the original exception so the root cause stays in the traceback.
        raise ResourceError(f"Failed to register resource: {str(e)}") from e
async def register_resources_bulk(
    self,
    db: Session,
    resources: List[ResourceCreate],
    created_by: Optional[str] = None,
    created_from_ip: Optional[str] = None,
    created_via: Optional[str] = None,
    created_user_agent: Optional[str] = None,
    import_batch_id: Optional[str] = None,
    federation_source: Optional[str] = None,
    team_id: Optional[str] = None,
    owner_email: Optional[str] = None,
    visibility: Optional[str] = "public",
    conflict_strategy: str = "skip",
) -> Dict[str, Any]:
    """Register multiple resources in bulk with one commit per chunk.

    Resources are processed in chunks of 500. For each chunk, existing rows
    are fetched in a single query, new rows are added with ``db.add_all()``,
    and the chunk is committed once. Counters for a chunk are only added to
    the returned statistics after its commit succeeded; if the commit (or
    anything before it) fails, the whole chunk is counted as failed.

    Args:
        db: Database session
        resources: List of resource creation schemas
        created_by: Username who created these resources
        created_from_ip: IP address of creator
        created_via: Creation method (ui, api, import, federation)
        created_user_agent: User agent of creation request
        import_batch_id: UUID for bulk import operations
        federation_source: Source gateway for federated resources
        team_id: Team ID to assign the resources to
        owner_email: Email of the user who owns these resources
        visibility: Resource visibility level (private, team, public)
        conflict_strategy: How to handle conflicts (skip, update, rename, fail)

    Returns:
        Dict with statistics:
            - created: Number of resources created
            - updated: Number of resources updated
            - skipped: Number of resources skipped
            - failed: Number of resources that failed
            - errors: List of error messages

    Raises:
        ResourceError: If bulk registration fails critically

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resources = [MagicMock(), MagicMock()]
        >>> import asyncio
        >>> try:
        ...     result = asyncio.run(service.register_resources_bulk(db, resources))
        ... except Exception:
        ...     pass
    """
    stats: Dict[str, Any] = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
    if not resources:
        return stats

    counter_keys = ("created", "updated", "skipped", "failed")

    def _build_db_resource(item: Any, uri: str):
        """Create the DbResource row for ``item`` stored under ``uri``.

        Args:
            item: Resource creation schema to copy fields from.
            uri: URI to store (the original URI, or a renamed one).

        Returns:
            A new, not yet persisted DbResource.
        """
        return DbResource(
            uri=uri,
            name=item.name,
            description=item.description,
            mime_type=item.mime_type,
            size=getattr(item, "size", None),
            uri_template=item.uri_template,
            gateway_id=getattr(item, "gateway_id", None),
            tags=item.tags or [],
            created_by=created_by,
            created_from_ip=created_from_ip,
            created_via=created_via,
            created_user_agent=created_user_agent,
            import_batch_id=import_batch_id,
            federation_source=federation_source,
            version=1,
            # Use provided parameters or schema values
            team_id=team_id if team_id is not None else getattr(item, "team_id", None),
            owner_email=owner_email or getattr(item, "owner_email", None) or created_by,
            visibility=visibility if visibility is not None else getattr(item, "visibility", "public"),
        )

    # Process in chunks to avoid memory issues and SQLite parameter limits
    chunk_size = 500

    for chunk_start in range(0, len(resources), chunk_size):
        chunk = resources[chunk_start : chunk_start + chunk_size]

        # Tallies stay local to the chunk until its commit succeeds, so a failed
        # commit cannot leave "created"/"updated" counts for rows that were rolled back.
        chunk_stats = dict.fromkeys(counter_keys, 0)
        chunk_errors: List[str] = []
        committed = False

        try:
            # Batch check for existing resources to detect conflicts
            resource_uris = [item.uri for item in chunk]

            # NOTE(review): visibility=None raises here and fails the chunk; a proper
            # fix needs per-resource conflict scoping, so callers must pass a value.
            visibility_key = visibility.lower()
            if visibility_key == "public":
                scope_conditions = [DbResource.visibility == "public"]
            elif visibility_key == "team" and team_id:
                scope_conditions = [DbResource.visibility == "team", DbResource.team_id == team_id]
            else:
                # Private resources - check by owner
                scope_conditions = [DbResource.visibility == "private", DbResource.owner_email == (owner_email or created_by)]

            existing_rows = db.execute(select(DbResource).where(DbResource.uri.in_(resource_uris), *scope_conditions)).scalars().all()
            # Use (uri, gateway_id) tuple as key for proper conflict detection with gateway_id scoping
            existing_by_key = {(row.uri, row.gateway_id): row for row in existing_rows}

            resources_to_add = []
            resources_to_update = []

            for resource in chunk:
                try:
                    existing_resource = existing_by_key.get((resource.uri, getattr(resource, "gateway_id", None)))

                    if not existing_resource:
                        # Create new resource
                        resources_to_add.append(_build_db_resource(resource, resource.uri))
                        chunk_stats["created"] += 1
                    elif conflict_strategy == "skip":
                        chunk_stats["skipped"] += 1
                    elif conflict_strategy == "update":
                        # Update existing resource in place
                        existing_resource.name = resource.name
                        existing_resource.description = resource.description
                        existing_resource.mime_type = resource.mime_type
                        existing_resource.size = getattr(resource, "size", None)
                        existing_resource.uri_template = resource.uri_template
                        existing_resource.tags = resource.tags or []
                        existing_resource.modified_by = created_by
                        existing_resource.modified_from_ip = created_from_ip
                        existing_resource.modified_via = created_via
                        existing_resource.modified_user_agent = created_user_agent
                        existing_resource.updated_at = datetime.now(timezone.utc)
                        existing_resource.version = (existing_resource.version or 1) + 1

                        resources_to_update.append(existing_resource)
                        chunk_stats["updated"] += 1
                    elif conflict_strategy == "rename":
                        # Create under a new URI suffixed with the current epoch second
                        new_uri = f"{resource.uri}_imported_{int(datetime.now().timestamp())}"
                        resources_to_add.append(_build_db_resource(resource, new_uri))
                        chunk_stats["created"] += 1
                    elif conflict_strategy == "fail":
                        chunk_stats["failed"] += 1
                        chunk_errors.append(f"Resource URI conflict: {resource.uri}")
                    # Any other strategy leaves the conflicting resource untouched and uncounted.

                except Exception as e:
                    chunk_stats["failed"] += 1
                    chunk_errors.append(f"Failed to process resource {resource.uri}: {str(e)}")
                    logger.warning(f"Failed to process resource {resource.uri} in bulk operation: {str(e)}")
                    continue

            # Bulk add new resources
            if resources_to_add:
                db.add_all(resources_to_add)

            # Commit the chunk
            db.commit()
            committed = True

            # The chunk is durable now, so its tallies can be published.
            for key in counter_keys:
                stats[key] += chunk_stats[key]
            stats["errors"].extend(chunk_errors)

            # Refresh resources for notifications and audit trail
            for db_resource in resources_to_add:
                db.refresh(db_resource)
                # Notify subscribers
                await self._notify_resource_added(db_resource)

            # Log bulk audit trail entry
            if resources_to_add or resources_to_update:
                audit_trail.log_action(
                    user_id=created_by or "system",
                    action="bulk_create_resources" if resources_to_add else "bulk_update_resources",
                    resource_type="resource",
                    resource_id=import_batch_id or "bulk_operation",
                    resource_name=f"Bulk operation: {len(resources_to_add)} created, {len(resources_to_update)} updated",
                    user_email=owner_email,
                    team_id=team_id,
                    client_ip=created_from_ip,
                    user_agent=created_user_agent,
                    new_values={
                        "resources_created": len(resources_to_add),
                        "resources_updated": len(resources_to_update),
                        "visibility": visibility,
                    },
                    context={
                        "created_via": created_via,
                        "import_batch_id": import_batch_id,
                        "federation_source": federation_source,
                        "conflict_strategy": conflict_strategy,
                    },
                    db=db,
                )

            logger.info(f"Bulk registered {len(resources_to_add)} resources, updated {len(resources_to_update)} resources in chunk")

        except Exception as e:
            db.rollback()
            logger.error(f"Failed to process chunk in bulk resource registration: {str(e)}")
            if not committed:
                # Nothing from this chunk was persisted: count every item as failed once.
                stats["failed"] += len(chunk)
                stats["errors"].extend(chunk_errors)
            # After a successful commit the rows exist; only record the error message.
            stats["errors"].append(f"Chunk processing failed: {str(e)}")
            continue

    # Final structured logging
    structured_logger.log(
        level="INFO",
        message="Bulk resource registration completed",
        event_type="resources_bulk_created",
        component="resource_service",
        user_id=created_by,
        user_email=owner_email,
        team_id=team_id,
        resource_type="resource",
        custom_fields={
            "resources_created": stats["created"],
            "resources_updated": stats["updated"],
            "resources_skipped": stats["skipped"],
            "resources_failed": stats["failed"],
            "total_resources": len(resources),
            "visibility": visibility,
            "conflict_strategy": conflict_strategy,
        },
        db=db,
    )

    return stats
async def _check_resource_access(
    self,
    db: Session,
    resource: DbResource,
    user_email: Optional[str],
    token_teams: Optional[List[str]],
) -> bool:
    """Decide whether the caller may access a single resource.

    The checks run in a fixed order and the first one that matches wins:
    public visibility, unrestricted admin context, missing user, public-only
    token, ownership, and finally team membership.

    Args:
        db: Database session, used only when team membership has to be looked up.
        resource: Resource object; ``visibility``, ``team_id`` and ``owner_email``
            are read with ``getattr`` (missing visibility is treated as "public").
        user_email: Email of the requesting user (None = unauthenticated).
        token_teams: List of team IDs from token.
            - None = no token scoping (admin bypass when user_email is also None,
              otherwise the user's teams are looked up from the database)
            - [] = public-only token
            - [...] = team-scoped token

    Returns:
        True if access is allowed, False otherwise.
    """
    res_visibility = getattr(resource, "visibility", "public")
    owning_team_id = getattr(resource, "team_id", None)
    owner = getattr(resource, "owner_email", None)

    # Anyone may read a public resource.
    if res_visibility == "public":
        return True

    # No token scoping and no user at all is the unrestricted admin context.
    if token_teams is None and user_email is None:
        return True

    # Everything below needs an identified user.
    if not user_email:
        return False

    # An explicitly empty team list is a public-only token; public was handled above.
    if token_teams is not None and len(token_teams) == 0:
        return False

    # Owners always reach their own resources.
    if owner and owner == user_email:
        return True

    # Without an owning team there is nothing left that could grant access.
    if not owning_team_id:
        return False

    if token_teams is None:
        # First-Party
        from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

        memberships = await TeamManagementService(db).get_user_teams(user_email)
        allowed_team_ids = [team.id for team in memberships]
    else:
        # The token's scope takes precedence over the user's full memberships.
        allowed_team_ids = token_teams

    return res_visibility in ["team", "public"] and owning_team_id in allowed_team_ids
async def list_resources(
    self,
    db: Session,
    include_inactive: bool = False,
    cursor: Optional[str] = None,
    tags: Optional[List[str]] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    user_email: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Union[tuple[List[ResourceRead], Optional[str]], Dict[str, Any]]:
    """
    Retrieve a list of registered resources from the database with pagination support.

    Only concrete resources are returned: rows with a ``uri_template`` are filtered
    out. Results are ordered by ``created_at`` then ``id``, both descending. The first
    cursor page of an unscoped query (no user_email, no token_teams, no page) is
    served from and stored in the registry cache.

    Args:
        db (Session): The SQLAlchemy database session.
        include_inactive (bool): If True, include inactive resources in the result.
            Defaults to False.
        cursor (Optional[str], optional): An opaque cursor token for pagination.
            Opaque base64-encoded string containing last item's ID and created_at.
        tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
        limit (Optional[int]): Maximum number of resources to return. Use 0 for all resources (no limit).
            If not specified, uses pagination_default_page_size.
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
        user_email (Optional[str]): User email for team-based access control. If None and token_teams is
            also None, no access control is applied.
        team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access
            where the token scope should be respected instead of the user's full team memberships.

    Returns:
        If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
        If cursor is provided or neither: tuple of (list of ResourceRead objects, next_cursor).

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource_read = MagicMock()
        >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> resources, next_cursor = asyncio.run(service.list_resources(db))
        >>> isinstance(resources, list)
        True

        With tags filter:
        >>> db2 = MagicMock()
        >>> bind = MagicMock()
        >>> bind.dialect = MagicMock()
        >>> bind.dialect.name = "sqlite"       # or "postgresql" / "mysql"
        >>> db2.get_bind.return_value = bind
        >>> db2.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> result2, _ = asyncio.run(service.list_resources(db2, tags=['api']))
        >>> isinstance(result2, list)
        True
    """
    # Caching is limited to the first cursor page of unscoped queries:
    # - user_email set  -> results are user-specific
    # - token_teams set -> results are scoped (public-only or team-scoped tokens)
    # - page set        -> page-based pagination returns a different shape
    # The same flag guards both lookup and store so the two can never drift apart,
    # which would allow admin results to leak into scoped requests.
    cache = _get_registry_cache()
    cacheable = cursor is None and user_email is None and token_teams is None and page is None
    filters_hash = None
    if cacheable:
        filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, limit=limit)
        cached = await cache.get("resources", filters_hash)
        if cached is not None:
            # Reconstruct ResourceRead objects from cached dicts
            cached_resources = [ResourceRead.model_validate(r) for r in cached["resources"]]
            return (cached_resources, cached.get("next_cursor"))

    # Build base query with ordering
    query = select(DbResource).where(DbResource.uri_template.is_(None)).order_by(desc(DbResource.created_at), desc(DbResource.id))

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbResource.enabled)

    # Apply team-based access control if user_email is provided OR token_teams is explicitly set.
    # This ensures unauthenticated requests with token_teams=[] only see public resources;
    # an empty-string user_email also lands here and gets public-only filtering (secure default).
    if user_email is not None or token_teams is not None:
        # Use token_teams if provided (for MCP/API token access), otherwise look up from DB
        if token_teams is not None:
            team_ids = token_teams
        elif user_email:
            # First-Party
            from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

            team_service = TeamManagementService(db)
            user_teams = await team_service.get_user_teams(user_email)
            team_ids = [team.id for team in user_teams]
        else:
            team_ids = []

        # Public-only tokens (empty teams array) can ONLY see public resources - no owner access
        is_public_only_token = token_teams is not None and len(token_teams) == 0
        owner_access_allowed = bool(user_email) and not is_public_only_token

        if team_id:
            # User requesting specific team - verify access
            if team_id not in team_ids:
                # NOTE(review): this returns the cursor-style tuple even when `page` was
                # supplied; confirm page-based callers tolerate that shape.
                return ([], None)  # No access to this team

            access_conditions = [
                and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"])),
            ]
            if owner_access_allowed:
                access_conditions.append(and_(DbResource.team_id == team_id, DbResource.owner_email == user_email))
        else:
            # General access: public resources + team resources (+ owner resources if not public-only token)
            access_conditions = [
                DbResource.visibility == "public",
            ]
            if owner_access_allowed:
                access_conditions.append(DbResource.owner_email == user_email)
            if team_ids:
                access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"])))

        query = query.where(or_(*access_conditions))

        # Apply visibility filter if specified
        if visibility:
            query = query.where(DbResource.visibility == visibility)

    # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
    if tags:
        query = query.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))

    # Use unified pagination helper - handles both page and cursor pagination
    pag_result = await unified_paginate(
        db=db,
        query=query,
        page=page,
        per_page=per_page,
        cursor=cursor,
        limit=limit,
        base_url="/admin/resources",  # Used for page-based links
        query_params={"include_inactive": include_inactive} if include_inactive else {},
    )

    next_cursor = None
    # Extract resources based on pagination type
    if page is not None:
        # Page-based: pag_result is a dict
        resources_db = pag_result["data"]
    else:
        # Cursor-based: pag_result is a tuple
        resources_db, next_cursor = pag_result

    # Fetch team names for the resources (common for both pagination types).
    # Keys are normalised to str, matching the other list_* methods, so a lookup
    # cannot miss just because the two id values differ in type.
    resource_team_ids = {db_resource.team_id for db_resource in resources_db if db_resource.team_id}
    team_map = {}
    if resource_team_ids:
        teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all()
        team_map = {str(team.id): team.name for team in teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Convert to ResourceRead (common for both pagination types)
    result = []
    for db_resource in resources_db:
        try:
            db_resource.team = team_map.get(str(db_resource.team_id)) if db_resource.team_id else None
            result.append(self.convert_resource_to_read(db_resource, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # Continue with remaining resources instead of failing completely
            logger.exception(f"Failed to convert resource {getattr(db_resource, 'id', 'unknown')} ({getattr(db_resource, 'name', 'unknown')}): {e}")

    # Return appropriate format based on pagination type
    if page is not None:
        # Page-based format
        return {
            "data": result,
            "pagination": pag_result["pagination"],
            "links": pag_result["links"],
        }

    # Cursor-based format: store the first page under the same conditions as the lookup
    if cacheable:
        try:
            cache_data = {"resources": [item.model_dump(mode="json") for item in result], "next_cursor": next_cursor}
            await cache.set("resources", cache_data, filters_hash)
        except AttributeError as exc:
            # Result objects without model_dump (e.g., in doctests) are simply not cached
            logger.debug(f"Skipping resources cache store: {exc}")

    return (result, next_cursor)
async def list_resources_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[ResourceRead]:
    """
    DEPRECATED: Use list_resources() with user_email parameter instead.

    List resources user has access to with team filtering.

    This method is maintained for backward compatibility but is no longer used.
    New code should call list_resources() with user_email, team_id, and visibility parameters.

    Rows are ordered by ``created_at`` then ``id`` (both descending) so that
    ``skip``/``limit`` select a stable window.

    Args:
        db: Database session
        user_email: Email of the user requesting resources
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive resources
        skip: Number of resources to skip for pagination
        limit: Maximum number of resources to return

    Returns:
        List[ResourceRead]: Resources the user has access to

    Examples:
        >>> from unittest.mock import MagicMock
        >>> import asyncio
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> # Patch out TeamManagementService so it doesn't run real logic
        >>> import mcpgateway.services.resource_service as _rs
        >>> class FakeTeamService:
        ...     def __init__(self, db): pass
        ...     async def get_user_teams(self, email): return []
        >>> _rs.TeamManagementService = FakeTeamService
        >>> # Force DB to return one fake row with a 'team' attribute
        >>> class FakeResource:
        ...     team_id = None
        >>> fake_resource = FakeResource()
        >>> db.execute.return_value.scalars.return_value.all.return_value = [fake_resource]
        >>> service.convert_resource_to_read = MagicMock(return_value="converted")
        >>> asyncio.run(service.list_resources_for_user(db, "user@example.com"))
        ['converted']

        Without team_id (default/public access):
        >>> db2 = MagicMock()
        >>> class FakeResource2:
        ...     team_id = None
        >>> fake_resource2 = FakeResource2()
        >>> db2.execute.return_value.scalars.return_value.all.return_value = [fake_resource2]
        >>> service.convert_resource_to_read = MagicMock(return_value="converted2")
        >>> out2 = asyncio.run(service.list_resources_for_user(db2, "user@example.com"))
        >>> out2
        ['converted2']
    """
    # First-Party
    from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

    # Resolve the teams the user belongs to
    team_service = TeamManagementService(db)
    user_teams = await team_service.get_user_teams(user_email)
    team_ids = [team.id for team in user_teams]

    # OFFSET/LIMIT without ORDER BY lets the database return rows in any order,
    # so pages could overlap or skip rows between calls. Use the same ordering
    # as list_resources() to make skip/limit deterministic.
    query = select(DbResource).order_by(desc(DbResource.created_at), desc(DbResource.id))

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbResource.enabled)

    if team_id:
        if team_id not in team_ids:
            return []  # No access to team

        # Within the requested team: team/public-visible rows plus the user's own rows
        access_conditions = [
            and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"])),
            and_(DbResource.team_id == team_id, DbResource.owner_email == user_email),
        ]
    else:
        # 1. User's personal resources (owner_email matches)
        access_conditions = [DbResource.owner_email == user_email]
        # 2. Team resources where user is member
        if team_ids:
            access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"])))
        # 3. Public resources
        access_conditions.append(DbResource.visibility == "public")

    query = query.where(or_(*access_conditions))

    # Apply visibility filter if specified
    if visibility:
        query = query.where(DbResource.visibility == visibility)

    # Apply pagination
    query = query.offset(skip).limit(limit)

    resources = db.execute(query).scalars().all()

    # Batch fetch team names to avoid N+1 queries
    resource_team_ids = {r.team_id for r in resources if r.team_id}
    team_map = {}
    if resource_team_ids:
        teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all()
        team_map = {str(team.id): team.name for team in teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    result = []
    for t in resources:
        try:
            t.team = team_map.get(str(t.team_id)) if t.team_id else None
            result.append(self.convert_resource_to_read(t, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # Continue with remaining resources instead of failing completely
            logger.exception(f"Failed to convert resource {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
    return result
async def list_server_resources(
    self,
    db: Session,
    server_id: str,
    include_inactive: bool = False,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> List[ResourceRead]:
    """
    Retrieve the resources associated with a server.

    Resources are selected through the server/resource association table and
    converted to ResourceRead objects. Rows with a ``uri_template`` are excluded,
    and inactive resources are excluded unless include_inactive is True. The whole
    result set is returned in one call; pagination is not implemented here.

    Args:
        db (Session): The SQLAlchemy database session.
        server_id (str): Server ID
        include_inactive (bool): If True, include inactive resources in the result.
            Defaults to False.
        user_email (Optional[str]): User email for visibility filtering. If None and
            token_teams is also None, no filtering is applied.
        token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API
            token access where the token scope should be respected.

    Returns:
        List[ResourceRead]: A list of resources represented as ResourceRead objects.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource_read = MagicMock()
        >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> result = asyncio.run(service.list_server_resources(db, 'server1'))
        >>> isinstance(result, list)
        True
        >>> # Include inactive branch
        >>> result = asyncio.run(service.list_server_resources(db, 'server1', include_inactive=True))
        >>> isinstance(result, list)
        True
    """
    logger.debug(f"Listing resources for server_id: {server_id}, include_inactive: {include_inactive}")

    stmt = select(DbResource).join(server_resource_association, DbResource.id == server_resource_association.c.resource_id)
    stmt = stmt.where(DbResource.uri_template.is_(None))
    stmt = stmt.where(server_resource_association.c.server_id == server_id)
    if not include_inactive:
        stmt = stmt.where(DbResource.enabled)

    # Visibility filtering applies when a user context OR token_teams is present, so that
    # unauthenticated requests with token_teams=[] only see public resources. An empty-string
    # user_email also ends up here and gets public-only filtering (secure default).
    if user_email is not None or token_teams is not None:
        if token_teams is not None:
            # Token scope wins over the user's database memberships
            allowed_team_ids = token_teams
        elif user_email:
            # First-Party
            from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel

            membership_service = TeamManagementService(db)
            memberships = await membership_service.get_user_teams(user_email)
            allowed_team_ids = [team.id for team in memberships]
        else:
            allowed_team_ids = []

        # An empty teams array marks a public-only token: no owner access for those
        public_only = token_teams is not None and len(token_teams) == 0

        visible_to_caller = [DbResource.visibility == "public"]
        if not public_only and user_email:
            visible_to_caller.append(DbResource.owner_email == user_email)
        if allowed_team_ids:
            visible_to_caller.append(and_(DbResource.team_id.in_(allowed_team_ids), DbResource.visibility.in_(["team", "public"])))
        stmt = stmt.where(or_(*visible_to_caller))

    rows = db.execute(stmt).scalars().all()

    # Resolve all team names with one query instead of one per resource
    team_names = {}
    ids_needing_names = {row.team_id for row in rows if row.team_id}
    if ids_needing_names:
        team_rows = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(ids_needing_names), EmailTeam.is_active.is_(True))).all()
        team_names = {str(team.id): team.name for team in team_rows}

    db.commit()  # Release transaction to avoid idle-in-transaction

    converted = []
    for row in rows:
        try:
            row.team = team_names.get(str(row.team_id)) if row.team_id else None
            converted.append(self.convert_resource_to_read(row, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # A single bad row is logged and skipped rather than failing the whole listing
            logger.exception(f"Failed to convert resource {getattr(row, 'id', 'unknown')} ({getattr(row, 'name', 'unknown')}): {e}")
    return converted
async def _record_resource_metric(self, db: Session, resource: DbResource, start_time: float, success: bool, error_message: Optional[str]) -> None:
    """
    Persist one access metric row for a resource.

    The response time is the difference between the current monotonic clock
    reading and ``start_time``; the row is added to the session and committed.

    Args:
        db: Database session
        resource: The resource that was accessed
        start_time: Monotonic start time of the access
        success: True if successful, False otherwise
        error_message: Error message if failed, None otherwise
    """
    elapsed = time.monotonic() - start_time
    db.add(
        ResourceMetric(
            resource_id=resource.id,
            response_time=elapsed,
            is_success=success,
            error_message=error_message,
        )
    )
    db.commit()
async def _record_invoke_resource_metric(self, db: Session, resource_id: str, start_time: float, success: bool, error_message: Optional[str]) -> None:
    """
    Persist one metric row for a resource invocation, keyed by resource ID.

    The response time is the difference between the current monotonic clock
    reading and ``start_time``; the row is added to the session and committed.

    Args:
        db: Database Session
        resource_id: unique identifier to access & invoke resource
        start_time: Monotonic start time of the access
        success: True if successful, False otherwise
        error_message: Error message if failed, None otherwise
    """
    elapsed = time.monotonic() - start_time
    db.add(
        ResourceMetric(
            resource_id=resource_id,
            response_time=elapsed,
            is_success=success,
            error_message=error_message,
        )
    )
    db.commit()
def create_ssl_context(self, ca_certificate: str) -> ssl.SSLContext:
    """Return an SSL context for the given CA certificate.

    Delegates to ``get_cached_ssl_context`` so that the same certificate
    does not trigger repeated SSL context creation.

    Args:
        ca_certificate: CA certificate in PEM format

    Returns:
        ssl.SSLContext: Configured SSL context
    """
    cached_context = get_cached_ssl_context(ca_certificate)
    return cached_context
1412 async def invoke_resource( # pylint: disable=unused-argument
1413 self,
1414 db: Session,
1415 resource_id: str,
1416 resource_uri: str,
1417 resource_template_uri: Optional[str] = None,
1418 user_identity: Optional[Union[str, Dict[str, Any]]] = None,
1419 meta_data: Optional[Dict[str, Any]] = None, # Reserved for future MCP SDK support
1420 resource_obj: Optional[Any] = None,
1421 gateway_obj: Optional[Any] = None,
1422 ) -> Any:
1423 """
1424 Invoke a resource via its configured gateway using SSE or StreamableHTTP transport.
1426 This method determines the correct URI to invoke, loads the associated resource
1427 and gateway from the database, validates certificates if applicable, prepares
1428 authentication headers (OAuth, header-based, or none), and then connects to
1429 the gateway to read the resource using the appropriate transport.
1431 The function supports:
1432 - CA certificate validation / SSL context creation
1433 - OAuth client-credentials and authorization-code flow
1434 - Header-based auth
1435 - SSE transport gateways
1436 - StreamableHTTP transport gateways
1438 Args:
1439 db (Session):
1440 SQLAlchemy session for retrieving resource and gateway information.
1441 resource_id (str):
1442 ID of the resource to invoke.
1443 resource_uri (str):
1444 Direct resource URI configured for the resource.
1445 resource_template_uri (Optional[str]):
1446 URI from the template. Overrides `resource_uri` when provided.
1447 user_identity (Optional[Union[str, Dict[str, Any]]]):
1448 Identity of the user making the request, used for session pool isolation.
1449 Can be a string (email) or a dict with an 'email' key.
1450 Defaults to "anonymous" for pool isolation if not provided.
1451 OAuth token lookup always uses platform_admin_email (service account).
1452 meta_data (Optional[Dict[str, Any]]):
1453 Additional metadata to pass to the gateway during invocation.
1454 resource_obj (Optional[Any]):
1455 Pre-fetched DbResource object to skip the internal resource lookup query.
1456 gateway_obj (Optional[Any]):
1457 Pre-fetched DbGateway object to skip the internal gateway lookup query.
1459 Returns:
1460 Any: The text content returned by the remote resource, or ``None`` if the
1461 gateway could not be contacted or an error occurred.
1463 Raises:
1464 Exception: Any unhandled internal errors (e.g., DB issues).
1466 ---
1467 Doctest Examples
1468 ----------------
1470 >>> class FakeDB:
1471 ... "Simple DB stub returning fake resource and gateway rows."
1472 ... def execute(self, query):
1473 ... class Result:
1474 ... def scalar_one_or_none(self):
1475 ... # Return fake objects with the needed attributes
1476 ... class FakeResource:
1477 ... id = "res123"
1478 ... name = "Demo Resource"
1479 ... gateway_id = "gw1"
1480 ... return FakeResource()
1481 ... return Result()
1483 >>> class FakeGateway:
1484 ... id = "gw1"
1485 ... name = "Fake Gateway"
1486 ... url = "https://fake.gateway"
1487 ... ca_certificate = None
1488 ... ca_certificate_sig = None
1489 ... transport = "sse"
1490 ... auth_type = None
1491 ... auth_value = {}
1493 >>> # Monkeypatch the DB lookup for gateway
1494 >>> def fake_execute_gateway(self, query):
1495 ... class Result:
1496 ... def scalar_one_or_none(self_inner):
1497 ... return FakeGateway()
1498 ... return Result()
1500 >>> FakeDB.execute_gateway = fake_execute_gateway
1502 >>> class FakeService:
1503 ... "Service stub replacing network calls with predictable outputs."
1504 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None):
1505 ... # Represent the behavior of a successful SSE response.
1506 ... return "hello from gateway"
1508 >>> svc = FakeService()
1509 >>> import asyncio
1510 >>> asyncio.run(svc.invoke_resource(FakeDB(), "res123", "/test"))
1511 'hello from gateway'
1513 ---
1514 Example: Template URI overrides resource URI
1515 --------------------------------------------
1517 >>> class FakeService2(FakeService):
1518 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None):
1519 ... if resource_template_uri:
1520 ... return f"using template: {resource_template_uri}"
1521 ... return f"using direct: {resource_uri}"
1523 >>> svc2 = FakeService2()
1524 >>> asyncio.run(svc2.invoke_resource(FakeDB(), "res123", "/direct", "/template"))
1525 'using template: /template'
1527 """
1528 uri = None
1529 if resource_uri and resource_template_uri:
1530 uri = resource_template_uri
1531 elif resource_uri:
1532 uri = resource_uri
1534 logger.info(f"Invoking the resource: {uri}")
1535 gateway_id = None
1536 # Use pre-fetched resource if provided (avoids Q5 re-fetch)
1537 resource_info = resource_obj
1538 if resource_info is None:
1539 resource_info = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none()
1541 # Release transaction immediately after resource lookup to prevent idle-in-transaction
1542 # This is especially important when resource isn't found - we don't want to hold the transaction
1543 db.commit()
1545 # Normalize user_identity to string for session pool isolation
1546 # Use authenticated user for pool isolation, but keep platform_admin for OAuth token lookup
1547 if isinstance(user_identity, dict):
1548 pool_user_identity = user_identity.get("email") or "anonymous"
1549 elif isinstance(user_identity, str):
1550 pool_user_identity = user_identity
1551 else:
1552 pool_user_identity = "anonymous"
1554 # OAuth token lookup uses platform admin (service account) - not changed
1555 oauth_user_email = settings.platform_admin_email
1557 if resource_info:
1558 gateway_id = getattr(resource_info, "gateway_id", None)
1559 resource_name = getattr(resource_info, "name", None)
1560 # Use pre-fetched gateway if provided (avoids Q6 re-fetch)
1561 gateway = gateway_obj
1562 if gateway is None and gateway_id:
1563 gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none()
1565 # ═══════════════════════════════════════════════════════════════════════════
1566 # CRITICAL: Release DB transaction immediately after fetching resource/gateway
1567 # This ensures the transaction doesn't stay open if there's no gateway
1568 # or if the function returns early for any reason.
1569 # ═══════════════════════════════════════════════════════════════════════════
1570 db.commit()
1572 if gateway:
1574 start_time = time.monotonic()
1575 success = False
1576 error_message = None
1578 # Create database span for observability dashboard
1579 trace_id = current_trace_id.get()
1580 db_span_id = None
1581 db_span_ended = False
1582 observability_service = ObservabilityService() if trace_id else None
1584 if trace_id and observability_service:
1585 try:
1586 db_span_id = observability_service.start_span(
1587 db=db,
1588 trace_id=trace_id,
1589 name="invoke.resource",
1590 attributes={
1591 "resource.name": resource_name if resource_name else "unknown",
1592 "resource.id": str(resource_id) if resource_id else "unknown",
1593 "resource.uri": str(uri) or "unknown",
1594 "gateway.transport": getattr(gateway, "transport") or "uknown",
1595 "gateway.url": getattr(gateway, "url") or "unknown",
1596 },
1597 )
1598 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {resource_id} & {uri}")
1599 except Exception as e:
1600 logger.warning(f"Failed to start the observability span for invoking resource: {e}")
1601 db_span_id = None
1603 with create_span(
1604 "invoke.resource",
1605 {
1606 "resource.name": resource_name if resource_name else "unknown",
1607 "resource.id": str(resource_id) if resource_id else "unknown",
1608 "resource.uri": str(uri) or "unknown",
1609 "gateway.transport": getattr(gateway, "transport") or "uknown",
1610 "gateway.url": getattr(gateway, "url") or "unknown",
1611 },
1612 ) as span:
1613 valid = False
1614 if gateway.ca_certificate:
1615 if settings.enable_ed25519_signing:
1616 public_key_pem = settings.ed25519_public_key
1617 valid = validate_signature(gateway.ca_certificate.encode(), gateway.ca_certificate_sig, public_key_pem)
1618 else:
1619 valid = True
1621 if valid:
1622 ssl_context = self.create_ssl_context(gateway.ca_certificate)
1623 else:
1624 ssl_context = None
1626 def _get_httpx_client_factory(
1627 headers: dict[str, str] | None = None,
1628 timeout: httpx.Timeout | None = None,
1629 auth: httpx.Auth | None = None,
1630 ) -> httpx.AsyncClient:
1631 """Factory function to create httpx.AsyncClient with optional CA certificate.
1633 Args:
1634 headers: Optional headers for the client
1635 timeout: Optional timeout for the client
1636 auth: Optional auth for the client
1638 Returns:
1639 httpx.AsyncClient: Configured HTTPX async client
1640 """
1641 # First-Party
1642 from mcpgateway.services.http_client_service import get_default_verify, get_http_timeout # pylint: disable=import-outside-toplevel
1644 return httpx.AsyncClient(
1645 verify=ssl_context if ssl_context else get_default_verify(), # pylint: disable=cell-var-from-loop
1646 follow_redirects=True,
1647 headers=headers,
1648 timeout=timeout if timeout else get_http_timeout(),
1649 auth=auth,
1650 limits=httpx.Limits(
1651 max_connections=settings.httpx_max_connections,
1652 max_keepalive_connections=settings.httpx_max_keepalive_connections,
1653 keepalive_expiry=settings.httpx_keepalive_expiry,
1654 ),
1655 )
1657 try:
1658 # ═══════════════════════════════════════════════════════════════════════════
1659 # Extract gateway data to local variables BEFORE OAuth handling
1660 # OAuth client_credentials flow makes network calls, so we must release
1661 # the DB transaction first to prevent idle-in-transaction during network I/O
1662 # ═══════════════════════════════════════════════════════════════════════════
1663 gateway_url = gateway.url
1664 gateway_transport = gateway.transport
1665 gateway_auth_type = gateway.auth_type
1666 gateway_auth_value = gateway.auth_value
1667 gateway_oauth_config = gateway.oauth_config
1668 gateway_name = gateway.name
1669 gateway_auth_query_params = getattr(gateway, "auth_query_params", None)
1671 # Apply query param auth to URL if applicable
1672 auth_query_params_decrypted: Optional[Dict[str, str]] = None
1673 if gateway_auth_type == "query_param" and gateway_auth_query_params:
1674 auth_query_params_decrypted = {}
1675 for param_key, encrypted_value in gateway_auth_query_params.items():
1676 if encrypted_value:
1677 try:
1678 decrypted = decode_auth(encrypted_value)
1679 auth_query_params_decrypted[param_key] = decrypted.get(param_key, "")
1680 except Exception: # noqa: S110 - intentionally skip failed decryptions
1681 # Silently skip params that fail decryption (corrupted or old key)
1682 logger.debug(f"Failed to decrypt query param '{param_key}' for resource")
1683 if auth_query_params_decrypted:
1684 gateway_url = apply_query_param_auth(gateway_url, auth_query_params_decrypted)
1686 # ═══════════════════════════════════════════════════════════════════════════
1687 # CRITICAL: Release DB connection back to pool BEFORE making HTTP/OAuth calls
1688 # This prevents connection pool exhaustion during slow upstream requests.
1689 # OAuth client_credentials flow makes network calls to token endpoints.
1690 # All needed data has been extracted to local variables above.
1691 # ═══════════════════════════════════════════════════════════════════════════
1692 db.commit() # End read-only transaction cleanly
1693 db.close()
1695 # Handle different authentication types (AFTER DB release)
1696 headers = {}
1697 if gateway_auth_type == "oauth" and gateway_oauth_config:
1698 grant_type = gateway_oauth_config.get("grant_type", "client_credentials")
1700 if grant_type == "authorization_code":
1701 # For Authorization Code flow, try to get stored tokens
1702 try:
1703 # First-Party
1704 from mcpgateway.services.token_storage_service import TokenStorageService # pylint: disable=import-outside-toplevel
1706 # Use fresh DB session for token lookup (original db was closed)
1707 with fresh_db_session() as token_db:
1708 token_storage = TokenStorageService(token_db)
1709 access_token: str = await token_storage.get_user_token(gateway_id, oauth_user_email)
1711 if access_token:
1712 headers["Authorization"] = f"Bearer {access_token}"
1713 else:
1714 if span:
1715 span.set_attribute("health.status", "unhealthy")
1716 span.set_attribute("error.message", "No valid OAuth token for user")
1718 except Exception as e:
1719 logger.error(f"Failed to obtain stored OAuth token for gateway {gateway_name}: {e}")
1720 if span:
1721 span.set_attribute("health.status", "unhealthy")
1722 span.set_attribute("error.message", "Failed to obtain stored OAuth token")
1723 else:
1724 # For Client Credentials flow, get token directly (makes network calls)
1725 try:
1726 access_token: str = await self.oauth_manager.get_access_token(gateway_oauth_config)
1727 headers["Authorization"] = f"Bearer {access_token}"
1728 except Exception as e:
1729 if span:
1730 span.set_attribute("health.status", "unhealthy")
1731 span.set_attribute("error.message", str(e))
1732 else:
1733 # Handle non-OAuth authentication (existing logic)
1734 auth_data = gateway_auth_value or {}
1735 if isinstance(auth_data, str):
1736 headers = decode_auth(auth_data)
1737 elif isinstance(auth_data, dict):
1738 headers = {str(k): str(v) for k, v in auth_data.items()}
1739 else:
1740 headers = {}
1742 async def connect_to_sse_session(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
1743 """
1744 Connect to an SSE-based gateway and retrieve the text content of a resource.
1746 This helper establishes an SSE (Server-Sent Events) session with the remote
1747 gateway, initializes a `ClientSession`, invokes `read_resource()` for the
1748 given URI, and returns the textual content from the first item in the
1749 response's `contents` list.
1751 If any error occurs (network failure, unexpected response format, session
1752 initialization failure, etc.), the method logs the exception and returns
1753 ``None`` instead of raising.
1755 Note:
1756 MCP SDK 1.25.0 read_resource() does not support meta parameter.
1757 When the SDK adds support, meta_data can be added back here.
1759 Args:
1760 server_url (str):
1761 The base URL of the SSE gateway to connect to.
1762 uri (str):
1763 The resource URI that should be requested from the gateway.
1764 authentication (Optional[Dict[str, str]]):
1765 Optional dictionary of headers (e.g., OAuth Bearer tokens) to
1766 include in the SSE connection request. Defaults to an empty
1767 dictionary when not provided.
1769 Returns:
1770 str | None:
1771 The text content returned by the remote resource, or ``None`` if the
1772 SSE connection fails or the response is invalid.
1774 Notes:
1775 - This function assumes the SSE client context manager yields:
1776 ``(read_stream, write_stream, get_session_id)``.
1777 - The expected response object from `session.read_resource()` must have a
1778 `contents` attribute containing a list, where the first element has a
1779 `text` attribute.
1780 """
1781 if authentication is None:
1782 authentication = {}
1783 try:
1784 # Use session pool if enabled for 10-20x latency improvement
1785 use_pool = False
1786 pool = None
1787 if settings.mcp_session_pool_enabled:
1788 try:
1789 pool = get_mcp_session_pool()
1790 use_pool = True
1791 except RuntimeError:
1792 # Pool not initialized (e.g., in tests), fall back to per-call sessions
1793 pass
1795 if use_pool and pool is not None:
1796 async with pool.session(
1797 url=server_url,
1798 headers=authentication,
1799 transport_type=TransportType.SSE,
1800 httpx_client_factory=_get_httpx_client_factory,
1801 user_identity=pool_user_identity,
1802 gateway_id=gateway_id,
1803 ) as pooled:
1804 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1805 resource_response = await pooled.session.read_resource(uri=uri)
1806 return getattr(getattr(resource_response, "contents")[0], "text")
1807 else:
1808 # Fallback to per-call sessions when pool disabled or not initialized
1809 async with sse_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as (
1810 read_stream,
1811 write_stream,
1812 _get_session_id,
1813 ):
1814 async with ClientSession(read_stream, write_stream) as session:
1815 _ = await session.initialize()
1816 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1817 resource_response = await session.read_resource(uri=uri)
1818 return getattr(getattr(resource_response, "contents")[0], "text")
1819 except Exception as e:
1820 # Sanitize error message to prevent URL secrets from leaking in logs
1821 sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted)
1822 logger.debug(f"Exception while connecting to sse gateway: {sanitized_error}")
1823 return None
1825 async def connect_to_streamablehttp_server(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
1826 """
1827 Connect to a StreamableHTTP gateway and retrieve the text content of a resource.
1829 This helper establishes a StreamableHTTP client session with the specified
1830 gateway, initializes a `ClientSession`, invokes `read_resource()` for the
1831 given URI, and returns the textual content from the first element in the
1832 response's `contents` list.
1834 If any exception occurs during connection, session initialization, or
1835 resource reading, the function logs the error and returns ``None`` instead
1836 of propagating the exception.
1838 Note:
1839 MCP SDK 1.25.0 read_resource() does not support meta parameter.
1840 When the SDK adds support, meta_data can be added back here.
1842 Args:
1843 server_url (str):
1844 The endpoint URL of the StreamableHTTP gateway.
1845 uri (str):
1846 The resource URI to request from the gateway.
1847 authentication (Optional[Dict[str, str]]):
1848 Optional dictionary of authentication headers (e.g., API keys or
1849 Bearer tokens). Defaults to an empty dictionary when not provided.
1851 Returns:
1852 str | None:
1853 The text content returned by the StreamableHTTP resource, or ``None``
1854 if the connection fails or the response format is invalid.
1856 Notes:
1857 - The `streamablehttp_client` context manager must yield a tuple:
1858 ``(read_stream, write_stream, get_session_id)``.
1859 - The expected `resource_response` returned by ``session.read_resource()``
1860 must contain a `contents` list, whose first element exposes a `text`
1861 attribute.
1862 """
1863 if authentication is None:
1864 authentication = {}
1865 try:
1866 # Use session pool if enabled for 10-20x latency improvement
1867 use_pool = False
1868 pool = None
1869 if settings.mcp_session_pool_enabled:
1870 try:
1871 pool = get_mcp_session_pool()
1872 use_pool = True
1873 except RuntimeError:
1874 # Pool not initialized (e.g., in tests), fall back to per-call sessions
1875 pass
1877 if use_pool and pool is not None:
1878 async with pool.session(
1879 url=server_url,
1880 headers=authentication,
1881 transport_type=TransportType.STREAMABLE_HTTP,
1882 httpx_client_factory=_get_httpx_client_factory,
1883 user_identity=pool_user_identity,
1884 gateway_id=gateway_id,
1885 ) as pooled:
1886 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1887 resource_response = await pooled.session.read_resource(uri=uri)
1888 return getattr(getattr(resource_response, "contents")[0], "text")
1889 else:
1890 # Fallback to per-call sessions when pool disabled or not initialized
1891 async with streamablehttp_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as (
1892 read_stream,
1893 write_stream,
1894 _get_session_id,
1895 ):
1896 async with ClientSession(read_stream, write_stream) as session:
1897 _ = await session.initialize()
1898 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1899 resource_response = await session.read_resource(uri=uri)
1900 return getattr(getattr(resource_response, "contents")[0], "text")
1901 except Exception as e:
1902 # Sanitize error message to prevent URL secrets from leaking in logs
1903 sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted)
1904 logger.debug(f"Exception while connecting to streamablehttp gateway: {sanitized_error}")
1905 return None
1907 if span:
1908 span.set_attribute("success", True)
1909 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000)
1911 resource_text = ""
1912 if (gateway_transport).lower() == "sse":
1913 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
1914 resource_text = await connect_to_sse_session(server_url=gateway_url, authentication=headers, uri=uri)
1915 else:
1916 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
1917 resource_text = await connect_to_streamablehttp_server(server_url=gateway_url, authentication=headers, uri=uri)
1918 success = True # Mark as successful before returning
1919 return resource_text
1920 except Exception as e:
1921 success = False
1922 error_message = str(e)
1923 raise
1924 finally:
1925 if resource_text:
1926 try:
1927 # First-Party
1928 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel
1930 metrics_buffer = get_metrics_buffer_service()
1931 metrics_buffer.record_resource_metric(
1932 resource_id=resource_id,
1933 start_time=start_time,
1934 success=success,
1935 error_message=error_message,
1936 )
1937 except Exception as metrics_error:
1938 logger.warning(f"Failed to invoke resource metric: {metrics_error}")
1940 # End Invoke resource span for Observability dashboard
1941 # NOTE: Use fresh_db_session() since the original db was released
1942 # before making HTTP calls to prevent connection pool exhaustion
1943 if db_span_id and observability_service and not db_span_ended:
1944 try:
1945 with fresh_db_session() as fresh_db:
1946 observability_service.end_span(
1947 db=fresh_db,
1948 span_id=db_span_id,
1949 status="ok" if success else "error",
1950 status_message=error_message if error_message else None,
1951 )
1952 db_span_ended = True
1953 logger.debug(f"✓ Ended invoke.resource span: {db_span_id}")
1954 except Exception as e:
1955 logger.warning(f"Failed to end observability span for invoking resource: {e}")
async def read_resource(
    self,
    db: Session,
    resource_id: Optional[Union[int, str]] = None,
    resource_uri: Optional[str] = None,
    request_id: Optional[str] = None,
    user: Optional[str] = None,
    server_id: Optional[str] = None,
    include_inactive: bool = False,
    token_teams: Optional[List[str]] = None,
    plugin_context_table: Optional[PluginContextTable] = None,
    plugin_global_context: Optional[GlobalContext] = None,
    meta_data: Optional[Dict[str, Any]] = None,
) -> ResourceContent:
    """Read a resource's content with plugin hook support.

    The resource is located by ``resource_id`` (primary-key lookup with the
    gateway eager-loaded) or by ``resource_uri`` (exact URI match, then
    template match via ``_read_template_resource``). After the access and
    server-scoping checks pass, the DB transaction is committed and, when the
    stored content is a content model (or exposes ``text``/``blob``), the
    actual payload is fetched through ``invoke_resource``. Primitive
    ``str``/``bytes`` content is wrapped in a ``ResourceContent``.

    When neither ``resource_id`` nor ``resource_uri`` is supplied, no error is
    raised up front; the lookup proceeds with the placeholder URI ``"unknown"``.

    Args:
        db: Database session.
        resource_id: Optional ID of the resource to read.
        resource_uri: Optional URI of the resource to read.
        request_id: Optional request ID for tracing.
        user: Optional user email for authorization checks.
        server_id: Optional server ID for server scoping enforcement.
        include_inactive: Whether to include inactive resources. Defaults to False.
        token_teams: Optional list of team IDs from token for authorization.
            None = unrestricted admin, [] = public-only, [...] = team-scoped.
        plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing.
        plugin_global_context: Optional global context from middleware for consistency across hooks.
        meta_data: Optional metadata dictionary to pass to the gateway during resource reading.

    Returns:
        Resource content object

    Raises:
        ResourceNotFoundError: If resource not found or access denied
        ResourceError: If blocked by plugin or if path validation fails
        PluginError: If encounters issue with plugin
        PluginViolationError: If plugin violated the request. Example - In case of OPA plugin, if the request is denied by policy.

    Examples:
        >>> from mcpgateway.common.models import ResourceContent
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, PropertyMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> uri = 'http://example.com/resource.txt'
        >>> mock_resource = MagicMock()
        >>> mock_resource.id = 123
        >>> mock_resource.uri = uri
        >>> type(mock_resource).content = PropertyMock(return_value='test')
        >>> db.execute.return_value.scalar_one_or_none.return_value = mock_resource
        >>> db.get.return_value = mock_resource
        >>> import asyncio
        >>> result = asyncio.run(service.read_resource(db, resource_uri=uri))
        >>> result.__class__.__name__ == 'ResourceContent'
        True

        Not found case returns ResourceNotFoundError:

        >>> db2 = MagicMock()
        >>> db2.execute.return_value.scalar_one_or_none.return_value = None
        >>> db2.get.return_value = None
        >>> import asyncio
        >>> # Disable path validation for doctest
        >>> import mcpgateway.config
        >>> old_val = getattr(mcpgateway.config.settings, 'experimental_validate_io', False)
        >>> mcpgateway.config.settings.experimental_validate_io = False
        >>> def _nf():
        ...     try:
        ...         asyncio.run(service.read_resource(db2, resource_uri='abc'))
        ...     except ResourceNotFoundError:
        ...         return True
        >>> result = _nf()
        >>> mcpgateway.config.settings.experimental_validate_io = old_val
        >>> result
        True
    """
    start_time = time.monotonic()
    success = False
    error_message = None
    resource_db = None
    resource_db_gateway = None  # Only set when eager-loaded via Q2's joinedload
    content = None
    uri = resource_uri or "unknown"
    if resource_id:
        resource_db = db.get(DbResource, resource_id, options=[joinedload(DbResource.gateway)])
        if resource_db:
            uri = resource_db.uri
            resource_db_gateway = resource_db.gateway  # Eager-loaded, safe to access
            # Check enabled status in Python (avoids redundant Q3/Q4 re-fetches)
            if not include_inactive and not resource_db.enabled:
                raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")
            content = resource_db.content
        else:
            # Lookup by ID missed; the ID-based fallback query below decides the outcome
            uri = None

    # Span attributes shared by the DB-backed span and the tracing span
    span_http_url = uri if uri is not None and uri.startswith("http") else None
    span_resource_type = "template" if (uri is not None and "{" in uri and "}" in uri) else "static"

    # Create database span for observability dashboard
    trace_id = current_trace_id.get()
    db_span_id = None
    db_span_ended = False
    observability_service = ObservabilityService() if trace_id else None

    if trace_id and observability_service:
        try:
            db_span_id = observability_service.start_span(
                db=db,
                trace_id=trace_id,
                name="resource.read",
                attributes={
                    "resource.uri": str(resource_uri) if resource_uri else "unknown",
                    "user": user or "anonymous",
                    "server_id": server_id,
                    "request_id": request_id,
                    "http.url": span_http_url,
                    "resource.type": span_resource_type,
                },
            )
            logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {uri}")
        except Exception as e:
            # Observability is best-effort: never fail the read because a span could not be started
            logger.warning(f"Failed to start observability span for resource reading: {e}")
            db_span_id = None

    with create_span(
        "resource.read",
        {
            "resource.uri": resource_uri or "unknown",
            "user": user or "anonymous",
            "server_id": server_id,
            "request_id": request_id,
            "http.url": span_http_url,
            "resource.type": span_resource_type,
        },
    ) as span:
        try:
            # Generate request ID if not provided
            if not request_id:
                request_id = str(uuid.uuid4())

            original_uri = uri
            contexts = None

            # Check if plugin manager is available and eligible for this request
            plugin_eligible = bool(self._plugin_manager and PLUGINS_AVAILABLE and uri and ("://" in uri))

            # Initialize plugin manager if needed (lazy init must happen before has_hooks_for check)
            # pylint: disable=protected-access
            if plugin_eligible and not self._plugin_manager._initialized:
                await self._plugin_manager.initialize()
            # pylint: enable=protected-access

            # Check if any resource hooks are registered to avoid unnecessary context creation
            has_pre_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_PRE_FETCH)
            has_post_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_POST_FETCH)

            # Initialize plugin context variables only if hooks are registered
            global_context = None
            if has_pre_fetch or has_post_fetch:
                # Normalize user to an identifier string if provided
                user_id = None
                if user is not None:
                    if isinstance(user, dict) and "email" in user:
                        user_id = user.get("email")
                    elif isinstance(user, str):
                        user_id = user
                    else:
                        # Attempt to fallback to attribute access
                        user_id = getattr(user, "email", None)

                # Use existing global_context from middleware or create new one
                if plugin_global_context:
                    global_context = plugin_global_context
                    # Update fields with resource-specific information
                    if user_id:
                        global_context.user = user_id
                    if server_id:
                        global_context.server_id = server_id
                else:
                    # Create new context (fallback when middleware didn't run)
                    global_context = GlobalContext(request_id=request_id, user=user_id, server_id=server_id)

            # Call pre-fetch hooks if registered
            if has_pre_fetch:
                pre_payload = ResourcePreFetchPayload(uri=uri, metadata={})

                # Execute pre-fetch hooks with context from previous hooks
                pre_result, contexts = await self._plugin_manager.invoke_hook(
                    ResourceHookType.RESOURCE_PRE_FETCH,
                    pre_payload,
                    global_context,
                    local_contexts=plugin_context_table,  # Pass context from previous hooks
                    violations_as_exceptions=True,
                )
                # Use modified URI if plugin changed it
                if pre_result.modified_payload:
                    uri = pre_result.modified_payload.uri
                    logger.debug(f"Resource URI modified by plugin: {original_uri} -> {uri}")

            # Validate resource path if experimental validation is enabled
            if getattr(settings, "experimental_validate_io", False) and uri and isinstance(uri, str):
                try:
                    SecurityValidator.validate_path(uri, getattr(settings, "allowed_roots", None))
                except ValueError as e:
                    # Chain the cause so the validator's traceback is preserved
                    raise ResourceError(f"Path validation failed: {e}") from e

            # Original resource fetching logic
            logger.info(f"Fetching resource: {resource_id} (URI: {uri})")

            if resource_db is None and uri is not None:
                # Match the URI (possibly modified by plugins) against the URI stored in the DB.
                # A templated resource registered under this exact URI is retrieved here too.
                query = select(DbResource).where(DbResource.uri == str(uri)).where(DbResource.enabled)
                if include_inactive:
                    query = select(DbResource).where(DbResource.uri == str(uri))
                resource_db = db.execute(query).scalar_one_or_none()
                if resource_db:
                    content = resource_db.content
                else:
                    # Check the inactivity first
                    check_inactivity = db.execute(select(DbResource).where(DbResource.uri == str(resource_uri)).where(not_(DbResource.enabled))).scalar_one_or_none()
                    if check_inactivity:
                        raise ResourceNotFoundError(f"Resource '{resource_uri}' exists but is inactive")

            if resource_db is None:
                if resource_uri:
                    # The URI may be a filled-in instance of a registered template.
                    # _read_template_resource finds the template whose pattern matches
                    # the URI, or raises when none matches.
                    try:
                        content = await self._read_template_resource(db, uri) or None
                        # ═══════════════════════════════════════════════════════════════════════════
                        # SECURITY: Fetch the template's DbResource record for access checking
                        # _read_template_resource returns ResourceContent with the template's ID
                        # ═══════════════════════════════════════════════════════════════════════════
                        if content is not None and hasattr(content, "id") and content.id:
                            template_query = select(DbResource).where(DbResource.id == str(content.id))
                            if not include_inactive:
                                template_query = template_query.where(DbResource.enabled)
                            resource_db = db.execute(template_query).scalar_one_or_none()
                    except Exception as e:
                        raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") from e

            if resource_uri:
                if content is None and resource_db is None:
                    raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'")

            if resource_id and resource_db is None:
                # resource_id provided but not found by Q2 (shouldn't normally happen,
                # but handles race conditions where resource is deleted between requests)
                query = select(DbResource).where(DbResource.id == str(resource_id)).where(DbResource.enabled)
                if include_inactive:
                    query = select(DbResource).where(DbResource.id == str(resource_id))
                resource_db = db.execute(query).scalar_one_or_none()
                if resource_db:
                    original_uri = resource_db.uri or None
                    content = resource_db.content
                else:
                    check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(resource_id)).where(not_(DbResource.enabled))).scalar_one_or_none()
                    if check_inactivity:
                        raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")
                    raise ResourceNotFoundError(f"Resource not found for the resource id: {resource_id}")

            # ═══════════════════════════════════════════════════════════════════════════
            # SECURITY: Check resource access based on visibility and team membership
            # ═══════════════════════════════════════════════════════════════════════════
            if resource_db:
                if not await self._check_resource_access(db, resource_db, user, token_teams):
                    # Don't reveal resource existence - return generic "not found"
                    raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}")

                # ═══════════════════════════════════════════════════════════════════════════
                # SECURITY: Enforce server scoping if server_id is provided
                # Resource must be attached to the specified virtual server
                # ═══════════════════════════════════════════════════════════════════════════
                if server_id:
                    server_match = db.execute(
                        select(server_resource_association.c.resource_id).where(
                            server_resource_association.c.server_id == server_id,
                            server_resource_association.c.resource_id == resource_db.id,
                        )
                    ).first()
                    if not server_match:
                        raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}")

            # Set success attributes on span
            if span:
                span.set_attribute("success", True)
                span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000)
                if content:
                    span.set_attribute("content.size", len(str(content)))

            success = True

            # Release transaction before network calls to avoid idle-in-transaction during invoke_resource
            db.commit()

            # ═══════════════════════════════════════════════════════════════════════════
            # RESOLVE CONTENT: Fetch actual content from gateway if needed
            # ═══════════════════════════════════════════════════════════════════════════
            # Decide which attribute of the content object carries the gateway payload.
            # Content models always use "text"; duck-typed objects prefer "blob" over "text".
            if isinstance(content, (ResourceContent, TextContent)):
                gateway_field = "text"
            elif hasattr(content, "blob"):
                gateway_field = "blob"
            elif hasattr(content, "text"):
                gateway_field = "text"
            else:
                gateway_field = None

            if gateway_field is not None:
                resource_response = await self.invoke_resource(
                    db=db,
                    resource_id=getattr(content, "id"),
                    resource_uri=getattr(content, "uri") or None,
                    resource_template_uri=getattr(content, gateway_field) or None,
                    user_identity=user,
                    meta_data=meta_data,
                    resource_obj=resource_db,
                    gateway_obj=resource_db_gateway,
                )
                # Keep the stored value when the gateway returned nothing
                if resource_response:
                    setattr(content, gateway_field, resource_response)
            else:
                # Normalize primitive types to ResourceContent.
                # Prefer the caller-supplied ID, then the ID of the record that was actually
                # found (URI lookups have no resource_id), so the content never carries the
                # literal string "None" when a real identifier is known.
                known_id = resource_id or getattr(resource_db, "id", None)
                if isinstance(content, bytes):
                    content = ResourceContent(type="resource", id=str(known_id), uri=original_uri, blob=content)
                elif isinstance(content, str):
                    content = ResourceContent(type="resource", id=str(known_id), uri=original_uri, text=content)
                else:
                    # Fallback to stringified content; guard attribute access because content may be None
                    content = ResourceContent(
                        type="resource",
                        id=str(known_id or getattr(content, "id", None)),
                        uri=original_uri or getattr(content, "uri", None),
                        text=str(content),
                    )

            # ═══════════════════════════════════════════════════════════════════════════
            # POST-FETCH HOOKS: Now called AFTER content is resolved from gateway
            # ═══════════════════════════════════════════════════════════════════════════
            if has_post_fetch:
                post_payload = ResourcePostFetchPayload(uri=original_uri, content=content)
                post_result, _ = await self._plugin_manager.invoke_hook(ResourceHookType.RESOURCE_POST_FETCH, post_payload, global_context, contexts, violations_as_exceptions=True)
                if post_result.modified_payload:
                    content = post_result.modified_payload.content

            return content
        except Exception as e:
            # Record the failure for metrics/span status, then let the caller handle it
            success = False
            error_message = str(e)
            raise
        finally:
            # Record metrics only if we found a resource (not for templates)
            if resource_db:
                try:
                    # First-Party
                    from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service  # pylint: disable=import-outside-toplevel

                    metrics_buffer = get_metrics_buffer_service()
                    metrics_buffer.record_resource_metric(
                        resource_id=resource_db.id,
                        start_time=start_time,
                        success=success,
                        error_message=error_message,
                    )
                except Exception as metrics_error:
                    logger.warning(f"Failed to record resource metric: {metrics_error}")

            # End database span for observability dashboard
            # NOTE: Use fresh_db_session() since db may have been closed by invoke_resource
            if db_span_id and observability_service and not db_span_ended:
                try:
                    with fresh_db_session() as fresh_db:
                        observability_service.end_span(
                            db=fresh_db,
                            span_id=db_span_id,
                            status="ok" if success else "error",
                            status_message=error_message if error_message else None,
                        )
                    db_span_ended = True
                    logger.debug(f"✓ Ended resource.read span: {db_span_id}")
                except Exception as e:
                    logger.warning(f"Failed to end observability span for resource reading: {e}")
async def set_resource_state(self, db: Session, resource_id: int, activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> ResourceRead:
    """
    Set the activation status of a resource.

    The row is fetched through ``get_for_update(..., nowait=True)`` so a
    concurrent modification fails fast instead of queueing behind the lock.
    Every failure path rolls the session back so that lock is released.

    Args:
        db: Database session
        resource_id: Resource ID
        activate: True to activate, False to deactivate
        user_email: Optional[str] The email of the user to check if the user has permission to modify.
        skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).

    Returns:
        The updated ResourceRead object

    Raises:
        ResourceNotFoundError: If the resource is not found.
        ResourceLockConflictError: If the resource is locked by another transaction.
        ResourceError: For other errors.
        PermissionError: If user doesn't own the resource.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> from mcpgateway.schemas import ResourceRead
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.get.return_value = resource
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_resource_activated = AsyncMock()
        >>> service._notify_resource_deactivated = AsyncMock()
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.set_resource_state(db, 1, True))
        'resource_read'
    """
    try:
        # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
        try:
            resource = get_for_update(db, DbResource, resource_id, nowait=True)
        except OperationalError as lock_err:
            # Row is locked by another transaction - fail fast with 409
            db.rollback()
            raise ResourceLockConflictError(f"Resource {resource_id} is currently being modified by another request") from lock_err
        if not resource:
            raise ResourceNotFoundError(f"Resource not found: {resource_id}")

        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, resource):
                raise PermissionError("Only the owner can activate the Resource" if activate else "Only the owner can deactivate the Resource")

        # Update status if it's different
        if resource.enabled != activate:
            resource.enabled = activate
            resource.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(resource)

            # Invalidate cache after status change (skip for batch operations)
            if not skip_cache_invalidation:
                cache = _get_registry_cache()
                await cache.invalidate_resources()

            # Notify subscribers
            if activate:
                await self._notify_resource_activated(resource)
            else:
                await self._notify_resource_deactivated(resource)

            logger.info(f"Resource {resource.uri} {'activated' if activate else 'deactivated'}")

            # Structured logging: Audit trail for resource state change
            audit_trail.log_action(
                user_id=user_email or "system",
                action="set_resource_state",
                resource_type="resource",
                resource_id=str(resource.id),
                resource_name=resource.name,
                user_email=user_email,
                team_id=resource.team_id,
                new_values={
                    "enabled": resource.enabled,
                },
                context={
                    "action": "activate" if activate else "deactivate",
                },
                db=db,
            )

            # Structured logging: Log successful resource state change
            structured_logger.log(
                level="INFO",
                message=f"Resource {'activated' if activate else 'deactivated'} successfully",
                event_type="resource_state_changed",
                component="resource_service",
                user_email=user_email,
                team_id=resource.team_id,
                resource_type="resource",
                resource_id=str(resource.id),
                custom_fields={
                    "resource_uri": resource.uri,
                    "enabled": resource.enabled,
                },
                db=db,
            )

        resource.team = self._get_team_name(db, resource.team_id)
        return self.convert_resource_to_read(resource)
    except PermissionError as e:
        # Release the row lock taken by get_for_update before reporting the
        # denial; update_resource and delete_resource do the same.
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Resource state change failed due to permission error",
            event_type="resource_state_change_permission_denied",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
            db=db,
        )
        raise
    except ResourceLockConflictError:
        # Re-raise lock conflicts without wrapping - allows 409 response
        raise
    except ResourceNotFoundError:
        # Re-raise not found without wrapping - allows 404 response
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource state change failure
        structured_logger.log(
            level="ERROR",
            message="Resource state change failed",
            event_type="resource_state_change_failed",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
            db=db,
        )
        # Chain the cause so the original traceback is preserved for debugging.
        raise ResourceError(f"Failed to set resource state: {str(e)}") from e
async def subscribe_resource(self, db: Session, subscription: ResourceSubscription) -> None:
    """
    Subscribe to a resource.

    Looks the resource up by ``subscription.uri`` and, when it exists and is
    enabled, stores a subscription row for ``subscription.subscriber_id``.

    Args:
        db: Database session
        subscription: Resource subscription object

    Raises:
        ResourceNotFoundError: If the resource is not found or is inactive
        ResourceError: For other subscription errors

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> subscription = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.subscribe_resource(db, subscription))
    """
    try:
        # Verify resource exists (single query to avoid TOCTOU between active/inactive checks)
        resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()

        if not resource:
            raise ResourceNotFoundError(f"Resource not found: {subscription.uri}")

        if not resource.enabled:
            raise ResourceNotFoundError(f"Resource '{subscription.uri}' exists but is inactive")

        # Create subscription
        db_sub = DbSubscription(resource_id=resource.id, subscriber_id=subscription.subscriber_id)
        db.add(db_sub)
        db.commit()

        logger.info(f"Added subscription for {subscription.uri} by {subscription.subscriber_id}")

    except ResourceNotFoundError:
        # Propagate the specific error type as documented instead of letting
        # the generic handler below rewrap it.
        # NOTE(review): callers that only catch ResourceError keep working if
        # ResourceNotFoundError derives from it - confirm against the class definitions.
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        raise ResourceError(f"Failed to subscribe: {str(e)}") from e
async def unsubscribe_resource(self, db: Session, subscription: ResourceSubscription) -> None:
    """
    Unsubscribe from a resource.

    Best-effort operation: an unknown URI is a silent no-op, and any failure
    is rolled back and logged rather than propagated to the caller.

    Args:
        db: Database session
        subscription: Resource subscription object

    Raises:
        Nothing is propagated; failures are logged at error level.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> subscription = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.unsubscribe_resource(db, subscription))
    """
    try:
        # Find resource
        resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()

        if not resource:
            return

        # Remove subscription with a DELETE statement. Executing a SELECT and
        # calling .delete() on its Result raises AttributeError, which the
        # handler below swallowed, so rows were never actually removed.
        db.execute(delete(DbSubscription).where(DbSubscription.resource_id == resource.id).where(DbSubscription.subscriber_id == subscription.subscriber_id))
        db.commit()

        logger.info(f"Removed subscription for {subscription.uri} by {subscription.subscriber_id}")

    except Exception as e:
        db.rollback()
        logger.error(f"Failed to unsubscribe: {str(e)}")
async def update_resource(
    self,
    db: Session,
    resource_id: Union[int, str],
    resource_update: ResourceUpdate,
    modified_by: Optional[str] = None,
    modified_from_ip: Optional[str] = None,
    modified_via: Optional[str] = None,
    modified_user_agent: Optional[str] = None,
    user_email: Optional[str] = None,
) -> ResourceRead:
    """
    Update a resource.

    The resource row is fetched through ``get_for_update``. Ownership is
    verified before the URI-conflict lookup so a caller without permission
    cannot use the conflict error to probe which URIs exist. Every failure
    path rolls the session back.

    Args:
        db: Database session
        resource_id: Resource ID
        resource_update: Resource update object
        modified_by: Username of the person modifying the resource
        modified_from_ip: IP address where the modification request originated
        modified_via: Source of modification (ui/api/import)
        modified_user_agent: User agent string from the modification request
        user_email: Email of user performing update (for ownership check)

    Returns:
        The updated ResourceRead object

    Raises:
        ResourceNotFoundError: If the resource is not found
        ResourceURIConflictError: If a resource with the same URI already exists.
        PermissionError: If user doesn't own the resource
        ResourceError: For other update errors
        IntegrityError: If a database integrity error occurs.
        Exception: For unexpected errors

    Example:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> from mcpgateway.schemas import ResourceRead
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.get.return_value = resource
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_resource_updated = AsyncMock()
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.update_resource(db, 'resource_id', MagicMock()))
        'resource_read'
    """
    try:
        logger.info(f"Updating resource: {resource_id}")
        resource = get_for_update(db, DbResource, resource_id)
        if not resource:
            raise ResourceNotFoundError(f"Resource not found: {resource_id}")

        # Check ownership if user_email provided. This runs before the
        # conflict lookup so unauthorized callers learn nothing about other URIs.
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, resource):
                raise PermissionError("Only the owner can update this resource")

        # Check for uri conflict if uri is being changed (scoped by visibility)
        if resource_update.uri and resource_update.uri != resource.uri:
            visibility = resource_update.visibility or resource.visibility
            team_id = resource_update.team_id or resource.team_id
            if visibility.lower() == "public":
                # Check for existing public resources with the same uri
                existing_resource = get_for_update(db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "public", DbResource.id != resource_id))
                if existing_resource:
                    raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
            elif visibility.lower() == "team" and team_id:
                # Check for existing team resource with the same uri
                existing_resource = get_for_update(
                    db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.id != resource_id)
                )
                if existing_resource:
                    raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)

        # Update fields if provided
        if resource_update.uri is not None:
            resource.uri = resource_update.uri
        if resource_update.name is not None:
            resource.name = resource_update.name
        if resource_update.description is not None:
            resource.description = resource_update.description
        if resource_update.mime_type is not None:
            resource.mime_type = resource_update.mime_type
        if resource_update.uri_template is not None:
            resource.uri_template = resource_update.uri_template
        if resource_update.visibility is not None:
            resource.visibility = resource_update.visibility

        # Update content if provided
        new_content = resource_update.content
        if new_content is not None:
            # Determine content storage: text when the (possibly just updated)
            # mime type is text/* or when the payload itself is a str.
            has_text_mime = bool(resource.mime_type and resource.mime_type.startswith("text/"))
            is_text = has_text_mime or isinstance(new_content, str)

            resource.text_content = new_content if is_text else None
            if isinstance(new_content, str):
                resource.binary_content = new_content.encode()
            elif isinstance(new_content, bytes):
                resource.binary_content = new_content
            else:
                resource.binary_content = None
            resource.size = len(new_content)

        # Update tags if provided
        if resource_update.tags is not None:
            resource.tags = resource_update.tags

        # Update metadata fields
        resource.updated_at = datetime.now(timezone.utc)
        if modified_by:
            resource.modified_by = modified_by
        if modified_from_ip:
            resource.modified_from_ip = modified_from_ip
        if modified_via:
            resource.modified_via = modified_via
        if modified_user_agent:
            resource.modified_user_agent = modified_user_agent
        if hasattr(resource, "version") and resource.version is not None:
            resource.version = resource.version + 1
        else:
            resource.version = 1
        db.commit()
        db.refresh(resource)

        # Invalidate cache after successful update
        cache = _get_registry_cache()
        await cache.invalidate_resources()
        # Also invalidate tags cache since resource tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()
        # First-Party
        from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

        metrics_cache.invalidate_prefix("top_resources:")
        metrics_cache.invalidate("resources")

        # Notify subscribers
        await self._notify_resource_updated(resource)

        logger.info(f"Updated resource: {resource.uri}")

        # Structured logging: Audit trail for resource update
        changes = []
        if resource_update.uri:
            changes.append(f"uri: {resource_update.uri}")
        if resource_update.visibility:
            changes.append(f"visibility: {resource_update.visibility}")
        if resource_update.description:
            changes.append("description updated")

        audit_trail.log_action(
            user_id=user_email or modified_by or "system",
            action="update_resource",
            resource_type="resource",
            resource_id=str(resource.id),
            resource_name=resource.name,
            user_email=user_email,
            team_id=resource.team_id,
            client_ip=modified_from_ip,
            user_agent=modified_user_agent,
            new_values={
                "uri": resource.uri,
                "name": resource.name,
                "version": resource.version,
            },
            context={
                "modified_via": modified_via,
                "changes": ", ".join(changes) if changes else "metadata only",
            },
            db=db,
        )

        # Structured logging: Log successful resource update
        structured_logger.log(
            level="INFO",
            message="Resource updated successfully",
            event_type="resource_updated",
            component="resource_service",
            user_id=modified_by,
            user_email=user_email,
            team_id=resource.team_id,
            resource_type="resource",
            resource_id=str(resource.id),
            custom_fields={
                "resource_uri": resource.uri,
                "version": resource.version,
            },
            db=db,
        )

        return self.convert_resource_to_read(resource)
    except PermissionError as pe:
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Resource update failed due to permission error",
            event_type="resource_update_permission_denied",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=pe,
            db=db,
        )
        raise
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        # Structured logging: Log database integrity error
        structured_logger.log(
            level="ERROR",
            message="Resource update failed due to database integrity error",
            event_type="resource_update_failed",
            component="resource_service",
            user_id=modified_by,
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=ie,
            db=db,
        )
        raise
    except ResourceURIConflictError as conflict:
        # Release the row locks taken by get_for_update (the resource being
        # edited and the conflicting row) before reporting the conflict.
        db.rollback()
        logger.error(f"Resource URI conflict: {conflict}")

        # Structured logging: Log URI conflict error
        structured_logger.log(
            level="WARNING",
            message="Resource update failed due to URI conflict",
            event_type="resource_uri_conflict",
            component="resource_service",
            user_id=modified_by,
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=conflict,
            db=db,
        )
        raise
    except ResourceNotFoundError as nf:
        db.rollback()

        # Structured logging: Log not found error
        structured_logger.log(
            level="ERROR",
            message="Resource update failed - resource not found",
            event_type="resource_not_found",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=nf,
            db=db,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource update failure
        structured_logger.log(
            level="ERROR",
            message="Resource update failed",
            event_type="resource_update_failed",
            component="resource_service",
            user_id=modified_by,
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
            db=db,
        )
        # Chain the cause so the original traceback is preserved for debugging.
        raise ResourceError(f"Failed to update resource: {str(e)}") from e
async def delete_resource(self, db: Session, resource_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
    """
    Delete a resource.

    Removes the resource's subscriptions, optionally purges its raw and
    hourly metrics, hard-deletes the row, then invalidates caches and
    notifies subscribers.

    Args:
        db: Database session
        resource_id: Resource ID
        user_email: Email of user performing delete (for ownership check)
        purge_metrics: If True, delete raw + rollup metrics for this resource

    Raises:
        ResourceNotFoundError: If the resource is not found
        PermissionError: If user doesn't own the resource
        ResourceError: For other deletion errors

    Example:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, AsyncMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.get.return_value = resource
        >>> db.delete = MagicMock()
        >>> db.commit = MagicMock()
        >>> service._notify_resource_deleted = AsyncMock()
        >>> import asyncio
        >>> asyncio.run(service.delete_resource(db, 'resource_id'))
    """
    try:
        # Find resource by its ID.
        resource = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none()

        if not resource:
            # If resource doesn't exist, rollback and re-raise a ResourceNotFoundError.
            db.rollback()
            raise ResourceNotFoundError(f"Resource not found: {resource_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, resource):
                raise PermissionError("Only the owner can delete this resource")

        # Store resource info for notification before deletion.
        resource_info = {
            "id": resource.id,
            "uri": resource.uri,
            "name": resource.name,
        }

        # Remove subscriptions using SQLAlchemy's delete() expression.
        db.execute(delete(DbSubscription).where(DbSubscription.resource_id == resource.id))

        if purge_metrics:
            with pause_rollup_during_purge(reason=f"purge_resource:{resource.id}"):
                delete_metrics_in_batches(db, ResourceMetric, ResourceMetric.resource_id, resource.id)
                delete_metrics_in_batches(db, ResourceMetricsHourly, ResourceMetricsHourly.resource_id, resource.id)

        # Capture the fields needed for logging while the instance is still
        # attached; the ORM object must not be read after delete + commit.
        resource_uri = resource.uri
        resource_name = resource.name
        resource_team_id = resource.team_id

        # Hard delete the resource.
        db.delete(resource)
        db.commit()

        # Invalidate cache after successful deletion
        cache = _get_registry_cache()
        await cache.invalidate_resources()
        # Also invalidate tags cache since resource tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        # Notify subscribers.
        await self._notify_resource_deleted(resource_info)

        # Use the captured URI: touching the deleted instance here could raise
        # and make an already-committed deletion look like a failure.
        logger.info(f"Permanently deleted resource: {resource_uri}")

        # Structured logging: Audit trail for resource deletion
        audit_trail.log_action(
            user_id=user_email or "system",
            action="delete_resource",
            resource_type="resource",
            resource_id=str(resource_info["id"]),
            resource_name=resource_name,
            user_email=user_email,
            team_id=resource_team_id,
            old_values={
                "uri": resource_uri,
                "name": resource_name,
            },
            db=db,
        )

        # Structured logging: Log successful resource deletion
        structured_logger.log(
            level="INFO",
            message="Resource deleted successfully",
            event_type="resource_deleted",
            component="resource_service",
            user_email=user_email,
            team_id=resource_team_id,
            resource_type="resource",
            resource_id=str(resource_info["id"]),
            custom_fields={
                "resource_uri": resource_uri,
                "purge_metrics": purge_metrics,
            },
            db=db,
        )

    except PermissionError as pe:
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Resource deletion failed due to permission error",
            event_type="resource_delete_permission_denied",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=pe,
            db=db,
        )
        raise
    except ResourceNotFoundError as rnfe:
        # ResourceNotFoundError is re-raised to be handled in the endpoint.
        # Structured logging: Log not found error
        structured_logger.log(
            level="ERROR",
            message="Resource deletion failed - resource not found",
            event_type="resource_not_found",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=rnfe,
            db=db,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic resource deletion failure
        structured_logger.log(
            level="ERROR",
            message="Resource deletion failed",
            event_type="resource_deletion_failed",
            component="resource_service",
            user_email=user_email,
            resource_type="resource",
            resource_id=str(resource_id),
            error=e,
            db=db,
        )
        # Chain the cause so the original traceback is preserved for debugging.
        raise ResourceError(f"Failed to delete resource: {str(e)}") from e
async def get_resource_by_id(self, db: Session, resource_id: str, include_inactive: bool = False) -> ResourceRead:
    """
    Fetch a single resource by its ID and return it as a read model.

    Args:
        db: Database session
        resource_id: Resource ID
        include_inactive: When False (default) only enabled resources match

    Returns:
        ResourceRead: The converted resource

    Raises:
        ResourceNotFoundError: If no resource matches, or if the resource
            exists but is disabled while ``include_inactive`` is False

    Example:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> resource = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = resource
        >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
        >>> import asyncio
        >>> asyncio.run(service.get_resource_by_id(db, "39334ce0ed2644d79ede8913a66930c9"))
        'resource_read'
    """
    stmt = select(DbResource).where(DbResource.id == resource_id)
    if not include_inactive:
        stmt = stmt.where(DbResource.enabled)

    found = db.execute(stmt).scalar_one_or_none()

    if found:
        converted = self.convert_resource_to_read(found)

        structured_logger.log(
            level="INFO",
            message="Resource retrieved successfully",
            event_type="resource_viewed",
            component="resource_service",
            team_id=getattr(found, "team_id", None),
            resource_type="resource",
            resource_id=str(found.id),
            custom_fields={
                "resource_uri": found.uri,
                "include_inactive": include_inactive,
            },
            db=db,
        )

        return converted

    # Nothing matched: distinguish "disabled" from "does not exist" so the
    # caller gets a more precise message.
    if not include_inactive:
        disabled_stmt = select(DbResource).where(DbResource.id == resource_id).where(not_(DbResource.enabled))
        if db.execute(disabled_stmt).scalar_one_or_none():
            raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")

    raise ResourceNotFoundError(f"Resource not found: {resource_id}")
async def _notify_resource_activated(self, resource: DbResource) -> None:
    """
    Publish a ``resource_activated`` event for the given resource.

    Args:
        resource: Resource that was activated
    """
    payload = {"id": resource.id, "uri": resource.uri, "name": resource.name, "enabled": True}
    await self._publish_event(
        {
            "type": "resource_activated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _notify_resource_deactivated(self, resource: DbResource) -> None:
    """
    Publish a ``resource_deactivated`` event for the given resource.

    Args:
        resource: Resource that was deactivated
    """
    payload = {"id": resource.id, "uri": resource.uri, "name": resource.name, "enabled": False}
    await self._publish_event(
        {
            "type": "resource_deactivated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _notify_resource_deleted(self, resource_info: Dict[str, Any]) -> None:
    """
    Publish a ``resource_deleted`` event.

    Args:
        resource_info: Dictionary describing the deleted resource; it is
            forwarded unchanged as the event's ``data``
    """
    await self._publish_event(
        {
            "type": "resource_deleted",
            "data": resource_info,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _notify_resource_removed(self, resource: DbResource) -> None:
    """
    Publish a ``resource_removed`` event for the given resource.

    Args:
        resource: Resource that was removed
    """
    payload = {"id": resource.id, "uri": resource.uri, "name": resource.name, "enabled": False}
    await self._publish_event(
        {
            "type": "resource_removed",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
    """Relay Resource events from the EventService to the caller.

    Yields:
        Resource event messages, in the order the event service produces them.
    """
    stream = self._event_service.subscribe_events()
    async for message in stream:
        yield message
def _detect_mime_type(self, uri: str, content: Union[str, bytes]) -> str:
    """Work out a mime type for a resource.

    The URI is consulted first via ``mimetypes.guess_type``; only when that
    yields nothing does the Python type of the content decide.

    Args:
        uri: Resource URI
        content: Resource content

    Returns:
        The guessed mime type, else ``text/plain`` for ``str`` content and
        ``application/octet-stream`` for anything else.
    """
    guessed = mimetypes.guess_type(uri)[0]
    if guessed:
        return guessed
    return "text/plain" if isinstance(content, str) else "application/octet-stream"
async def _read_template_resource(self, db: Session, uri: str, include_inactive: Optional[bool] = False) -> ResourceContent:
    """
    Resolve a concrete URI against the known resource templates.

    If the in-memory template cache is empty it is first filled from
    ``list_resource_templates``. The first cached template whose
    ``uri_template`` matches ``uri`` is used.

    Args:
        db: Database session.
        uri: Template URI with parameters.
        include_inactive: Whether to include inactive resources when the cache is populated.

    Returns:
        ResourceContent: Text content built from the matching template.

    Raises:
        ResourceNotFoundError: If no template matches, or the matching template's resource is disabled.
        ResourceError: For any failure while processing the template. This includes the
            NotImplementedError raised for non-text (binary) templates, which is wrapped here.
    """
    # Lazily populate the cache on first use.
    if not self._template_cache:
        logger.info("_template_cache is empty, fetching exisitng resource templates")
        for entry in await self.list_resource_templates(db=db, include_inactive=include_inactive):
            self._template_cache[entry.name] = entry

    # First cached template whose pattern matches wins.
    matched = next((candidate for candidate in self._template_cache.values() if self._uri_matches_template(uri, candidate.uri_template)), None)
    if not matched:
        raise ResourceNotFoundError(f"No template matches URI: {uri}")

    # A matching template whose backing row is disabled is reported as not found.
    disabled_row = db.execute(select(DbResource).where(DbResource.id == str(matched.id)).where(not_(DbResource.enabled))).scalar_one_or_none()
    if disabled_row:
        raise ResourceNotFoundError(f"Resource '{matched.id}' exists but is inactive")

    try:
        # Extract parameters
        bound = self._extract_template_params(uri, matched.uri_template)
        mime = matched.mime_type
        if not (mime and mime.startswith("text/")):
            # Binary templates are not handled; the handler below wraps this.
            raise NotImplementedError("Binary resource templates not yet supported")

        # Generate content
        rendered = matched.uri_template.format(**bound)
        return ResourceContent(type="resource", id=str(matched.id) or None, uri=matched.uri_template or None, mime_type=mime or None, text=rendered)
    except ResourceNotFoundError:
        raise
    except Exception as e:
        raise ResourceError(f"Failed to process template: {str(e)}") from e
@staticmethod
@lru_cache(maxsize=256)
def _build_regex(template: str) -> re.Pattern:
    """
    Compile a URI template into an anchored regular expression.

    Only a subset of RFC 6570-style expressions is understood, aimed at path
    matching. Each path parameter becomes a named capture group:

    - `{var}`
        Single-segment parameter (anything except `/`).
        → `(?P<var>[^/]+)`
    - `{var*}`
        Wildcard parameter spanning one or more segments, `/` included.
        → `(?P<var>.+)`
    - `{?var1,var2}`
        Query-parameter expression. Removed from the template before the
        regex is built, so it plays no part in matching.

    Literal text between expressions is regex-escaped.

    Example:
        Template: "files://root/{path*}/meta/{id}{?expand,debug}"
        Regex: r"^files://root/(?P<path>.+)/meta/(?P<id>[^/]+)$"

    Args:
        template: The URI template string containing parameter expressions.

    Returns:
        A compiled regular expression (re.Pattern) for matching URIs and
        extracting parameter values.

    Note:
        Results are memoized with an LRU cache (maxsize=256) so a given
        template is only compiled once while it stays in the cache.
    """
    # Drop query-parameter expressions; they are irrelevant for path matching.
    path_only = re.sub(r"\{\?[^}]+\}", "", template)

    fragments = []
    # The capturing group in the split keeps the "{...}" tokens in the result.
    for token in re.split(r"(\{[^}]+\})", path_only):
        is_expression = token.startswith("{") and token.endswith("}")
        if not is_expression:
            fragments.append(re.escape(token))
            continue

        var = token[1:-1]
        if var.endswith("*"):
            fragments.append(f"(?P<{var[:-1]}>.+)")
        else:
            fragments.append(f"(?P<{var}>[^/]+)")

    return re.compile("^" + "".join(fragments) + "$")
@staticmethod
@lru_cache(maxsize=256)
def _compile_parse_pattern(template: str) -> parse.Parser:
    """
    Build a reusable `parse` parser for a URI template.

    Args:
        template: Template pattern to compile (e.g. "file:///{name}/{id}").

    Returns:
        parse.Parser: Parser produced by `parse.compile` for the template.

    Note:
        Memoised with an LRU cache (maxsize=256), so a given template is
        compiled only once while it remains in the cache.
    """
    compiled_parser = parse.compile(template)
    return compiled_parser
def _extract_template_params(self, uri: str, template: str) -> Dict[str, str]:
    """
    Extract parameters from a URI based on a template.

    Extraction is aligned with how `_uri_matches_template` decides that a
    URI belongs to a template: the URI's query string is ignored,
    `{?var1,var2}` query expressions are dropped from the template, and a
    wildcard `{var*}` is treated as the plain field `{var}`. Without this
    normalisation a template such as
    "files://root/{path*}/meta/{id}{?expand,debug}" would match a URI but
    yield no parameters, and a query string would leak into the last
    extracted value.

    If the normalised pair does not parse, the raw URI and raw template
    are tried as given, so templates that contain a literal "?" keep
    working exactly as before.

    Args:
        uri: The actual URI containing parameter values.
        template: The template pattern (e.g. "file:///{name}/{id}").

    Returns:
        Dict of parameter names and extracted values; empty when neither
        the normalised nor the raw form parses.

    Note:
        Uses cached compiled parse patterns for better performance.
    """
    # Path-only view of the URI, mirroring _uri_matches_template.
    uri_path = uri.partition("?")[0]

    # Drop `{?a,b}` query expressions (same regex as _build_regex) and turn
    # wildcard fields `{name*}` into ordinary `{name}` fields for `parse`.
    path_template = re.sub(r"\{\?[^}]+\}", "", template)
    path_template = re.sub(r"\{(\w+)\*\}", r"{\1}", path_template)

    attempts = [(uri_path, path_template)]
    if (uri_path, path_template) != (uri, template):
        # Backward-compatible fallback: the inputs exactly as supplied.
        attempts.append((uri, template))

    for candidate_uri, candidate_template in attempts:
        result = self._compile_parse_pattern(candidate_template).parse(candidate_uri)
        if result:
            return result.named
    return {}
def _uri_matches_template(self, uri: str, template: str) -> bool:
    """
    Tell whether a URI fits a template pattern.

    Only the part of the URI before the first "?" takes part in the
    comparison; any query string is ignored.

    Args:
        uri: The URI to check.
        template: The template pattern.

    Returns:
        True if the URI matches the template, otherwise False.

    Note:
        Relies on the cached compiled regex returned by `_build_regex`.
    """
    path_only = uri.split("?", 1)[0]
    return self._build_regex(template).match(path_only) is not None
async def _notify_resource_added(self, resource: DbResource) -> None:
    """
    Publish a "resource_added" event for a resource.

    Args:
        resource: Resource that was added. Its id, uri, name, description
            and enabled flag are copied into the event payload.
    """
    payload = {
        "id": resource.id,
        "uri": resource.uri,
        "name": resource.name,
        "description": resource.description,
        "enabled": resource.enabled,
    }
    await self._publish_event(
        {
            "type": "resource_added",
            "data": payload,
            # Timezone-aware UTC timestamp in ISO 8601 form.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _notify_resource_updated(self, resource: DbResource) -> None:
    """
    Publish a "resource_updated" event for a resource.

    Args:
        resource: Resource that was updated. Its id, uri and enabled flag
            are copied into the event payload.
    """
    payload = {
        "id": resource.id,
        "uri": resource.uri,
        "enabled": resource.enabled,
    }
    await self._publish_event(
        {
            "type": "resource_updated",
            "data": payload,
            # Timezone-aware UTC timestamp in ISO 8601 form.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
async def _publish_event(self, event: Dict[str, Any]) -> None:
    """
    Hand an event to the EventService for delivery to subscribers.

    Args:
        event: Event to publish
    """
    event_service = self._event_service
    await event_service.publish_event(event)
3411 # --- Resource templates ---
async def list_resource_templates(
    self,
    db: Session,
    include_inactive: bool = False,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    visibility: Optional[str] = None,
) -> List[ResourceTemplate]:
    """
    Return the resource templates visible to the caller.

    A template is any resource row whose `uri_template` is set. Access is
    narrowed by the caller's token teams, then optionally by an explicit
    visibility value and by tags.

    Args:
        db: Database session
        include_inactive: Whether to include inactive templates
        user_email: Email of requesting user (for private visibility check)
        token_teams: Teams from JWT. None = admin (no filtering),
            [] = public-only (no owner access), [...] = team-scoped
        tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
        visibility (Optional[str]): Filter by visibility (private, team, public).

    Returns:
        List of ResourceTemplate objects the user has access to

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock, patch
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> template_obj = MagicMock()
        >>> db.execute.return_value.scalars.return_value.all.return_value = [template_obj]
        >>> with patch('mcpgateway.services.resource_service.ResourceTemplate') as MockResourceTemplate:
        ...     MockResourceTemplate.model_validate.return_value = 'resource_template'
        ...     import asyncio
        ...     result = asyncio.run(service.list_resource_templates(db))
        ...     result == ['resource_template']
        True
    """
    stmt = select(DbResource).where(DbResource.uri_template.isnot(None))

    if not include_inactive:
        stmt = stmt.where(DbResource.enabled)

    # token_teams is None for admin access: no visibility filtering at all.
    if token_teams is not None:
        # Public templates are visible to every non-admin token.
        allowed = [DbResource.visibility == "public"]

        # An empty team list marks a public-only token, which gets neither
        # owner access nor team access.
        if token_teams:
            if user_email:
                allowed.append(DbResource.owner_email == user_email)
            allowed.append(and_(DbResource.team_id.in_(token_teams), DbResource.visibility.in_(["team", "public"])))

        stmt = stmt.where(or_(*allowed))

    # Cursor-based pagination logic can be implemented here in the future.
    if visibility:
        stmt = stmt.where(DbResource.visibility == visibility)

    if tags:
        stmt = stmt.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))

    rows = db.execute(stmt).scalars().all()
    return [ResourceTemplate.model_validate(row) for row in rows]
3483 # --- Metrics ---
async def aggregate_metrics(self, db: Session) -> ResourceMetrics:
    """
    Aggregate invocation metrics across all resources.

    The figures come from the combined raw + hourly rollup query. When the
    metrics cache is enabled, a cached entry under the "resources" key is
    returned instead of querying, and a freshly computed result is stored
    back under the same key.

    Args:
        db: Database session

    Returns:
        ResourceMetrics: Aggregated metrics from raw + hourly rollup tables.

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> service = ResourceService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    # Serve from cache when possible.
    if is_cache_enabled():
        snapshot = metrics_cache.get("resources")
        if snapshot is not None:
            return ResourceMetrics(**snapshot)

    # Cache miss (or cache disabled): run the combined raw + rollup query.
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "resource")

    # Fields copied one-to-one from the query result onto the response model.
    field_names = (
        "total_executions",
        "successful_executions",
        "failed_executions",
        "failure_rate",
        "min_response_time",
        "max_response_time",
        "avg_response_time",
        "last_execution_time",
    )
    metrics = ResourceMetrics(**{field: getattr(combined, field) for field in field_names})

    # Stored as a plain dict so the cached value stays serialisable.
    if is_cache_enabled():
        metrics_cache.set("resources", metrics.model_dump())

    return metrics
async def reset_metrics(self, db: Session) -> None:
    """
    Wipe all resource metrics, both raw records and hourly rollups.

    After the deletes are committed, the cached aggregate ("resources")
    and every cache entry prefixed with "top_resources:" are invalidated.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.resource_service import ResourceService
        >>> from unittest.mock import MagicMock
        >>> service = ResourceService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    # Raw rows first, then the hourly rollups, in a single commit.
    for metrics_table in (ResourceMetric, ResourceMetricsHourly):
        db.execute(delete(metrics_table))
    db.commit()

    # Invalidate metrics cache
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("resources")
    metrics_cache.invalidate_prefix("top_resources:")
# The shared ResourceService is built lazily, on first access to the
# `resource_service` attribute, instead of at module import time. Importing
# only the exception classes from this module therefore never creates it.
_resource_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Resolve module attributes lazily to provide the service singleton.

    Args:
        name: The attribute name being accessed.

    Returns:
        The resource_service singleton instance if name is "resource_service".

    Raises:
        AttributeError: If the attribute name is not "resource_service".
    """
    global _resource_service_instance  # pylint: disable=global-statement

    # Anything other than the singleton name is a genuine missing attribute.
    if name != "resource_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    if _resource_service_instance is None:
        _resource_service_instance = ResourceService()
    return _resource_service_instance