Coverage for mcpgateway / services / resource_service.py: 98%
1297 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/resource_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Resource Service Implementation.
8This module implements resource management according to the MCP specification.
9It handles:
10- Resource registration and retrieval
11- Resource templates and URI handling
12- Resource subscriptions and updates
13- Content type management
14- Active/inactive resource management
16Examples:
17 >>> from mcpgateway.services.resource_service import ResourceService, ResourceError
18 >>> service = ResourceService()
19 >>> isinstance(service._event_service, EventService)
20 True
21"""
23# Standard
24import binascii
25from datetime import datetime, timezone
26from functools import lru_cache
27import mimetypes
28import os
29import re
30import ssl
31import time
32from types import SimpleNamespace
33from typing import Any, AsyncGenerator, Dict, List, Optional, Union
34import uuid
36# Third-Party
37import httpx
38from mcp import ClientSession
39from mcp.client.sse import sse_client
40from mcp.client.streamable_http import streamablehttp_client
41import parse
42from pydantic import ValidationError
43from sqlalchemy import and_, delete, desc, not_, or_, select
44from sqlalchemy.exc import IntegrityError, MultipleResultsFound, OperationalError
45from sqlalchemy.orm import joinedload, selectinload, Session
47# First-Party
48from mcpgateway.common.models import ResourceContent, ResourceContents, ResourceTemplate, TextContent
49from mcpgateway.common.validators import SecurityValidator
50from mcpgateway.config import settings
51from mcpgateway.db import EmailTeam
52from mcpgateway.db import EmailTeamMember as DbEmailTeamMember
53from mcpgateway.db import fresh_db_session
54from mcpgateway.db import Gateway as DbGateway
55from mcpgateway.db import get_for_update
56from mcpgateway.db import Resource as DbResource
57from mcpgateway.db import ResourceMetric, ResourceMetricsHourly
58from mcpgateway.db import ResourceSubscription as DbSubscription
59from mcpgateway.db import server_resource_association
60from mcpgateway.observability import create_span, set_span_attribute, set_span_error
61from mcpgateway.schemas import ResourceCreate, ResourceMetrics, ResourceRead, ResourceSubscription, ResourceUpdate, TopPerformer
62from mcpgateway.services.audit_trail_service import get_audit_trail_service
63from mcpgateway.services.base_service import BaseService
64from mcpgateway.services.content_security import ContentSizeError, ContentTypeError, get_content_security_service
65from mcpgateway.services.event_service import EventService
66from mcpgateway.services.logging_service import LoggingService
67from mcpgateway.services.mcp_session_pool import get_mcp_session_pool, TransportType
68from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service
69from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
70from mcpgateway.services.oauth_manager import OAuthManager
71from mcpgateway.services.observability_service import current_trace_id, ObservabilityService
72from mcpgateway.services.structured_logger import get_structured_logger
73from mcpgateway.utils.gateway_access import build_gateway_auth_headers, check_gateway_access
74from mcpgateway.utils.metrics_common import build_top_performers
75from mcpgateway.utils.pagination import unified_paginate
76from mcpgateway.utils.services_auth import decode_auth
77from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
78from mcpgateway.utils.ssl_context_cache import get_cached_ssl_context
79from mcpgateway.utils.trace_context import format_trace_team_scope
80from mcpgateway.utils.trace_redaction import is_input_capture_enabled, is_output_capture_enabled, serialize_trace_payload
81from mcpgateway.utils.url_auth import apply_query_param_auth, sanitize_exception_message
82from mcpgateway.utils.validate_signature import validate_signature
84# Plugin support imports (conditional)
85try:
86 # First-Party
87 from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, PluginContextTable, ResourceHookType, ResourcePostFetchPayload, ResourcePreFetchPayload
89 PLUGINS_AVAILABLE = True
90except ImportError:
91 PLUGINS_AVAILABLE = False
93# Cache import (lazy to avoid circular dependencies)
94_REGISTRY_CACHE = None
97def _get_registry_cache():
98 """Get registry cache singleton lazily.
100 Returns:
101 RegistryCache instance.
102 """
103 global _REGISTRY_CACHE # pylint: disable=global-statement
104 if _REGISTRY_CACHE is None:
105 # First-Party
106 from mcpgateway.cache.registry_cache import registry_cache # pylint: disable=import-outside-toplevel
108 _REGISTRY_CACHE = registry_cache
109 return _REGISTRY_CACHE
112# Initialize logging service first
113logging_service = LoggingService()
114logger = logging_service.get_logger(__name__)
116# Initialize structured logger, audit trail, and metrics buffer for resource operations
117structured_logger = get_structured_logger("resource_service")
118audit_trail = get_audit_trail_service()
119metrics_buffer = get_metrics_buffer_service()
122class ResourceError(Exception):
123 """Base class for resource-related errors."""
126class ResourceNotFoundError(ResourceError):
127 """Raised when a requested resource is not found."""
130class ResourceURIConflictError(ResourceError):
131 """Raised when a resource URI conflicts with existing (active or inactive) resource."""
133 def __init__(self, uri: str, enabled: bool = True, resource_id: Optional[int] = None, visibility: str = "public") -> None:
134 """Initialize the error with resource information.
136 Args:
137 uri: The conflicting resource URI
138 enabled: Whether the existing resource is active
139 resource_id: ID of the existing resource if available
140 visibility: Visibility status of the resource
141 """
142 self.uri = uri
143 self.enabled = enabled
144 self.resource_id = resource_id
145 message = f"{visibility.capitalize()} Resource already exists with URI: {uri}"
146 logger.info(f"ResourceURIConflictError: {message}")
147 if not enabled:
148 message += f" (currently inactive, ID: {resource_id})"
149 super().__init__(message)
152class ResourceValidationError(ResourceError):
153 """Raised when resource validation fails."""
156class ResourceLockConflictError(ResourceError):
157 """Raised when a resource row is locked by another transaction.
159 Raises:
160 ResourceLockConflictError: When attempting to modify a resource that is
161 currently locked by another concurrent request.
162 """
165def _validate_resource_team_assignment(db: Session, user_email: Optional[str], target_team_id: Optional[str]) -> None:
166 """Validate team assignment for resource updates.
168 Args:
169 db: Database session used for membership checks.
170 user_email: Requesting user email. When omitted, ownership checks are skipped.
171 target_team_id: Team identifier to validate.
173 Raises:
174 ValueError: If team does not exist or caller lacks ownership.
175 """
176 if not target_team_id:
177 raise ValueError("Cannot set visibility to 'team' without a team_id")
179 team = db.query(EmailTeam).filter(EmailTeam.id == target_team_id).first()
180 if not team:
181 raise ValueError(f"Team {target_team_id} not found")
183 if not user_email:
184 return
186 membership = (
187 db.query(DbEmailTeamMember)
188 .filter(DbEmailTeamMember.team_id == target_team_id, DbEmailTeamMember.user_email == user_email, DbEmailTeamMember.is_active, DbEmailTeamMember.role == "owner")
189 .first()
190 )
191 if not membership:
192 raise ValueError("User membership in team not sufficient for this update.")
195class ResourceService(BaseService):
196 """Service for managing resources.
198 Handles:
199 - Resource registration and retrieval
200 - Resource templates and URIs
201 - Resource subscriptions
202 - Content type detection
203 - Active/inactive status management
204 """
206 _visibility_model_cls = DbResource
208 def __init__(self) -> None:
209 """Initialize the resource service."""
210 self._event_service = EventService(channel_name="mcpgateway:resource_events")
211 self._template_cache: Dict[str, ResourceTemplate] = {}
212 self.oauth_manager = OAuthManager(request_timeout=int(os.getenv("OAUTH_REQUEST_TIMEOUT", "30")), max_retries=int(os.getenv("OAUTH_MAX_RETRIES", "3")))
214 # Initialize plugin manager if plugins are enabled in settings
215 self._plugin_manager = None
216 if PLUGINS_AVAILABLE:
217 try:
218 self._plugin_manager = get_plugin_manager()
219 if self._plugin_manager:
220 logger.info("Plugin manager initialized for ResourceService with config: %s", settings.plugins.config_file)
221 except Exception as e:
222 logger.warning("Plugin manager initialization failed in ResourceService: %s", e)
223 self._plugin_manager = None
225 # Initialize mime types
226 mimetypes.init()
228 async def initialize(self) -> None:
229 """Initialize the service."""
230 logger.info("Initializing resource service")
231 await self._event_service.initialize()
233 async def shutdown(self) -> None:
234 """Shutdown the service."""
235 # Clear subscriptions
236 await self._event_service.shutdown()
237 logger.info("Resource service shutdown complete")
239 async def get_top_resources(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
240 """Retrieve the top-performing resources based on execution count.
242 Queries the database to get resources with their metrics, ordered by the number of executions
243 in descending order. Combines recent raw metrics with historical hourly rollups for complete
244 historical coverage. Uses the resource URI as the name field for TopPerformer objects.
245 Returns a list of TopPerformer objects containing resource details and performance metrics.
246 Results are cached for performance.
248 Args:
249 db (Session): Database session for querying resource metrics.
250 limit (Optional[int]): Maximum number of resources to return. Defaults to 5.
251 include_deleted (bool): Whether to include deleted resources from rollups.
253 Returns:
254 List[TopPerformer]: A list of TopPerformer objects, each containing:
255 - id: Resource ID.
256 - name: Resource URI (used as the name field).
257 - execution_count: Total number of executions.
258 - avg_response_time: Average response time in seconds, or None if no metrics.
259 - success_rate: Success rate percentage, or None if no metrics.
260 - last_execution: Timestamp of the last execution, or None if no metrics.
261 """
262 # Check cache first (if enabled)
263 # First-Party
264 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
266 effective_limit = limit or 5
267 cache_key = f"top_resources:{effective_limit}:include_deleted={include_deleted}"
269 if is_cache_enabled():
270 cached = metrics_cache.get(cache_key)
271 if cached is not None:
272 return cached
274 # Use combined query that includes both raw metrics and rollup data
275 # Use name_column="uri" to maintain backward compatibility (resources show URI as name)
276 # First-Party
277 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel
279 results = get_top_performers_combined(
280 db=db,
281 metric_type="resource",
282 entity_model=DbResource,
283 limit=effective_limit,
284 name_column="uri", # Resources use URI as display name
285 include_deleted=include_deleted,
286 )
287 top_performers = build_top_performers(results)
289 # Cache the result (if enabled)
290 if is_cache_enabled():
291 metrics_cache.set(cache_key, top_performers)
293 return top_performers
295 def convert_resource_to_read(self, resource: DbResource, include_metrics: bool = False) -> ResourceRead:
296 """
297 Converts a DbResource instance into a ResourceRead model, optionally including aggregated metrics.
299 Args:
300 resource (DbResource): The ORM instance of the resource.
301 include_metrics (bool): Whether to include metrics in the result. Defaults to False.
302 Set to False for list operations to avoid N+1 query issues.
304 Returns:
305 ResourceRead: The Pydantic model representing the resource, optionally including aggregated metrics.
307 Examples:
308 >>> from types import SimpleNamespace
309 >>> from datetime import datetime, timezone
310 >>> svc = ResourceService()
311 >>> now = datetime.now(timezone.utc)
312 >>> # Fake metrics
313 >>> m1 = SimpleNamespace(is_success=True, response_time=0.1, timestamp=now)
314 >>> m2 = SimpleNamespace(is_success=False, response_time=0.3, timestamp=now)
315 >>> r = SimpleNamespace(
316 ... id="ca627760127d409080fdefc309147e08", uri='res://x', name='R', description=None, mime_type='text/plain', size=123,
317 ... created_at=now, updated_at=now, enabled=True, tags=[{"id": "t", "label": "T"}], metrics=[m1, m2],
318 ... metrics_summary={"total_executions": 2, "successful_executions": 1, "failed_executions": 1,
319 ... "failure_rate": 0.5, "min_response_time": 0.1, "max_response_time": 0.3,
320 ... "avg_response_time": 0.2, "last_execution_time": now}
321 ... )
322 >>> out = svc.convert_resource_to_read(r, include_metrics=True)
323 >>> out.metrics.total_executions
324 2
325 >>> out.metrics.successful_executions
326 1
327 """
328 resource_dict = resource.__dict__.copy()
329 # Remove SQLAlchemy state and any pre-existing 'metrics' attribute
330 resource_dict.pop("_sa_instance_state", None)
331 resource_dict.pop("metrics", None)
333 # Ensure required base fields are present even if SQLAlchemy hasn't loaded them into __dict__ yet
334 resource_dict["id"] = getattr(resource, "id", resource_dict.get("id"))
335 resource_dict["uri"] = getattr(resource, "uri", resource_dict.get("uri"))
336 resource_dict["name"] = getattr(resource, "name", resource_dict.get("name"))
337 resource_dict["description"] = getattr(resource, "description", resource_dict.get("description"))
338 resource_dict["mime_type"] = getattr(resource, "mime_type", resource_dict.get("mime_type"))
339 resource_dict["size"] = getattr(resource, "size", resource_dict.get("size"))
340 resource_dict["created_at"] = getattr(resource, "created_at", resource_dict.get("created_at"))
341 resource_dict["updated_at"] = getattr(resource, "updated_at", resource_dict.get("updated_at"))
342 resource_dict["is_active"] = getattr(resource, "is_active", resource_dict.get("is_active"))
343 resource_dict["enabled"] = getattr(resource, "enabled", resource_dict.get("enabled"))
345 # Compute aggregated metrics from the resource's metrics list (only if requested)
346 if include_metrics:
347 # Use metrics_summary which combines raw + hourly rollup data (matches tool_service pattern)
348 metrics = resource.metrics_summary
349 resource_dict["metrics"] = {
350 "total_executions": metrics["total_executions"],
351 "successful_executions": metrics["successful_executions"],
352 "failed_executions": metrics["failed_executions"],
353 "failure_rate": metrics["failure_rate"],
354 "min_response_time": metrics["min_response_time"],
355 "max_response_time": metrics["max_response_time"],
356 "avg_response_time": metrics["avg_response_time"],
357 "last_execution_time": metrics["last_execution_time"],
358 }
359 else:
360 resource_dict["metrics"] = None
362 raw_tags = resource.tags or []
363 normalized_tags = []
364 for tag in raw_tags:
365 if isinstance(tag, str):
366 normalized_tags.append(tag)
367 continue
368 if isinstance(tag, dict):
369 label = tag.get("label") or tag.get("name")
370 if label:
371 normalized_tags.append(label)
372 continue
373 label = getattr(tag, "label", None) or getattr(tag, "name", None)
374 if label:
375 normalized_tags.append(label)
376 resource_dict["tags"] = normalized_tags
377 resource_dict["team"] = getattr(resource, "team", None)
379 # Include metadata fields for proper API response
380 resource_dict["created_by"] = getattr(resource, "created_by", None)
381 resource_dict["modified_by"] = getattr(resource, "modified_by", None)
382 resource_dict["created_at"] = getattr(resource, "created_at", None)
383 resource_dict["updated_at"] = getattr(resource, "updated_at", None)
384 resource_dict["version"] = getattr(resource, "version", None)
385 resource_dict["gateway_id"] = getattr(resource, "gateway_id", None)
386 return ResourceRead.model_validate(resource_dict)
388 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]:
389 """Retrieve the team name given a team ID.
391 Args:
392 db (Session): Database session for querying teams.
393 team_id (Optional[str]): The ID of the team.
395 Returns:
396 Optional[str]: The name of the team if found, otherwise None.
397 """
398 if not team_id:
399 return None
400 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
401 db.commit() # Release transaction to avoid idle-in-transaction
402 return team.name if team else None
404 async def register_resource(
405 self,
406 db: Session,
407 resource: ResourceCreate,
408 created_by: Optional[str] = None,
409 created_from_ip: Optional[str] = None,
410 created_via: Optional[str] = None,
411 created_user_agent: Optional[str] = None,
412 import_batch_id: Optional[str] = None,
413 federation_source: Optional[str] = None,
414 team_id: Optional[str] = None,
415 owner_email: Optional[str] = None,
416 visibility: Optional[str] = "public",
417 ) -> ResourceRead:
418 """Register a new resource.
420 MIME Type Resolution Priority:
421 1. **User-provided type** (highest priority) - Caller explicitly declares content type
422 2. **URI-detected type** - Fallback via ``mimetypes.guess_type(uri)``
423 3. **Content-based fallback** - ``text/plain`` for strings, ``application/octet-stream`` for bytes
425 Args:
426 db: Database session
427 resource: Resource creation schema
428 created_by: User who created the resource
429 created_from_ip: IP address of the creator
430 created_via: Method used to create the resource (e.g., API, UI)
431 created_user_agent: User agent of the creator
432 import_batch_id: Optional batch ID for bulk imports
433 federation_source: Optional source of the resource if federated
434 team_id (Optional[str]): Team ID to assign the resource to.
435 owner_email (Optional[str]): Email of the user who owns this resource.
436 visibility (str): Resource visibility level (private, team, public).
438 Returns:
439 Created resource information
441 Raises:
442 IntegrityError: If a database integrity error occurs.
443 ResourceURIConflictError: If a resource with the same URI already exists.
444 ResourceError: For other resource registration errors
445 ContentSizeError: For content size exceed
446 ContentTypeError: If the MIME type is not allowed
448 Examples:
449 >>> from mcpgateway.services.resource_service import ResourceService
450 >>> from unittest.mock import MagicMock, AsyncMock, patch
451 >>> from mcpgateway.schemas import ResourceRead
452 >>> service = ResourceService()
453 >>> db = MagicMock()
454 >>> resource = MagicMock()
455 >>> resource.uri = "test://example"
456 >>> resource.content = "test content"
457 >>> resource.mime_type = None
458 >>> db.execute.return_value.scalar_one_or_none.return_value = None
459 >>> db.add = MagicMock()
460 >>> db.commit = MagicMock()
461 >>> db.refresh = MagicMock()
462 >>> service._notify_resource_added = AsyncMock()
463 >>> service._detect_mime_type_from_uri = MagicMock(return_value=None)
464 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
465 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
466 >>> import asyncio
467 >>> asyncio.run(service.register_resource(db, resource))
468 'resource_read'
469 """
470 try:
471 logger.info(f"Registering resource: {resource.uri}")
473 # Validate content size BEFORE any database operations
474 content_security = get_content_security_service()
475 content_to_validate = ""
477 # Extract content from resource for validation
478 # Use raw bytes for accurate size measurement to prevent bypass via non-UTF-8 content
479 if hasattr(resource, "content") and resource.content:
480 if isinstance(resource.content, bytes):
481 # Validate using raw bytes to get accurate size
482 content_to_validate = resource.content
483 else:
484 # Convert string to bytes for consistent size measurement
485 content_to_validate = str(resource.content)
487 content_security.validate_resource_size(content=content_to_validate, uri=resource.uri, user_email=created_by, ip_address=created_from_ip)
489 # MIME type resolution priority:
490 # 1. User-provided type (caller explicitly declares the content type)
491 # 2. URI-detected type (fallback when user omits the field)
492 # 3. Content-based fallback (text/plain for str, application/octet-stream for bytes)
493 if resource.mime_type:
494 mime_type = resource.mime_type
495 else:
496 mime_type = self._detect_mime_type(resource.uri, resource.content)
497 logger.info(f"Auto-detected MIME type for {resource.uri}: {mime_type}")
499 # Validate MIME type against allowlist
500 content_security.validate_resource_mime_type(
501 mime_type=mime_type,
502 uri=resource.uri,
503 user_email=created_by,
504 ip_address=created_from_ip,
505 )
507 # Extract gateway_id from resource if present
508 gateway_id = getattr(resource, "gateway_id", None)
510 # Check for existing server with the same uri
511 if visibility.lower() == "public":
512 logger.info(f"visibility:: {visibility}")
513 # Check for existing public resource with the same uri and gateway_id
514 existing_resource = db.execute(select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "public", DbResource.gateway_id == gateway_id)).scalar_one_or_none()
515 if existing_resource:
516 raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
517 elif visibility.lower() == "team" and team_id:
518 # Check for existing team resource with the same uri and gateway_id
519 existing_resource = db.execute(
520 select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.gateway_id == gateway_id)
521 ).scalar_one_or_none()
522 if existing_resource:
523 raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
525 # Determine content storage (mime_type already detected above)
526 is_text = mime_type and mime_type.startswith("text/") or isinstance(resource.content, str)
528 # Create DB model
529 db_resource = DbResource(
530 uri=resource.uri,
531 name=resource.name,
532 title=resource.title,
533 description=resource.description,
534 mime_type=mime_type,
535 uri_template=resource.uri_template,
536 text_content=resource.content if is_text else None,
537 binary_content=(resource.content.encode() if is_text and isinstance(resource.content, str) else resource.content if isinstance(resource.content, bytes) else None),
538 size=len(resource.content) if resource.content else 0,
539 tags=resource.tags or [],
540 created_by=created_by,
541 created_from_ip=created_from_ip,
542 created_via=created_via,
543 created_user_agent=created_user_agent,
544 import_batch_id=import_batch_id,
545 federation_source=federation_source,
546 version=1,
547 # Team scoping fields - use schema values if provided, otherwise fallback to parameters
548 team_id=getattr(resource, "team_id", None) or team_id,
549 owner_email=getattr(resource, "owner_email", None) or owner_email or created_by,
550 # Endpoint visibility parameter takes precedence over schema default
551 visibility=visibility if visibility is not None else (getattr(resource, "visibility", None) or "public"),
552 gateway_id=gateway_id,
553 )
555 # Add to DB
556 db.add(db_resource)
557 db.commit()
558 db.refresh(db_resource)
560 # Notify subscribers
561 await self._notify_resource_added(db_resource)
563 logger.info(f"Registered resource: {resource.uri}")
565 # Structured logging: Audit trail for resource creation
566 audit_trail.log_action(
567 user_id=created_by or "system",
568 action="create_resource",
569 resource_type="resource",
570 resource_id=str(db_resource.id),
571 resource_name=db_resource.name,
572 user_email=owner_email,
573 team_id=team_id,
574 client_ip=created_from_ip,
575 user_agent=created_user_agent,
576 new_values={
577 "uri": db_resource.uri,
578 "name": db_resource.name,
579 "visibility": visibility,
580 "mime_type": db_resource.mime_type,
581 },
582 context={
583 "created_via": created_via,
584 "import_batch_id": import_batch_id,
585 "federation_source": federation_source,
586 },
587 db=db,
588 )
590 # Structured logging: Log successful resource creation
591 structured_logger.log(
592 level="INFO",
593 message="Resource created successfully",
594 event_type="resource_created",
595 component="resource_service",
596 user_id=created_by,
597 user_email=owner_email,
598 team_id=team_id,
599 resource_type="resource",
600 resource_id=str(db_resource.id),
601 custom_fields={
602 "resource_uri": db_resource.uri,
603 "resource_name": db_resource.name,
604 "visibility": visibility,
605 },
606 )
608 db_resource.team = self._get_team_name(db, db_resource.team_id)
609 return self.convert_resource_to_read(db_resource)
610 except IntegrityError as ie:
611 logger.error(f"IntegrityErrors in group: {ie}")
613 # Structured logging: Log database integrity error
614 structured_logger.log(
615 level="ERROR",
616 message="Resource creation failed due to database integrity error",
617 event_type="resource_creation_failed",
618 component="resource_service",
619 user_id=created_by,
620 user_email=owner_email,
621 error=ie,
622 custom_fields={
623 "resource_uri": resource.uri,
624 },
625 )
626 raise ie
627 except ResourceURIConflictError as rce:
628 logger.error(f"ResourceURIConflictError in group: {resource.uri}")
630 # Structured logging: Log URI conflict error
631 structured_logger.log(
632 level="WARNING",
633 message="Resource creation failed due to URI conflict",
634 event_type="resource_uri_conflict",
635 component="resource_service",
636 user_id=created_by,
637 user_email=owner_email,
638 custom_fields={
639 "resource_uri": resource.uri,
640 "visibility": visibility,
641 },
642 )
643 raise rce
644 except ContentSizeError as cse:
646 structured_logger.log(
647 level="ERROR",
648 message=f"Resource content size limit exceeded: {cse.actual_size} bytes (max: {cse.max_size} bytes)",
649 event_type="resource_size_exceed",
650 component="resource_service",
651 user_id=created_by,
652 user_email=owner_email,
653 custom_fields={
654 "resource_uri": resource.uri,
655 "visibility": visibility,
656 },
657 )
658 raise cse
659 except ContentTypeError as cte:
661 structured_logger.log(
662 level="ERROR",
663 message=f"Resource MIME type not allowed: {cte.mime_type}",
664 event_type="resource_mime_type_rejected",
665 component="resource_service",
666 user_id=created_by,
667 user_email=owner_email,
668 custom_fields={
669 "resource_uri": resource.uri,
670 "mime_type": cte.mime_type,
671 "visibility": visibility,
672 },
673 )
674 raise cte
675 except Exception as e:
676 db.rollback()
678 # Structured logging: Log generic resource creation failure
679 structured_logger.log(
680 level="ERROR",
681 message="Resource creation failed",
682 event_type="resource_creation_failed",
683 component="resource_service",
684 user_id=created_by,
685 user_email=owner_email,
686 error=e,
687 custom_fields={
688 "resource_uri": resource.uri,
689 },
690 )
691 raise ResourceError(f"Failed to register resource: {str(e)}")
693 async def register_resources_bulk(
694 self,
695 db: Session,
696 resources: List[ResourceCreate],
697 created_by: Optional[str] = None,
698 created_from_ip: Optional[str] = None,
699 created_via: Optional[str] = None,
700 created_user_agent: Optional[str] = None,
701 import_batch_id: Optional[str] = None,
702 federation_source: Optional[str] = None,
703 team_id: Optional[str] = None,
704 owner_email: Optional[str] = None,
705 visibility: Optional[str] = "public",
706 conflict_strategy: str = "skip",
707 ) -> Dict[str, Any]:
708 """Register multiple resources in bulk with a single commit.
710 This method provides significant performance improvements over individual
711 resource registration by:
712 - Using db.add_all() instead of individual db.add() calls
713 - Performing a single commit for all resources
714 - Batch conflict detection
715 - Chunking for very large imports (>500 items)
717 Args:
718 db: Database session
719 resources: List of resource creation schemas
720 created_by: Username who created these resources
721 created_from_ip: IP address of creator
722 created_via: Creation method (ui, api, import, federation)
723 created_user_agent: User agent of creation request
724 import_batch_id: UUID for bulk import operations
725 federation_source: Source gateway for federated resources
726 team_id: Team ID to assign the resources to
727 owner_email: Email of the user who owns these resources
728 visibility: Resource visibility level (private, team, public)
729 conflict_strategy: How to handle conflicts (skip, update, rename, fail)
731 Returns:
732 Dict with statistics:
733 - created: Number of resources created
734 - updated: Number of resources updated
735 - skipped: Number of resources skipped
736 - failed: Number of resources that failed
737 - errors: List of error messages
739 Raises:
740 ResourceError: If bulk registration fails critically
742 Examples:
743 >>> from mcpgateway.services.resource_service import ResourceService
744 >>> from unittest.mock import MagicMock
745 >>> service = ResourceService()
746 >>> db = MagicMock()
747 >>> resources = [MagicMock(), MagicMock()]
748 >>> import asyncio
749 >>> try:
750 ... result = asyncio.run(service.register_resources_bulk(db, resources))
751 ... except Exception:
752 ... pass
753 """
754 if not resources:
755 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
757 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
759 # Process in chunks to avoid memory issues and SQLite parameter limits
760 chunk_size = 500
762 for chunk_start in range(0, len(resources), chunk_size):
763 chunk = resources[chunk_start : chunk_start + chunk_size]
765 try:
766 # Batch check for existing resources to detect conflicts
767 resource_uris = [resource.uri for resource in chunk]
769 # Build base query conditions
770 if visibility.lower() == "public":
771 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "public"]
772 elif visibility.lower() == "team" and team_id:
773 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "team", DbResource.team_id == team_id]
774 else:
775 # Private resources - check by owner
776 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "private", DbResource.owner_email == (owner_email or created_by)]
778 existing_resources_query = select(DbResource).where(*base_conditions)
779 existing_resources = db.execute(existing_resources_query).scalars().all()
780 # Use (uri, gateway_id) tuple as key for proper conflict detection with gateway_id scoping
781 existing_resources_map = {(r.uri, r.gateway_id): r for r in existing_resources}
783 resources_to_add = []
784 resources_to_update = []
786 for resource in chunk:
787 try:
788 # Validate content size before processing
789 content_security = get_content_security_service()
790 if hasattr(resource, "content") and resource.content:
791 content_security.validate_resource_size(
792 content=resource.content,
793 uri=resource.uri,
794 user_email=created_by,
795 ip_address=created_from_ip,
796 )
798 # MIME type resolution (same priority as register_resource):
799 # user-provided > URI-detected > content-based fallback
800 if getattr(resource, "mime_type", None):
801 bulk_mime_type = resource.mime_type
802 else:
803 bulk_mime_type = self._detect_mime_type(resource.uri, getattr(resource, "content", "") or "")
805 # Validate MIME type against allowlist
806 content_security.validate_resource_mime_type(
807 mime_type=bulk_mime_type,
808 uri=resource.uri,
809 user_email=created_by,
810 ip_address=created_from_ip,
811 )
813 # Use provided parameters or schema values
814 resource_team_id = team_id if team_id is not None else getattr(resource, "team_id", None)
815 resource_owner_email = owner_email or getattr(resource, "owner_email", None) or created_by
816 resource_visibility = visibility if visibility is not None else getattr(resource, "visibility", "public")
817 resource_gateway_id = getattr(resource, "gateway_id", None)
819 # Look up existing resource by (uri, gateway_id) tuple
820 existing_resource = existing_resources_map.get((resource.uri, resource_gateway_id))
822 if existing_resource:
823 # Handle conflict based on strategy
824 if conflict_strategy == "skip":
825 stats["skipped"] += 1
826 continue
827 if conflict_strategy == "update":
828 # Update existing resource
829 existing_resource.name = resource.name
830 existing_resource.title = getattr(resource, "title", None)
831 existing_resource.description = resource.description
832 existing_resource.mime_type = bulk_mime_type
833 existing_resource.size = getattr(resource, "size", None)
834 existing_resource.uri_template = resource.uri_template
835 existing_resource.tags = resource.tags or []
836 existing_resource.modified_by = created_by
837 existing_resource.modified_from_ip = created_from_ip
838 existing_resource.modified_via = created_via
839 existing_resource.modified_user_agent = created_user_agent
840 existing_resource.updated_at = datetime.now(timezone.utc)
841 existing_resource.version = (existing_resource.version or 1) + 1
843 resources_to_update.append(existing_resource)
844 stats["updated"] += 1
845 elif conflict_strategy == "rename":
846 # Create with renamed resource
847 new_uri = f"{resource.uri}_imported_{int(datetime.now().timestamp())}"
848 db_resource = DbResource(
849 uri=new_uri,
850 name=resource.name,
851 title=getattr(resource, "title", None),
852 description=resource.description,
853 mime_type=bulk_mime_type,
854 size=getattr(resource, "size", None),
855 uri_template=resource.uri_template,
856 gateway_id=getattr(resource, "gateway_id", None),
857 tags=resource.tags or [],
858 created_by=created_by,
859 created_from_ip=created_from_ip,
860 created_via=created_via,
861 created_user_agent=created_user_agent,
862 import_batch_id=import_batch_id,
863 federation_source=federation_source,
864 version=1,
865 team_id=resource_team_id,
866 owner_email=resource_owner_email,
867 visibility=resource_visibility,
868 )
869 resources_to_add.append(db_resource)
870 stats["created"] += 1
871 elif conflict_strategy == "fail":
872 stats["failed"] += 1
873 stats["errors"].append(f"Resource URI conflict: {resource.uri}")
874 continue
875 else:
876 # Create new resource
877 db_resource = DbResource(
878 uri=resource.uri,
879 name=resource.name,
880 title=getattr(resource, "title", None),
881 description=resource.description,
882 mime_type=bulk_mime_type,
883 size=getattr(resource, "size", None),
884 uri_template=resource.uri_template,
885 gateway_id=getattr(resource, "gateway_id", None),
886 tags=resource.tags or [],
887 created_by=created_by,
888 created_from_ip=created_from_ip,
889 created_via=created_via,
890 created_user_agent=created_user_agent,
891 import_batch_id=import_batch_id,
892 federation_source=federation_source,
893 version=1,
894 team_id=resource_team_id,
895 owner_email=resource_owner_email,
896 visibility=resource_visibility,
897 )
898 resources_to_add.append(db_resource)
899 stats["created"] += 1
901 except Exception as e:
902 stats["failed"] += 1
903 stats["errors"].append(f"Failed to process resource {resource.uri}: {str(e)}")
904 logger.warning(f"Failed to process resource {resource.uri} in bulk operation: {str(e)}")
905 continue
907 # Bulk add new resources
908 if resources_to_add:
909 db.add_all(resources_to_add)
911 # Commit the chunk
912 db.commit()
914 # Refresh resources for notifications and audit trail
915 for db_resource in resources_to_add:
916 db.refresh(db_resource)
917 # Notify subscribers
918 await self._notify_resource_added(db_resource)
920 # Log bulk audit trail entry
921 if resources_to_add or resources_to_update:
922 audit_trail.log_action(
923 user_id=created_by or "system",
924 action="bulk_create_resources" if resources_to_add else "bulk_update_resources",
925 resource_type="resource",
926 resource_id=import_batch_id or "bulk_operation",
927 resource_name=f"Bulk operation: {len(resources_to_add)} created, {len(resources_to_update)} updated",
928 user_email=owner_email,
929 team_id=team_id,
930 client_ip=created_from_ip,
931 user_agent=created_user_agent,
932 new_values={
933 "resources_created": len(resources_to_add),
934 "resources_updated": len(resources_to_update),
935 "visibility": visibility,
936 },
937 context={
938 "created_via": created_via,
939 "import_batch_id": import_batch_id,
940 "federation_source": federation_source,
941 "conflict_strategy": conflict_strategy,
942 },
943 db=db,
944 )
946 logger.info(f"Bulk registered {len(resources_to_add)} resources, updated {len(resources_to_update)} resources in chunk")
948 except Exception as e:
949 db.rollback()
950 logger.error(f"Failed to process chunk in bulk resource registration: {str(e)}")
951 stats["failed"] += len(chunk)
952 stats["errors"].append(f"Chunk processing failed: {str(e)}")
953 continue
955 # Final structured logging
956 structured_logger.log(
957 level="INFO",
958 message="Bulk resource registration completed",
959 event_type="resources_bulk_created",
960 component="resource_service",
961 user_id=created_by,
962 user_email=owner_email,
963 team_id=team_id,
964 resource_type="resource",
965 custom_fields={
966 "resources_created": stats["created"],
967 "resources_updated": stats["updated"],
968 "resources_skipped": stats["skipped"],
969 "resources_failed": stats["failed"],
970 "total_resources": len(resources),
971 "visibility": visibility,
972 "conflict_strategy": conflict_strategy,
973 },
974 )
976 return stats
978 async def _check_resource_access(
979 self,
980 db: Session,
981 resource: DbResource,
982 user_email: Optional[str],
983 token_teams: Optional[List[str]],
984 ) -> bool:
985 """Check if user has access to a resource based on visibility rules.
987 Implements the same access control logic as list_resources() for consistency.
989 Args:
990 db: Database session for team membership lookup if needed.
991 resource: Resource ORM object with visibility, team_id, owner_email.
992 user_email: Email of the requesting user (None = unauthenticated).
993 token_teams: List of team IDs from token.
994 - None = unrestricted admin access
995 - [] = public-only token
996 - [...] = team-scoped token
998 Returns:
999 True if access is allowed, False otherwise.
1000 """
1001 visibility = getattr(resource, "visibility", "public")
1002 resource_team_id = getattr(resource, "team_id", None)
1003 resource_owner_email = getattr(resource, "owner_email", None)
1005 # Public resources are accessible by everyone
1006 if visibility == "public":
1007 return True
1009 # Admin bypass: token_teams=None AND user_email=None means unrestricted admin
1010 # This happens when is_admin=True and no team scoping in token
1011 if token_teams is None and user_email is None:
1012 return True
1014 # No user context (but not admin) = deny access to non-public resources
1015 if not user_email:
1016 return False
1018 # Public-only tokens (empty teams array) can ONLY access public resources
1019 is_public_only_token = token_teams is not None and len(token_teams) == 0
1020 if is_public_only_token:
1021 return False # Already checked public above
1023 # Owner can access their own private resources
1024 if visibility == "private" and resource_owner_email and resource_owner_email == user_email:
1025 return True
1027 # Team resources: check team membership (matches list_resources behavior)
1028 if resource_team_id:
1029 # Use token_teams if provided, otherwise look up from DB
1030 if token_teams is not None:
1031 team_ids = token_teams
1032 else:
1033 if db is None:
1034 logger.warning("Missing database session for team-scoped resource access check")
1035 return False
1036 # First-Party
1037 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
1039 team_service = TeamManagementService(db)
1040 user_teams = await team_service.get_user_teams(user_email)
1041 team_ids = [team.id for team in user_teams]
1043 # Team/public visibility allows access if user is in the team
1044 if visibility in ["team", "public"] and resource_team_id in team_ids:
1045 return True
1047 return False
1049 async def list_resources(
1050 self,
1051 db: Session,
1052 include_inactive: bool = False,
1053 cursor: Optional[str] = None,
1054 tags: Optional[List[str]] = None,
1055 gateway_id: Optional[str] = None,
1056 limit: Optional[int] = None,
1057 page: Optional[int] = None,
1058 per_page: Optional[int] = None,
1059 user_email: Optional[str] = None,
1060 team_id: Optional[str] = None,
1061 visibility: Optional[str] = None,
1062 token_teams: Optional[List[str]] = None,
1063 ) -> Union[tuple[List[ResourceRead], Optional[str]], Dict[str, Any]]:
1064 """
1065 Retrieve a list of registered resources from the database with pagination support.
1067 This method retrieves resources from the database and converts them into a list
1068 of ResourceRead objects. It supports filtering out inactive resources based on the
1069 include_inactive parameter and cursor-based pagination.
1071 Args:
1072 db (Session): The SQLAlchemy database session.
1073 include_inactive (bool): If True, include inactive resources in the result.
1074 Defaults to False.
1075 cursor (Optional[str], optional): An opaque cursor token for pagination.
1076 Opaque base64-encoded string containing last item's ID and created_at.
1077 tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
1078 gateway_id (Optional[str]): Filter resources by gateway ID. Accepts the literal value 'null' to match NULL gateway_id.
1079 limit (Optional[int]): Maximum number of resources to return. Use 0 for all resources (no limit).
1080 If not specified, uses pagination_default_page_size.
1081 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
1082 per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
1083 user_email (Optional[str]): User email for team-based access control. If None, no access control is applied.
1084 team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation.
1085 visibility (Optional[str]): Filter by visibility (private, team, public).
1086 token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access
1087 where the token scope should be respected instead of the user's full team memberships.
1089 Returns:
1090 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
1091 If cursor is provided or neither: tuple of (list of ResourceRead objects, next_cursor).
1093 Examples:
1094 >>> from mcpgateway.services.resource_service import ResourceService
1095 >>> from unittest.mock import MagicMock
1096 >>> service = ResourceService()
1097 >>> db = MagicMock()
1098 >>> resource_read = MagicMock()
1099 >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
1100 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
1101 >>> import asyncio
1102 >>> resources, next_cursor = asyncio.run(service.list_resources(db))
1103 >>> isinstance(resources, list)
1104 True
1106 With tags filter:
1107 >>> db2 = MagicMock()
1108 >>> bind = MagicMock()
1109 >>> bind.dialect = MagicMock()
1110 >>> bind.dialect.name = "sqlite" # or "postgresql"
1111 >>> db2.get_bind.return_value = bind
1112 >>> db2.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
1113 >>> result2, _ = asyncio.run(service.list_resources(db2, tags=['api']))
1114 >>> isinstance(result2, list)
1115 True
1116 """
1117 with create_span(
1118 "resource.list",
1119 {
1120 "include_inactive": include_inactive,
1121 "tags.count": len(tags) if tags else 0,
1122 "gateway_id": gateway_id,
1123 "limit": limit,
1124 "page": page,
1125 "per_page": per_page,
1126 "user.email": user_email,
1127 "team.scope": format_trace_team_scope(token_teams) if token_teams is not None else None,
1128 "team.filter": team_id,
1129 "visibility": visibility,
1130 },
1131 ):
1132 # Check cache for first page only (cursor=None)
1133 # Skip caching when:
1134 # - user_email is provided (team-filtered results are user-specific)
1135 # - token_teams is set (scoped access, e.g., public-only or team-scoped tokens)
1136 # - page-based pagination is used
1137 # This prevents cache poisoning where admin results could leak to public-only requests
1138 cache = _get_registry_cache()
1139 if cursor is None and user_email is None and token_teams is None and page is None:
1140 filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, gateway_id=gateway_id, limit=limit, visibility=visibility)
1141 cached = await cache.get("resources", filters_hash)
1142 if cached is not None:
1143 # Reconstruct ResourceRead objects from cached dicts
1144 cached_resources = [ResourceRead.model_validate(r) for r in cached["resources"]]
1145 return (cached_resources, cached.get("next_cursor"))
1147 # Build base query with ordering
1148 query = select(DbResource).where(DbResource.uri_template.is_(None)).order_by(desc(DbResource.created_at), desc(DbResource.id))
1150 # Apply active/inactive filter
1151 if not include_inactive:
1152 query = query.where(DbResource.enabled)
1154 query = await self._apply_access_control(query, db, user_email, token_teams, team_id)
1156 if visibility:
1157 query = query.where(DbResource.visibility == visibility)
1159 # Add gateway_id filtering if provided
1160 if gateway_id:
1161 if gateway_id.lower() == "null":
1162 query = query.where(DbResource.gateway_id.is_(None))
1163 else:
1164 query = query.where(DbResource.gateway_id == gateway_id)
1166 # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
1167 if tags:
1168 query = query.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))
1170 # Use unified pagination helper - handles both page and cursor pagination
1171 pag_result = await unified_paginate(
1172 db=db,
1173 query=query,
1174 page=page,
1175 per_page=per_page,
1176 cursor=cursor,
1177 limit=limit,
1178 base_url="/admin/resources", # Used for page-based links
1179 query_params={"include_inactive": include_inactive} if include_inactive else {},
1180 )
1182 next_cursor = None
1183 # Extract servers based on pagination type
1184 if page is not None:
1185 # Page-based: pag_result is a dict
1186 resources_db = pag_result["data"]
1187 else:
1188 # Cursor-based: pag_result is a tuple
1189 resources_db, next_cursor = pag_result
1191 # Fetch team names for the resources (common for both pagination types)
1192 team_ids_set = {s.team_id for s in resources_db if s.team_id}
1193 team_map = {}
1194 if team_ids_set:
1195 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
1196 team_map = {team.id: team.name for team in teams}
1198 db.commit() # Release transaction to avoid idle-in-transaction
1200 # Convert to ResourceRead (common for both pagination types)
1201 result = []
1202 for s in resources_db:
1203 try:
1204 s.team = team_map.get(s.team_id) if s.team_id else None
1205 result.append(self.convert_resource_to_read(s, include_metrics=False))
1206 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1207 logger.exception(f"Failed to convert resource {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
1208 # Continue with remaining resources instead of failing completely
1209 # Return appropriate format based on pagination type
1210 if page is not None:
1211 # Page-based format
1212 return {
1213 "data": result,
1214 "pagination": pag_result["pagination"],
1215 "links": pag_result["links"],
1216 }
1218 # Cursor-based format
1220 # Cache first page results - only for non-user-specific/non-scoped queries
1221 # Must match the same conditions as cache lookup to prevent cache poisoning
1222 if cursor is None and user_email is None and token_teams is None:
1223 try:
1224 cache_data = {"resources": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
1225 await cache.set("resources", cache_data, filters_hash)
1226 except AttributeError:
1227 pass # Skip caching if result objects don't support model_dump (e.g., in doctests)
1229 return (result, next_cursor)
1231 async def list_resources_for_user(
1232 self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
1233 ) -> List[ResourceRead]:
1234 """
1235 DEPRECATED: Use list_resources() with user_email parameter instead.
1237 List resources user has access to with team filtering.
1239 This method is maintained for backward compatibility but is no longer used.
1240 New code should call list_resources() with user_email, team_id, and visibility parameters.
1242 Args:
1243 db: Database session
1244 user_email: Email of the user requesting resources
1245 team_id: Optional team ID to filter by specific team
1246 visibility: Optional visibility filter (private, team, public)
1247 include_inactive: Whether to include inactive resources
1248 skip: Number of resources to skip for pagination
1249 limit: Maximum number of resources to return
1251 Returns:
1252 List[ResourceRead]: Resources the user has access to
1254 Examples:
1255 >>> from unittest.mock import MagicMock
1256 >>> import asyncio
1257 >>> service = ResourceService()
1258 >>> db = MagicMock()
1259 >>> # Patch out TeamManagementService so it doesn't run real logic
1260 >>> import mcpgateway.services.resource_service as _rs
1261 >>> class FakeTeamService:
1262 ... def __init__(self, db): pass
1263 ... async def get_user_teams(self, email): return []
1264 >>> _rs.TeamManagementService = FakeTeamService
1265 >>> # Force DB to return one fake row with a 'team' attribute
1266 >>> class FakeResource:
1267 ... team_id = None
1268 >>> fake_resource = FakeResource()
1269 >>> db.execute.return_value.scalars.return_value.all.return_value = [fake_resource]
1270 >>> service.convert_resource_to_read = MagicMock(return_value="converted")
1271 >>> asyncio.run(service.list_resources_for_user(db, "user@example.com"))
1272 ['converted']
1274 Without team_id (default/public access):
1275 >>> db2 = MagicMock()
1276 >>> class FakeResource2:
1277 ... team_id = None
1278 >>> fake_resource2 = FakeResource2()
1279 >>> db2.execute.return_value.scalars.return_value.all.return_value = [fake_resource2]
1280 >>> service.convert_resource_to_read = MagicMock(return_value="converted2")
1281 >>> out2 = asyncio.run(service.list_resources_for_user(db2, "user@example.com"))
1282 >>> out2
1283 ['converted2']
1284 """
1285 # First-Party
1286 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
1288 # Build query following existing patterns from list_resources()
1289 team_service = TeamManagementService(db)
1290 user_teams = await team_service.get_user_teams(user_email)
1291 team_ids = [team.id for team in user_teams]
1293 # Build query following existing patterns from list_resources()
1294 query = select(DbResource)
1296 # Apply active/inactive filter
1297 if not include_inactive:
1298 query = query.where(DbResource.enabled)
1300 if team_id:
1301 if team_id not in team_ids:
1302 return [] # No access to team
1304 access_conditions = []
1305 # Filter by specific team
1306 access_conditions.append(and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"])))
1308 access_conditions.append(and_(DbResource.team_id == team_id, DbResource.owner_email == user_email))
1310 query = query.where(or_(*access_conditions))
1311 else:
1312 # Get user's accessible teams
1313 # Build access conditions following existing patterns
1314 access_conditions = []
1315 # 1. User's personal resources (owner_email matches)
1316 access_conditions.append(DbResource.owner_email == user_email)
1317 # 2. Team resources where user is member
1318 if team_ids:
1319 access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"])))
1320 # 3. Public resources (if visibility allows)
1321 access_conditions.append(DbResource.visibility == "public")
1323 query = query.where(or_(*access_conditions))
1325 # Apply visibility filter if specified
1326 if visibility:
1327 query = query.where(DbResource.visibility == visibility)
1329 # Apply pagination following existing patterns
1330 query = query.offset(skip).limit(limit)
1332 resources = db.execute(query).scalars().all()
1334 # Batch fetch team names to avoid N+1 queries
1335 resource_team_ids = {r.team_id for r in resources if r.team_id}
1336 team_map = {}
1337 if resource_team_ids:
1338 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all()
1339 team_map = {str(team.id): team.name for team in teams}
1341 db.commit() # Release transaction to avoid idle-in-transaction
1343 result = []
1344 for t in resources:
1345 try:
1346 t.team = team_map.get(str(t.team_id)) if t.team_id else None
1347 result.append(self.convert_resource_to_read(t, include_metrics=False))
1348 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1349 logger.exception(f"Failed to convert resource {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
1350 # Continue with remaining resources instead of failing completely
1351 return result
1353 async def list_server_resources(
1354 self,
1355 db: Session,
1356 server_id: str,
1357 include_inactive: bool = False,
1358 include_metrics: bool = False,
1359 user_email: Optional[str] = None,
1360 token_teams: Optional[List[str]] = None,
1361 ) -> List[ResourceRead]:
1362 """
1363 Retrieve a list of registered resources from the database.
1365 This method retrieves resources from the database and converts them into a list
1366 of ResourceRead objects. It supports filtering out inactive resources based on the
1367 include_inactive parameter. The cursor parameter is reserved for future pagination support
1368 but is currently not implemented.
1370 Args:
1371 db (Session): The SQLAlchemy database session.
1372 server_id (str): Server ID
1373 include_inactive (bool): If True, include inactive resources in the result.
1374 Defaults to False.
1375 include_metrics (bool): If True, include metrics data in the result.
1376 Defaults to False.
1377 user_email (Optional[str]): User email for visibility filtering. If None, no filtering applied.
1378 token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API
1379 token access where the token scope should be respected.
1381 Returns:
1382 List[ResourceRead]: A list of resources represented as ResourceRead objects.
1384 Examples:
1385 >>> from mcpgateway.services.resource_service import ResourceService
1386 >>> from unittest.mock import MagicMock
1387 >>> service = ResourceService()
1388 >>> db = MagicMock()
1389 >>> resource_read = MagicMock()
1390 >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
1391 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
1392 >>> import asyncio
1393 >>> result = asyncio.run(service.list_server_resources(db, 'server1'))
1394 >>> isinstance(result, list)
1395 True
1396 >>> # Include inactive branch
1397 >>> result = asyncio.run(service.list_server_resources(db, 'server1', include_inactive=True))
1398 >>> isinstance(result, list)
1399 True
1400 """
1401 with create_span(
1402 "resource.list",
1403 {
1404 "server_id": server_id,
1405 "include_inactive": include_inactive,
1406 "include_metrics": include_metrics,
1407 "user.email": user_email,
1408 "team.scope": format_trace_team_scope(token_teams) if token_teams is not None else None,
1409 },
1410 ):
1411 logger.debug(f"Listing resources for server_id: {server_id}, include_inactive: {include_inactive}")
1412 query = (
1413 select(DbResource)
1414 .join(server_resource_association, DbResource.id == server_resource_association.c.resource_id)
1415 .where(DbResource.uri_template.is_(None))
1416 .where(server_resource_association.c.server_id == server_id)
1417 )
1419 # Eager load metrics relationships to prevent N+1 queries when include_metrics=true
1420 if include_metrics:
1421 query = query.options(selectinload(DbResource.metrics), selectinload(DbResource.metrics_hourly))
1422 if not include_inactive:
1423 query = query.where(DbResource.enabled)
1425 # Add visibility filtering if user context OR token_teams provided
1426 # This ensures unauthenticated requests with token_teams=[] only see public resources
1427 if user_email is not None or token_teams is not None: # empty-string user_email -> public-only filtering (secure default)
1428 # Use token_teams if provided (for MCP/API token access), otherwise look up from DB
1429 if token_teams is not None:
1430 team_ids = token_teams
1431 elif user_email:
1432 # First-Party
1433 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
1435 team_service = TeamManagementService(db)
1436 user_teams = await team_service.get_user_teams(user_email)
1437 team_ids = [team.id for team in user_teams]
1438 else:
1439 team_ids = []
1441 # Check if this is a public-only token (empty teams array)
1442 # Public-only tokens can ONLY see public resources - no owner access
1443 is_public_only_token = token_teams is not None and len(token_teams) == 0
1445 access_conditions = [
1446 DbResource.visibility == "public",
1447 ]
1448 # Only include owner access for non-public-only tokens with user_email
1449 if not is_public_only_token and user_email:
1450 access_conditions.append(DbResource.owner_email == user_email)
1451 if team_ids:
1452 access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"])))
1453 query = query.where(or_(*access_conditions))
1455 # Cursor-based pagination logic can be implemented here in the future.
1456 resources = db.execute(query).scalars().all()
1458 # Batch fetch team names to avoid N+1 queries
1459 resource_team_ids = {r.team_id for r in resources if r.team_id}
1460 team_map = {}
1461 if resource_team_ids:
1462 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all()
1463 team_map = {str(team.id): team.name for team in teams}
1465 db.commit() # Release transaction to avoid idle-in-transaction
1467 result = []
1468 for t in resources:
1469 try:
1470 t.team = team_map.get(str(t.team_id)) if t.team_id else None
1471 result.append(self.convert_resource_to_read(t, include_metrics=include_metrics))
1472 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1473 logger.exception(f"Failed to convert resource {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
1474 # Continue with remaining resources instead of failing completely
1475 return result
1477 async def _record_resource_metric(self, db: Session, resource: DbResource, start_time: float, success: bool, error_message: Optional[str]) -> None:
1478 """
1479 Records a metric for a resource access.
1481 Args:
1482 db: Database session
1483 resource: The resource that was accessed
1484 start_time: Monotonic start time of the access
1485 success: True if successful, False otherwise
1486 error_message: Error message if failed, None otherwise
1487 """
1488 end_time = time.monotonic()
1489 response_time = end_time - start_time
1491 metric = ResourceMetric(
1492 resource_id=resource.id,
1493 response_time=response_time,
1494 is_success=success,
1495 error_message=error_message,
1496 )
1497 db.add(metric)
1498 db.commit()
1500 async def _record_invoke_resource_metric(self, db: Session, resource_id: str, start_time: float, success: bool, error_message: Optional[str]) -> None:
1501 """
1502 Records a metric for invoking resource.
1504 Args:
1505 db: Database Session
1506 resource_id: unique identifier to access & invoke resource
1507 start_time: Monotonic start time of the access
1508 success: True if successful, False otherwise
1509 error_message: Error message if failed, None otherwise
1510 """
1511 end_time = time.monotonic()
1512 response_time = end_time - start_time
1514 metric = ResourceMetric(
1515 resource_id=resource_id,
1516 response_time=response_time,
1517 is_success=success,
1518 error_message=error_message,
1519 )
1520 db.add(metric)
1521 db.commit()
1523 def create_ssl_context(self, ca_certificate: str) -> ssl.SSLContext:
1524 """Create an SSL context with the provided CA certificate.
1526 Uses caching to avoid repeated SSL context creation for the same certificate.
1528 Args:
1529 ca_certificate: CA certificate in PEM format
1531 Returns:
1532 ssl.SSLContext: Configured SSL context
1533 """
1534 return get_cached_ssl_context(ca_certificate)
1536 async def invoke_resource( # pylint: disable=unused-argument
1537 self,
1538 db: Session,
1539 resource_id: str,
1540 resource_uri: str,
1541 resource_template_uri: Optional[str] = None,
1542 user_identity: Optional[Union[str, Dict[str, Any]]] = None,
1543 meta_data: Optional[Dict[str, Any]] = None, # Reserved for future MCP SDK support
1544 resource_obj: Optional[Any] = None,
1545 gateway_obj: Optional[Any] = None,
1546 server_id: Optional[str] = None,
1547 ) -> Any:
1548 """
1549 Invoke a resource via its configured gateway using SSE or StreamableHTTP transport.
1551 This method determines the correct URI to invoke, loads the associated resource
1552 and gateway from the database, validates certificates if applicable, prepares
1553 authentication headers (OAuth, header-based, or none), and then connects to
1554 the gateway to read the resource using the appropriate transport.
1556 The function supports:
1557 - CA certificate validation / SSL context creation
1558 - OAuth client-credentials and authorization-code flow
1559 - Header-based auth
1560 - SSE transport gateways
1561 - StreamableHTTP transport gateways
1563 Args:
1564 db (Session):
1565 SQLAlchemy session for retrieving resource and gateway information.
1566 resource_id (str):
1567 ID of the resource to invoke.
1568 resource_uri (str):
1569 Direct resource URI configured for the resource.
1570 resource_template_uri (Optional[str]):
1571 URI from the template. Overrides `resource_uri` when provided.
1572 user_identity (Optional[Union[str, Dict[str, Any]]]):
1573 Identity of the user making the request, used for session pool isolation.
1574 Can be a string (email) or a dict with an 'email' key.
1575 Defaults to "anonymous" for pool isolation if not provided.
1576 OAuth Authorization Code token lookup uses this user identity.
1577 meta_data (Optional[Dict[str, Any]]):
1578 Additional metadata to pass to the gateway during invocation.
1579 resource_obj (Optional[Any]):
1580 Pre-fetched DbResource object to skip the internal resource lookup query.
1581 gateway_obj (Optional[Any]):
1582 Pre-fetched DbGateway object to skip the internal gateway lookup query.
1583 server_id (Optional[str]):
1584 Virtual server ID for server metrics recording. When provided, indicates
1585 the resource was invoked through a specific virtual server endpoint.
1586 Direct resource calls (e.g., from admin UI) should pass None.
1588 Returns:
1589 Any: The text content returned by the remote resource, or ``None`` if the
1590 gateway could not be contacted or an error occurred.
1592 Raises:
1593 Exception: Any unhandled internal errors (e.g., DB issues).
1595 ---
1596 Doctest Examples
1597 ----------------
1599 >>> class FakeDB:
1600 ... "Simple DB stub returning fake resource and gateway rows."
1601 ... def execute(self, query):
1602 ... class Result:
1603 ... def scalar_one_or_none(self):
1604 ... # Return fake objects with the needed attributes
1605 ... class FakeResource:
1606 ... id = "res123"
1607 ... name = "Demo Resource"
1608 ... gateway_id = "gw1"
1609 ... return FakeResource()
1610 ... return Result()
1612 >>> class FakeGateway:
1613 ... id = "gw1"
1614 ... name = "Fake Gateway"
1615 ... url = "https://fake.gateway"
1616 ... ca_certificate = None
1617 ... ca_certificate_sig = None
1618 ... transport = "sse"
1619 ... auth_type = None
1620 ... auth_value = {}
1622 >>> # Monkeypatch the DB lookup for gateway
1623 >>> def fake_execute_gateway(self, query):
1624 ... class Result:
1625 ... def scalar_one_or_none(self_inner):
1626 ... return FakeGateway()
1627 ... return Result()
1629 >>> FakeDB.execute_gateway = fake_execute_gateway
1631 >>> class FakeService:
1632 ... "Service stub replacing network calls with predictable outputs."
1633 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None):
1634 ... # Represent the behavior of a successful SSE response.
1635 ... return "hello from gateway"
1637 >>> svc = FakeService()
1638 >>> import asyncio
1639 >>> asyncio.run(svc.invoke_resource(FakeDB(), "res123", "/test"))
1640 'hello from gateway'
1642 ---
1643 Example: Template URI overrides resource URI
1644 --------------------------------------------
1646 >>> class FakeService2(FakeService):
1647 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None):
1648 ... if resource_template_uri:
1649 ... return f"using template: {resource_template_uri}"
1650 ... return f"using direct: {resource_uri}"
1652 >>> svc2 = FakeService2()
1653 >>> asyncio.run(svc2.invoke_resource(FakeDB(), "res123", "/direct", "/template"))
1654 'using template: /template'
1656 """
1657 uri = None
1658 if resource_uri and resource_template_uri:
1659 uri = resource_template_uri
1660 elif resource_uri:
1661 uri = resource_uri
1663 logger.info(f"Invoking the resource: {uri}")
1664 gateway_id = None
1665 # Use pre-fetched resource if provided (avoids Q5 re-fetch)
1666 resource_info = resource_obj
1667 if resource_info is None:
1668 resource_info = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none()
1670 # Release transaction immediately after resource lookup to prevent idle-in-transaction
1671 # This is especially important when resource isn't found - we don't want to hold the transaction
1672 db.commit()
1674 # Normalize user_identity to string for session pool isolation.
1675 if isinstance(user_identity, dict):
1676 pool_user_identity = user_identity.get("email") or "anonymous"
1677 elif isinstance(user_identity, str):
1678 pool_user_identity = user_identity
1679 else:
1680 pool_user_identity = "anonymous"
1682 oauth_user_email: Optional[str] = None
1683 if isinstance(user_identity, dict):
1684 user_email_value = user_identity.get("email")
1685 if isinstance(user_email_value, str):
1686 oauth_user_email = user_email_value.strip().lower() or None
1687 elif isinstance(user_identity, str):
1688 oauth_user_email = user_identity.strip().lower() or None
1690 if resource_info:
1691 gateway_id = getattr(resource_info, "gateway_id", None)
1692 resource_name = getattr(resource_info, "name", None)
1693 # Use pre-fetched gateway if provided (avoids Q6 re-fetch)
1694 gateway = gateway_obj
1695 if gateway is None and gateway_id:
1696 gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none()
1698 # ═══════════════════════════════════════════════════════════════════════════
1699 # CRITICAL: Release DB transaction immediately after fetching resource/gateway
1700 # This ensures the transaction doesn't stay open if there's no gateway
1701 # or if the function returns early for any reason.
1702 # ═══════════════════════════════════════════════════════════════════════════
1703 db.commit()
1705 if gateway:
1707 start_time = time.monotonic()
1708 success = False
1709 error_message = None
1711 # Create database span for observability dashboard
1712 trace_id = current_trace_id.get()
1713 db_span_id = None
1714 db_span_ended = False
1715 observability_service = ObservabilityService() if trace_id else None
1717 if trace_id and observability_service:
1718 try:
1719 db_span_id = observability_service.start_span(
1720 db=db,
1721 trace_id=trace_id,
1722 name="invoke.resource",
1723 attributes={
1724 "resource.name": resource_name if resource_name else "unknown",
1725 "resource.id": str(resource_id) if resource_id else "unknown",
1726 "resource.uri": str(uri) or "unknown",
1727 "gateway.transport": getattr(gateway, "transport") or "uknown",
1728 "gateway.url": getattr(gateway, "url") or "unknown",
1729 },
1730 )
1731 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {resource_id} & {uri}")
1732 except Exception as e:
1733 logger.warning(f"Failed to start the observability span for invoking resource: {e}")
1734 db_span_id = None
1736 span_attributes = {
1737 "resource.name": resource_name if resource_name else "unknown",
1738 "resource.id": str(resource_id) if resource_id else "unknown",
1739 "resource.uri": str(uri) or "unknown",
1740 "gateway.transport": getattr(gateway, "transport") or "uknown",
1741 "gateway.url": getattr(gateway, "url") or "unknown",
1742 }
1743 if is_input_capture_enabled("invoke.resource"):
1744 span_attributes["langfuse.observation.input"] = serialize_trace_payload({"uri": str(uri) if uri else "unknown"})
1746 with create_span("invoke.resource", span_attributes) as span:
1747 valid = False
1748 if gateway.ca_certificate:
1749 if settings.enable_ed25519_signing:
1750 public_key_pem = settings.ed25519_public_key
1751 valid = validate_signature(gateway.ca_certificate.encode(), gateway.ca_certificate_sig, public_key_pem)
1752 else:
1753 valid = True
1755 if valid:
1756 ssl_context = self.create_ssl_context(gateway.ca_certificate)
1757 else:
1758 ssl_context = None
1760 def _get_httpx_client_factory(
1761 headers: dict[str, str] | None = None,
1762 timeout: httpx.Timeout | None = None,
1763 auth: httpx.Auth | None = None,
1764 ) -> httpx.AsyncClient:
1765 """Factory function to create httpx.AsyncClient with optional CA certificate.
1767 Args:
1768 headers: Optional headers for the client
1769 timeout: Optional timeout for the client
1770 auth: Optional auth for the client
1772 Returns:
1773 httpx.AsyncClient: Configured HTTPX async client
1774 """
1775 # First-Party
1776 from mcpgateway.services.http_client_service import get_default_verify, get_http_timeout # pylint: disable=import-outside-toplevel
1778 return httpx.AsyncClient(
1779 verify=ssl_context if ssl_context else get_default_verify(), # pylint: disable=cell-var-from-loop
1780 follow_redirects=True,
1781 headers=headers,
1782 timeout=timeout if timeout else get_http_timeout(),
1783 auth=auth,
1784 limits=httpx.Limits(
1785 max_connections=settings.httpx_max_connections,
1786 max_keepalive_connections=settings.httpx_max_keepalive_connections,
1787 keepalive_expiry=settings.httpx_keepalive_expiry,
1788 ),
1789 )
1791 try:
1792 # ═══════════════════════════════════════════════════════════════════════════
1793 # Extract gateway data to local variables BEFORE OAuth handling
1794 # OAuth client_credentials flow makes network calls, so we must release
1795 # the DB transaction first to prevent idle-in-transaction during network I/O
1796 # ═══════════════════════════════════════════════════════════════════════════
1797 gateway_url = gateway.url
1798 gateway_transport = gateway.transport
1799 gateway_auth_type = gateway.auth_type
1800 gateway_auth_value = gateway.auth_value
1801 gateway_oauth_config = gateway.oauth_config
1802 gateway_name = gateway.name
1803 gateway_auth_query_params = getattr(gateway, "auth_query_params", None)
1805 # Apply query param auth to URL if applicable
1806 auth_query_params_decrypted: Optional[Dict[str, str]] = None
1807 if gateway_auth_type == "query_param" and gateway_auth_query_params:
1808 auth_query_params_decrypted = {}
1809 for param_key, encrypted_value in gateway_auth_query_params.items():
1810 if encrypted_value:
1811 try:
1812 decrypted = decode_auth(encrypted_value)
1813 auth_query_params_decrypted[param_key] = decrypted.get(param_key, "")
1814 except Exception: # noqa: S110 - intentionally skip failed decryptions
1815 # Silently skip params that fail decryption (corrupted or old key)
1816 logger.debug(f"Failed to decrypt query param '{param_key}' for resource")
1817 if auth_query_params_decrypted:
1818 gateway_url = apply_query_param_auth(gateway_url, auth_query_params_decrypted)
1820 # ═══════════════════════════════════════════════════════════════════════════
1821 # CRITICAL: Release DB connection back to pool BEFORE making HTTP/OAuth calls
1822 # This prevents connection pool exhaustion during slow upstream requests.
1823 # OAuth client_credentials flow makes network calls to token endpoints.
1824 # All needed data has been extracted to local variables above.
1825 # ═══════════════════════════════════════════════════════════════════════════
1826 db.commit() # End read-only transaction cleanly
1827 db.close()
1829 # Handle different authentication types (AFTER DB release)
1830 headers = {}
1831 if gateway_auth_type == "oauth" and gateway_oauth_config:
1832 grant_type = gateway_oauth_config.get("grant_type", "client_credentials")
1834 if grant_type == "authorization_code":
1835 # For Authorization Code flow, try to get stored tokens
1836 try:
1837 # First-Party
1838 from mcpgateway.services.token_storage_service import TokenStorageService # pylint: disable=import-outside-toplevel
1840 # Use fresh DB session for token lookup (original db was closed)
1841 access_token = None
1842 if oauth_user_email:
1843 with fresh_db_session() as token_db:
1844 token_storage = TokenStorageService(token_db)
1845 access_token = await token_storage.get_user_token(gateway_id, oauth_user_email)
1847 if access_token:
1848 headers["Authorization"] = f"Bearer {access_token}"
1849 else:
1850 if span:
1851 set_span_attribute(span, "health.status", "unhealthy")
1852 set_span_error(span, "No valid OAuth token for user")
1854 except Exception as e:
1855 logger.error(f"Failed to obtain stored OAuth token for gateway {gateway_name}: {e}")
1856 if span:
1857 set_span_attribute(span, "health.status", "unhealthy")
1858 set_span_error(span, "Failed to obtain stored OAuth token")
1859 else:
1860 # For Client Credentials flow, get token directly (makes network calls)
1861 try:
1862 access_token: str = await self.oauth_manager.get_access_token(gateway_oauth_config)
1863 headers["Authorization"] = f"Bearer {access_token}"
1864 except Exception as e:
1865 if span:
1866 set_span_attribute(span, "health.status", "unhealthy")
1867 set_span_error(span, e)
1868 else:
1869 # Handle non-OAuth authentication (existing logic)
1870 auth_data = gateway_auth_value or {}
1871 if isinstance(auth_data, str):
1872 headers = decode_auth(auth_data)
1873 elif isinstance(auth_data, dict):
1874 headers = {str(k): str(v) for k, v in auth_data.items()}
1875 else:
1876 headers = {}
1878 async def connect_to_sse_session(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
1879 """
1880 Connect to an SSE-based gateway and retrieve the text content of a resource.
1882 This helper establishes an SSE (Server-Sent Events) session with the remote
1883 gateway, initializes a `ClientSession`, invokes `read_resource()` for the
1884 given URI, and returns the textual content from the first item in the
1885 response's `contents` list.
1887 If any error occurs (network failure, unexpected response format, session
1888 initialization failure, etc.), the method logs the exception and returns
1889 ``None`` instead of raising.
1891 Note:
1892 MCP SDK 1.25.0 read_resource() does not support meta parameter.
1893 When the SDK adds support, meta_data can be added back here.
1895 Args:
1896 server_url (str):
1897 The base URL of the SSE gateway to connect to.
1898 uri (str):
1899 The resource URI that should be requested from the gateway.
1900 authentication (Optional[Dict[str, str]]):
1901 Optional dictionary of headers (e.g., OAuth Bearer tokens) to
1902 include in the SSE connection request. Defaults to an empty
1903 dictionary when not provided.
1905 Returns:
1906 str | None:
1907 The text content returned by the remote resource, or ``None`` if the
1908 SSE connection fails or the response is invalid.
1910 Notes:
1911 - This function assumes the SSE client context manager yields:
1912 ``(read_stream, write_stream, get_session_id)``.
1913 - The expected response object from `session.read_resource()` must have a
1914 `contents` attribute containing a list, where the first element has a
1915 `text` attribute.
1916 """
1917 if authentication is None:
1918 authentication = {}
1919 try:
1920 # Use session pool if enabled for 10-20x latency improvement
1921 use_pool = False
1922 pool = None
1923 if settings.mcp_session_pool_enabled:
1924 try:
1925 pool = get_mcp_session_pool()
1926 use_pool = True
1927 except RuntimeError:
1928 # Pool not initialized (e.g., in tests), fall back to per-call sessions
1929 pass
1931 if use_pool and pool is not None:
1932 async with pool.session(
1933 url=server_url,
1934 headers=authentication,
1935 transport_type=TransportType.SSE,
1936 httpx_client_factory=_get_httpx_client_factory,
1937 user_identity=pool_user_identity,
1938 gateway_id=gateway_id,
1939 ) as pooled:
1940 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1941 resource_response = await pooled.session.read_resource(uri=uri)
1942 return getattr(getattr(resource_response, "contents")[0], "text")
1943 else:
1944 # Fallback to per-call sessions when pool disabled or not initialized
1945 async with sse_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as (
1946 read_stream,
1947 write_stream,
1948 ):
1949 async with ClientSession(read_stream, write_stream) as session:
1950 _ = await session.initialize()
1951 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1952 resource_response = await session.read_resource(uri=uri)
1953 return getattr(getattr(resource_response, "contents")[0], "text")
1954 except Exception as e:
1955 # Sanitize error message to prevent URL secrets from leaking in logs
1956 sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted)
1957 logger.debug(f"Exception while connecting to sse gateway: {sanitized_error}")
1958 return None
1960 async def connect_to_streamablehttp_server(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
1961 """
1962 Connect to a StreamableHTTP gateway and retrieve the text content of a resource.
1964 This helper establishes a StreamableHTTP client session with the specified
1965 gateway, initializes a `ClientSession`, invokes `read_resource()` for the
1966 given URI, and returns the textual content from the first element in the
1967 response's `contents` list.
1969 If any exception occurs during connection, session initialization, or
1970 resource reading, the function logs the error and returns ``None`` instead
1971 of propagating the exception.
1973 Note:
1974 MCP SDK 1.25.0 read_resource() does not support meta parameter.
1975 When the SDK adds support, meta_data can be added back here.
1977 Args:
1978 server_url (str):
1979 The endpoint URL of the StreamableHTTP gateway.
1980 uri (str):
1981 The resource URI to request from the gateway.
1982 authentication (Optional[Dict[str, str]]):
1983 Optional dictionary of authentication headers (e.g., API keys or
1984 Bearer tokens). Defaults to an empty dictionary when not provided.
1986 Returns:
1987 str | None:
1988 The text content returned by the StreamableHTTP resource, or ``None``
1989 if the connection fails or the response format is invalid.
1991 Notes:
1992 - The `streamablehttp_client` context manager must yield a tuple:
1993 ``(read_stream, write_stream, get_session_id)``.
1994 - The expected `resource_response` returned by ``session.read_resource()``
1995 must contain a `contents` list, whose first element exposes a `text`
1996 attribute.
1997 """
1998 if authentication is None:
1999 authentication = {}
2000 try:
2001 # Use session pool if enabled for 10-20x latency improvement
2002 use_pool = False
2003 pool = None
2004 if settings.mcp_session_pool_enabled:
2005 try:
2006 pool = get_mcp_session_pool()
2007 use_pool = True
2008 except RuntimeError:
2009 # Pool not initialized (e.g., in tests), fall back to per-call sessions
2010 pass
2012 if use_pool and pool is not None:
2013 async with pool.session(
2014 url=server_url,
2015 headers=authentication,
2016 transport_type=TransportType.STREAMABLE_HTTP,
2017 httpx_client_factory=_get_httpx_client_factory,
2018 user_identity=pool_user_identity,
2019 gateway_id=gateway_id,
2020 ) as pooled:
2021 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
2022 resource_response = await pooled.session.read_resource(uri=uri)
2023 return getattr(getattr(resource_response, "contents")[0], "text")
2024 else:
2025 # Fallback to per-call sessions when pool disabled or not initialized
2026 async with streamablehttp_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as (
2027 read_stream,
2028 write_stream,
2029 _get_session_id,
2030 ):
2031 async with ClientSession(read_stream, write_stream) as session:
2032 _ = await session.initialize()
2033 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
2034 resource_response = await session.read_resource(uri=uri)
2035 return getattr(getattr(resource_response, "contents")[0], "text")
2036 except Exception as e:
2037 # Sanitize error message to prevent URL secrets from leaking in logs
2038 sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted)
2039 logger.debug(f"Exception while connecting to streamablehttp gateway: {sanitized_error}")
2040 return None
2042 if span:
2043 set_span_attribute(span, "success", True)
2044 set_span_attribute(span, "duration.ms", (time.monotonic() - start_time) * 1000)
2046 resource_text = ""
2047 if (gateway_transport).lower() == "sse":
2048 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
2049 resource_text = await connect_to_sse_session(server_url=gateway_url, authentication=headers, uri=uri)
2050 else:
2051 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
2052 resource_text = await connect_to_streamablehttp_server(server_url=gateway_url, authentication=headers, uri=uri)
2053 if span and resource_text is not None and is_output_capture_enabled("invoke.resource"):
2054 set_span_attribute(span, "langfuse.observation.output", serialize_trace_payload({"content": resource_text}))
2055 success = True # Mark as successful before returning
2056 return resource_text
2057 except Exception as e:
2058 success = False
2059 error_message = str(e)
2060 raise
2061 finally:
2062 # Metrics are now recorded only in read_resource finally block
2063 # This eliminates duplicate metrics and provides a single source of truth
2064 # End Invoke resource span for Observability dashboard
2065 # NOTE: Use fresh_db_session() since the original db was released
2066 # before making HTTP calls to prevent connection pool exhaustion
2067 if db_span_id and observability_service and not db_span_ended:
2068 try:
2069 with fresh_db_session() as fresh_db:
2070 observability_service.end_span(
2071 db=fresh_db,
2072 span_id=db_span_id,
2073 status="ok" if success else "error",
2074 status_message=error_message if error_message else None,
2075 )
2076 db_span_ended = True
2077 logger.debug(f"✓ Ended invoke.resource span: {db_span_id}")
2078 except Exception as e:
2079 logger.warning(f"Failed to end observability span for invoking resource: {e}")
2081 async def read_resource(
2082 self,
2083 db: Session,
2084 resource_id: Optional[Union[int, str]] = None,
2085 resource_uri: Optional[str] = None,
2086 request_id: Optional[str] = None,
2087 user: Optional[str] = None,
2088 server_id: Optional[str] = None,
2089 include_inactive: bool = False,
2090 token_teams: Optional[List[str]] = None,
2091 plugin_context_table: Optional[PluginContextTable] = None,
2092 plugin_global_context: Optional[GlobalContext] = None,
2093 meta_data: Optional[Dict[str, Any]] = None,
2094 ) -> Union[ResourceContent, ResourceContents]:
2095 """Read a resource's content with plugin hook support.
2097 Args:
2098 db: Database session.
2099 resource_id: Optional ID of the resource to read.
2100 resource_uri: Optional URI of the resource to read.
2101 request_id: Optional request ID for tracing.
2102 user: Optional user email for authorization checks.
2103 server_id: Optional server ID for server scoping enforcement.
2104 include_inactive: Whether to include inactive resources. Defaults to False.
2105 token_teams: Optional list of team IDs from token for authorization.
2106 None = unrestricted admin, [] = public-only, [...] = team-scoped.
2107 plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing.
2108 plugin_global_context: Optional global context from middleware for consistency across hooks.
2109 meta_data: Optional metadata dictionary to pass to the gateway during resource reading.
2111 Returns:
2112 Resource content object
2114 Raises:
2115 ResourceNotFoundError: If resource not found or access denied
2116 ResourceError: If blocked by plugin
2117 PluginError: If encounters issue with plugin
2118 PluginViolationError: If plugin violated the request. Example - In case of OPA plugin, if the request is denied by policy.
2119 ValueError: If neither resource_id nor resource_uri is provided
2121 Examples:
2122 >>> from mcpgateway.common.models import ResourceContent
2123 >>> from mcpgateway.services.resource_service import ResourceService
2124 >>> from unittest.mock import MagicMock, PropertyMock
2125 >>> service = ResourceService()
2126 >>> db = MagicMock()
2127 >>> uri = 'http://example.com/resource.txt'
2128 >>> mock_resource = MagicMock()
2129 >>> mock_resource.id = 123
2130 >>> mock_resource.uri = uri
2131 >>> type(mock_resource).content = PropertyMock(return_value='test')
2132 >>> db.execute.return_value.scalar_one_or_none.return_value = mock_resource
2133 >>> db.get.return_value = mock_resource
2134 >>> import asyncio
2135 >>> result = asyncio.run(service.read_resource(db, resource_uri=uri))
2136 >>> result.__class__.__name__ == 'ResourceContent'
2137 True
2139 Not found case returns ResourceNotFoundError:
2141 >>> db2 = MagicMock()
2142 >>> db2.execute.return_value.scalar_one_or_none.return_value = None
2143 >>> db2.get.return_value = None
2144 >>> import asyncio
2145 >>> # Disable path validation for doctest
2146 >>> import mcpgateway.config
2147 >>> old_val = getattr(mcpgateway.config.settings, 'experimental_validate_io', False)
2148 >>> mcpgateway.config.settings.experimental_validate_io = False
2149 >>> def _nf():
2150 ... try:
2151 ... asyncio.run(service.read_resource(db2, resource_uri='abc'))
2152 ... except ResourceNotFoundError:
2153 ... return True
2154 >>> result = _nf()
2155 >>> mcpgateway.config.settings.experimental_validate_io = old_val
2156 >>> result
2157 True
2158 """
2159 start_time = time.monotonic()
2160 success = False
2161 error_message = None
2162 resource_db = None
2163 server_scoped = False
2164 resource_db_gateway = None # Only set when eager-loaded via Q2's joinedload
2165 content = None
2166 uri = resource_uri or "unknown"
2167 if resource_id:
2168 resource_db = db.get(DbResource, resource_id, options=[joinedload(DbResource.gateway)])
2169 if resource_db:
2170 uri = resource_db.uri
2171 resource_db_gateway = resource_db.gateway # Eager-loaded, safe to access
2172 # Check enabled status in Python (avoids redundant Q3/Q4 re-fetches)
2173 if not include_inactive and not resource_db.enabled:
2174 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")
2175 content = resource_db.content
2176 else:
2177 uri = None
2179 # Create database span for observability dashboard
2180 trace_id = current_trace_id.get()
2181 db_span_id = None
2182 db_span_ended = False
2183 observability_service = ObservabilityService() if trace_id else None
2185 if trace_id and observability_service:
2186 try:
2187 db_span_id = observability_service.start_span(
2188 db=db,
2189 trace_id=trace_id,
2190 name="resource.read",
2191 attributes={
2192 "resource.uri": str(resource_uri) if resource_uri else "unknown",
2193 "user": user or "anonymous",
2194 "server_id": server_id,
2195 "request_id": request_id,
2196 "http.url": uri if uri is not None and uri.startswith("http") else None,
2197 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static",
2198 },
2199 )
2200 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {uri}")
2201 except Exception as e:
2202 logger.warning(f"Failed to start observability span for resource reading: {e}")
2203 db_span_id = None
2205 span_attributes = {
2206 "resource.uri": resource_uri or "unknown",
2207 "user": user or "anonymous",
2208 "server_id": server_id,
2209 "request_id": request_id,
2210 "http.url": uri if uri is not None and uri.startswith("http") else None,
2211 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static",
2212 }
2213 if is_input_capture_enabled("resource.read"):
2214 span_attributes["langfuse.observation.input"] = serialize_trace_payload({"uri": resource_uri or resource_id or "unknown"})
2216 with create_span("resource.read", span_attributes) as span:
2217 try:
2218 # Generate request ID if not provided
2219 if not request_id:
2220 request_id = str(uuid.uuid4())
2222 original_uri = uri
2223 contexts = None
2225 # Check if plugin manager is available and eligible for this request
2226 plugin_eligible = bool(self._plugin_manager and PLUGINS_AVAILABLE and uri and ("://" in uri))
2228 # Initialize plugin manager if needed (lazy init must happen before has_hooks_for check)
2229 # pylint: disable=protected-access
2230 if plugin_eligible and not self._plugin_manager._initialized:
2231 await self._plugin_manager.initialize()
2232 # pylint: enable=protected-access
2234 # Check if any resource hooks are registered to avoid unnecessary context creation
2235 has_pre_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_PRE_FETCH)
2236 has_post_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_POST_FETCH)
2238 # Initialize plugin context variables only if hooks are registered
2239 global_context = None
2240 if has_pre_fetch or has_post_fetch:
2241 # Create plugin context
2242 # Normalize user to an identifier string if provided
2243 user_id = None
2244 if user is not None:
2245 if isinstance(user, dict) and "email" in user:
2246 user_id = user.get("email")
2247 elif isinstance(user, str):
2248 user_id = user
2249 else:
2250 # Attempt to fallback to attribute access
2251 user_id = getattr(user, "email", None)
2253 # Use existing global_context from middleware or create new one
2254 if plugin_global_context:
2255 global_context = plugin_global_context
2256 # Update fields with resource-specific information
2257 if user_id:
2258 global_context.user = user_id
2259 if server_id:
2260 global_context.server_id = server_id
2261 else:
2262 # Create new context (fallback when middleware didn't run)
2263 global_context = GlobalContext(request_id=request_id, user=user_id, server_id=server_id)
2265 # Call pre-fetch hooks if registered
2266 if has_pre_fetch:
2267 # Create pre-fetch payload
2268 pre_payload = ResourcePreFetchPayload(uri=uri, metadata={})
2270 # Execute pre-fetch hooks with context from previous hooks
2271 pre_result, contexts = await self._plugin_manager.invoke_hook(
2272 ResourceHookType.RESOURCE_PRE_FETCH,
2273 pre_payload,
2274 global_context,
2275 local_contexts=plugin_context_table, # Pass context from previous hooks
2276 violations_as_exceptions=True,
2277 )
2278 # Use modified URI if plugin changed it
2279 if pre_result.modified_payload:
2280 uri = pre_result.modified_payload.uri
2281 logger.debug(f"Resource URI modified by plugin: {original_uri} -> {uri}")
2283 # Validate resource path if experimental validation is enabled
2284 if getattr(settings, "experimental_validate_io", False) and uri and isinstance(uri, str):
2285 try:
2286 SecurityValidator.validate_path(uri, getattr(settings, "allowed_roots", None))
2287 except ValueError as e:
2288 raise ResourceError(f"Path validation failed: {e}")
2290 # Original resource fetching logic
2291 logger.info(f"Fetching resource: {resource_id} (URI: {uri})")
2293 # Check if resource's gateway is in direct_proxy mode
2294 # First, try to find the resource to get its gateway
2295 # Check for template
2297 if resource_db is None and uri is not None: # and "{" in uri and "}" in uri:
2298 # Matches uri (modified value from pluggins if applicable)
2299 # with uri from resource DB
2300 # if uri is of type resource template then resource is retreived from DB
2301 query = select(DbResource)
2302 if server_id:
2303 query = query.join(
2304 server_resource_association,
2305 server_resource_association.c.resource_id == DbResource.id,
2306 ).where(server_resource_association.c.server_id == server_id)
2307 query = query.where(DbResource.uri == str(uri)).where(DbResource.enabled)
2308 if include_inactive:
2309 query = select(DbResource)
2310 if server_id:
2311 query = query.join(
2312 server_resource_association,
2313 server_resource_association.c.resource_id == DbResource.id,
2314 ).where(server_resource_association.c.server_id == server_id)
2315 query = query.where(DbResource.uri == str(uri))
2316 try:
2317 resource_db = db.execute(query).scalar_one_or_none()
2318 except MultipleResultsFound as exc:
2319 if server_id:
2320 raise ResourceError(f"Multiple resources matched URI '{uri}' for server '{server_id}'.") from exc
2321 raise ResourceError(f"Resource URI '{uri}' is ambiguous across multiple servers; use /servers/{{id}}/mcp.") from exc
2323 # Check for direct_proxy mode
2324 if resource_db and resource_db.gateway and getattr(resource_db.gateway, "gateway_mode", "cache") == "direct_proxy" and settings.mcpgateway_direct_proxy_enabled:
2325 # SECURITY: Check gateway access before allowing direct proxy
2326 if not await check_gateway_access(db, resource_db.gateway, user, token_teams):
2327 raise ResourceNotFoundError(f"Resource not found: {uri}")
2329 logger.info(f"Using direct_proxy mode for resource '{uri}' via gateway {resource_db.gateway.id}")
2331 try: # First-Party
2332 # First-Party
2333 from mcpgateway.common.models import BlobResourceContents, TextResourceContents # pylint: disable=import-outside-toplevel
2335 gateway = resource_db.gateway
2337 # Prepare headers with gateway auth
2338 headers = build_gateway_auth_headers(gateway)
2340 # Use MCP SDK to connect and read resource
2341 async with streamablehttp_client(url=gateway.url, headers=headers, timeout=settings.mcpgateway_direct_proxy_timeout) as (read_stream, write_stream, _get_session_id):
2342 async with ClientSession(read_stream, write_stream) as session:
2343 await session.initialize()
2345 # Note: MCP SDK read_resource() only accepts uri; _meta is not supported
2346 result = await session.read_resource(uri=uri)
2348 # Convert MCP result to MCP-compliant content models
2349 # result.contents is a list of TextResourceContents or BlobResourceContents
2350 if result.contents:
2351 first_content = result.contents[0]
2352 if hasattr(first_content, "text"):
2353 content = TextResourceContents(uri=uri, mimeType=first_content.mimeType if hasattr(first_content, "mimeType") else "text/plain", text=first_content.text)
2354 elif hasattr(first_content, "blob"):
2355 content = BlobResourceContents(
2356 uri=uri, mimeType=first_content.mimeType if hasattr(first_content, "mimeType") else "application/octet-stream", blob=first_content.blob
2357 )
2358 else:
2359 content = TextResourceContents(uri=uri, text="")
2360 else:
2361 content = TextResourceContents(uri=uri, text="")
2363 success = True
2364 logger.info(
2365 f"[READ RESOURCE] Using direct_proxy mode for gateway {SecurityValidator.sanitize_log_message(gateway.id)} (from X-Context-Forge-Gateway-Id header). Meta Attached: {meta_data is not None}"
2366 )
2367 # Skip the rest of the DB lookup logic
2369 except Exception as e:
2370 logger.exception(f"Error in direct_proxy mode for resource '{uri}': {e}")
2371 raise ResourceError(f"Direct proxy resource read failed: {str(e)}")
2373 elif resource_db:
2374 # Normal cache mode - resource found in DB
2375 content = resource_db.content
2376 else:
2377 # Check the inactivity first using the same server scope that
2378 # governed the active lookup. Without this, duplicate URIs
2379 # across different virtual servers/gateways can produce
2380 # ambiguous results even though the current request is
2381 # already scoped to a single server.
2382 inactive_query = select(DbResource)
2383 if server_id:
2384 inactive_query = inactive_query.join(
2385 server_resource_association,
2386 server_resource_association.c.resource_id == DbResource.id,
2387 ).where(server_resource_association.c.server_id == server_id)
2388 try:
2389 check_inactivity = db.execute(inactive_query.where(DbResource.uri == str(resource_uri)).where(not_(DbResource.enabled))).scalar_one_or_none()
2390 except MultipleResultsFound as exc:
2391 if server_id:
2392 raise ResourceError(f"Multiple inactive resources matched URI '{resource_uri}' for server '{server_id}'.") from exc
2393 raise ResourceError(f"Resource URI '{resource_uri}' is ambiguous across multiple servers; use /servers/{{id}}/mcp.") from exc
2394 if check_inactivity:
2395 raise ResourceNotFoundError(f"Resource '{resource_uri}' exists but is inactive")
2397 if resource_db is None:
2398 if resource_uri:
2399 # if resource_uri is provided
2400 # modified uri have templatized resource with prefilled value
2401 # triggers _read_template_resource
2402 # it internally checks which uri matches the pattern of modified uri and fetches
2403 # the one which matches else raises ResourceNotFoundError
2404 try:
2405 content = await self._read_template_resource(db, uri) or None
2406 # ═══════════════════════════════════════════════════════════════════════════
2407 # SECURITY: Fetch the template's DbResource record for access checking
2408 # _read_template_resource returns ResourceContent with the template's ID
2409 # ═══════════════════════════════════════════════════════════════════════════
2410 if content is not None and hasattr(content, "id") and content.id:
2411 template_query = select(DbResource).where(DbResource.id == str(content.id))
2412 if not include_inactive:
2413 template_query = template_query.where(DbResource.enabled)
2414 resource_db = db.execute(template_query).scalar_one_or_none()
2415 except Exception as e:
2416 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") from e
2418 if resource_uri:
2419 if content is None and resource_db is None:
2420 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'")
2422 if resource_id and resource_db is None:
2423 # if resource_id provided but not found by Q2 (shouldn't normally happen,
2424 # but handles race conditions where resource is deleted between requests)
2425 query = select(DbResource).where(DbResource.id == str(resource_id)).where(DbResource.enabled)
2426 if include_inactive:
2427 query = select(DbResource).where(DbResource.id == str(resource_id))
2428 resource_db = db.execute(query).scalar_one_or_none()
2429 if resource_db:
2430 original_uri = resource_db.uri or None
2431 content = resource_db.content
2432 else:
2433 check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(resource_id)).where(not_(DbResource.enabled))).scalar_one_or_none()
2434 if check_inactivity:
2435 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")
2436 raise ResourceNotFoundError(f"Resource not found for the resource id: {resource_id}")
2438 # ═══════════════════════════════════════════════════════════════════════════
2439 # SECURITY: Check resource access based on visibility and team membership
2440 # ═══════════════════════════════════════════════════════════════════════════
2441 if resource_db:
2442 if not await self._check_resource_access(db, resource_db, user, token_teams):
2443 # Don't reveal resource existence - return generic "not found"
2444 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}")
2446 # ═══════════════════════════════════════════════════════════════════════════
2447 # SECURITY: Enforce server scoping if server_id is provided
2448 # Resource must be attached to the specified virtual server
2449 # ═══════════════════════════════════════════════════════════════════════════
2450 if server_id:
2451 server_match = db.execute(
2452 select(server_resource_association.c.resource_id).where(
2453 server_resource_association.c.server_id == server_id,
2454 server_resource_association.c.resource_id == resource_db.id,
2455 )
2456 ).first()
2457 if not server_match:
2458 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}")
2459 server_scoped = True
2461 # Set success attributes on span
2462 if span:
2463 set_span_attribute(span, "success", True)
2464 set_span_attribute(span, "duration.ms", (time.monotonic() - start_time) * 1000)
2465 if content:
2466 set_span_attribute(span, "content.size", len(str(content)))
2468 success = True
2469 # Return standardized content without breaking callers that expect passthrough
2470 # Prefer returning first-class content models or objects with content-like attributes.
2471 # ResourceContent and TextContent already imported at top level
2473 # Release transaction before network calls to avoid idle-in-transaction during invoke_resource
2474 db.commit()
2476 # ═══════════════════════════════════════════════════════════════════════════
2477 # RESOLVE CONTENT: Fetch actual content from gateway if needed
2478 # ═══════════════════════════════════════════════════════════════════════════
2479 # If content is a Pydantic content model, invoke gateway
2481 # ResourceContents covers TextResourceContents and BlobResourceContents (MCP-compliant)
2482 # ResourceContent is the legacy model for backwards compatibility
2484 if isinstance(content, (ResourceContent, ResourceContents, TextContent)):
2485 # Metrics are recorded in read_resource finally block for all resources
2486 resource_response = await self.invoke_resource(
2487 db=db,
2488 resource_id=getattr(content, "id"),
2489 resource_uri=getattr(content, "uri") or None,
2490 resource_template_uri=getattr(content, "text") or None,
2491 user_identity=user,
2492 meta_data=meta_data,
2493 resource_obj=resource_db,
2494 gateway_obj=resource_db_gateway,
2495 server_id=server_id,
2496 )
2497 if resource_response:
2498 setattr(content, "text", resource_response)
2499 # If content is any object that quacks like content
2500 elif hasattr(content, "text") or hasattr(content, "blob"):
2501 # Metrics are recorded in read_resource finally block for all resources
2502 if hasattr(content, "blob"):
2503 resource_response = await self.invoke_resource(
2504 db=db,
2505 resource_id=getattr(content, "id"),
2506 resource_uri=getattr(content, "uri") or None,
2507 resource_template_uri=getattr(content, "blob") or None,
2508 user_identity=user,
2509 meta_data=meta_data,
2510 resource_obj=resource_db,
2511 gateway_obj=resource_db_gateway,
2512 server_id=server_id,
2513 )
2514 if resource_response:
2515 setattr(content, "blob", resource_response)
2516 elif hasattr(content, "text"):
2517 resource_response = await self.invoke_resource(
2518 db=db,
2519 resource_id=getattr(content, "id"),
2520 resource_uri=getattr(content, "uri") or None,
2521 resource_template_uri=getattr(content, "text") or None,
2522 user_identity=user,
2523 meta_data=meta_data,
2524 resource_obj=resource_db,
2525 gateway_obj=resource_db_gateway,
2526 server_id=server_id,
2527 )
2528 if resource_response:
2529 setattr(content, "text", resource_response)
2530 # Normalize primitive types to ResourceContent
2531 elif isinstance(content, bytes):
2532 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, blob=content)
2533 elif isinstance(content, str):
2534 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, text=content)
2535 else:
2536 # Fallback to stringified content
2537 content = ResourceContent(type="resource", id=str(resource_id) or str(content.id), uri=original_uri or content.uri, text=str(content))
2539 # ═══════════════════════════════════════════════════════════════════════════
2540 # POST-FETCH HOOKS: Now called AFTER content is resolved from gateway
2541 # ═══════════════════════════════════════════════════════════════════════════
2542 if has_post_fetch:
2543 post_payload = ResourcePostFetchPayload(uri=original_uri, content=content)
2544 post_result, _ = await self._plugin_manager.invoke_hook(ResourceHookType.RESOURCE_POST_FETCH, post_payload, global_context, contexts, violations_as_exceptions=True)
2545 if post_result.modified_payload:
2546 content = post_result.modified_payload.content
2548 if span and content is not None and is_output_capture_enabled("resource.read"):
2549 set_span_attribute(span, "langfuse.observation.output", serialize_trace_payload(content))
2551 return content
2552 except Exception as e:
2553 success = False
2554 error_message = str(e)
2555 raise
2556 finally:
2557 # Record metrics only if we found a resource (not for templates)
2558 logger.debug(f"read_resource finally block: resource_db={'present' if resource_db else None}, resource_id={resource_db.id if resource_db else None}, server_id={server_id}")
2560 if resource_db:
2561 try:
2562 metrics_buffer.record_resource_metric(
2563 resource_id=resource_db.id,
2564 start_time=start_time,
2565 success=success,
2566 error_message=error_message,
2567 )
2568 except Exception as metrics_error:
2569 logger.warning(f"Failed to record resource metric: {metrics_error}")
2571 # Record server metrics ONLY when the server scoping check passed.
2572 # This prevents recording metrics with unvalidated server_id values
2573 # from admin API headers (X-Server-ID) or RPC params.
2574 if resource_db and server_scoped:
2575 try:
2576 logger.debug(f"Recording server metric for server_id={server_id}, resource_id={resource_db.id}, success={success}")
2577 # Record server metric only for the specific virtual server being accessed
2578 metrics_buffer.record_server_metric(
2579 server_id=server_id,
2580 start_time=start_time,
2581 success=success,
2582 error_message=error_message,
2583 )
2584 except Exception as metrics_error:
2585 logger.warning(f"Failed to record server metric: {metrics_error}")
2587 # End database span for observability dashboard
2588 # NOTE: Use fresh_db_session() since db may have been closed by invoke_resource
2589 if db_span_id and observability_service and not db_span_ended:
2590 try:
2591 with fresh_db_session() as fresh_db:
2592 observability_service.end_span(
2593 db=fresh_db,
2594 span_id=db_span_id,
2595 status="ok" if success else "error",
2596 status_message=error_message if error_message else None,
2597 )
2598 db_span_ended = True
2599 logger.debug(f"✓ Ended resource.read span: {db_span_id}")
2600 except Exception as e:
2601 logger.warning(f"Failed to end observability span for resource reading: {e}")
2603 async def set_resource_state(self, db: Session, resource_id: int, activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> ResourceRead:
2604 """
2605 Set the activation status of a resource.
2607 Args:
2608 db: Database session
2609 resource_id: Resource ID
2610 activate: True to activate, False to deactivate
2611 user_email: Optional[str] The email of the user to check if the user has permission to modify.
2612 skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).
2614 Returns:
2615 The updated ResourceRead object
2617 Raises:
2618 ResourceNotFoundError: If the resource is not found.
2619 ResourceLockConflictError: If the resource is locked by another transaction.
2620 ResourceError: For other errors.
2621 PermissionError: If user doesn't own the resource.
2623 Examples:
2624 >>> from mcpgateway.services.resource_service import ResourceService
2625 >>> from unittest.mock import MagicMock, AsyncMock
2626 >>> from mcpgateway.schemas import ResourceRead
2627 >>> service = ResourceService()
2628 >>> db = MagicMock()
2629 >>> resource = MagicMock()
2630 >>> db.get.return_value = resource
2631 >>> db.commit = MagicMock()
2632 >>> db.refresh = MagicMock()
2633 >>> service._notify_resource_activated = AsyncMock()
2634 >>> service._notify_resource_deactivated = AsyncMock()
2635 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
2636 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
2637 >>> import asyncio
2638 >>> asyncio.run(service.set_resource_state(db, 1, True))
2639 'resource_read'
2640 """
2641 try:
2642 # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
2643 try:
2644 resource = get_for_update(db, DbResource, resource_id, nowait=True)
2645 except OperationalError as lock_err:
2646 # Row is locked by another transaction - fail fast with 409
2647 db.rollback()
2648 raise ResourceLockConflictError(f"Resource {resource_id} is currently being modified by another request") from lock_err
2649 if not resource:
2650 raise ResourceNotFoundError(f"Resource not found: {resource_id}")
2652 if user_email:
2653 # First-Party
2654 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
2656 permission_service = PermissionService(db)
2657 if not await permission_service.check_resource_ownership(user_email, resource):
2658 raise PermissionError("Only the owner can activate the Resource" if activate else "Only the owner can deactivate the Resource")
2660 # Update status if it's different
2661 if resource.enabled != activate:
2662 resource.enabled = activate
2663 resource.updated_at = datetime.now(timezone.utc)
2664 db.commit()
2665 db.refresh(resource)
2667 # Invalidate cache after status change (skip for batch operations)
2668 if not skip_cache_invalidation:
2669 cache = _get_registry_cache()
2670 await cache.invalidate_resources()
2672 # Notify subscribers
2673 if activate:
2674 await self._notify_resource_activated(resource)
2675 else:
2676 await self._notify_resource_deactivated(resource)
2678 logger.info(f"Resource {resource.uri} {'activated' if activate else 'deactivated'}")
2680 # Structured logging: Audit trail for resource state change
2681 audit_trail.log_action(
2682 user_id=user_email or "system",
2683 action="set_resource_state",
2684 resource_type="resource",
2685 resource_id=str(resource.id),
2686 resource_name=resource.name,
2687 user_email=user_email,
2688 team_id=resource.team_id,
2689 new_values={
2690 "enabled": resource.enabled,
2691 },
2692 context={
2693 "action": "activate" if activate else "deactivate",
2694 },
2695 db=db,
2696 )
2698 # Structured logging: Log successful resource state change
2699 structured_logger.log(
2700 level="INFO",
2701 message=f"Resource {'activated' if activate else 'deactivated'} successfully",
2702 event_type="resource_state_changed",
2703 component="resource_service",
2704 user_email=user_email,
2705 team_id=resource.team_id,
2706 resource_type="resource",
2707 resource_id=str(resource.id),
2708 custom_fields={
2709 "resource_uri": resource.uri,
2710 "enabled": resource.enabled,
2711 },
2712 )
2714 resource.team = self._get_team_name(db, resource.team_id)
2715 return self.convert_resource_to_read(resource)
2716 except PermissionError as e:
2717 # Structured logging: Log permission error
2718 structured_logger.log(
2719 level="WARNING",
2720 message="Resource state change failed due to permission error",
2721 event_type="resource_state_change_permission_denied",
2722 component="resource_service",
2723 user_email=user_email,
2724 resource_type="resource",
2725 resource_id=str(resource_id),
2726 error=e,
2727 )
2728 raise e
2729 except ResourceLockConflictError:
2730 # Re-raise lock conflicts without wrapping - allows 409 response
2731 raise
2732 except ResourceNotFoundError:
2733 # Re-raise not found without wrapping - allows 404 response
2734 raise
2735 except Exception as e:
2736 db.rollback()
2738 # Structured logging: Log generic resource state change failure
2739 structured_logger.log(
2740 level="ERROR",
2741 message="Resource state change failed",
2742 event_type="resource_state_change_failed",
2743 component="resource_service",
2744 user_email=user_email,
2745 resource_type="resource",
2746 resource_id=str(resource_id),
2747 error=e,
2748 )
2749 raise ResourceError(f"Failed to set resource state: {str(e)}")
2751 async def subscribe_resource(
2752 self,
2753 db: Session,
2754 subscription: ResourceSubscription,
2755 *,
2756 user_email: Optional[str] = None,
2757 token_teams: Optional[List[str]] = None,
2758 ) -> None:
2759 """
2760 Subscribe to a resource.
2762 Args:
2763 db: Database session
2764 subscription: Resource subscription object
2765 user_email: Requester email used for visibility checks.
2766 token_teams: Token team scope used for visibility checks.
2768 Raises:
2769 ResourceNotFoundError: If the resource is not found or is inactive
2770 PermissionError: If the requester is not authorized for the resource
2771 ResourceError: For other subscription errors
2773 Examples:
2774 >>> from mcpgateway.services.resource_service import ResourceService
2775 >>> from unittest.mock import MagicMock
2776 >>> service = ResourceService()
2777 >>> db = MagicMock()
2778 >>> subscription = MagicMock()
2779 >>> import asyncio
2780 >>> asyncio.run(service.subscribe_resource(db, subscription))
2781 """
2782 try:
2783 # Verify resource exists (single query to avoid TOCTOU between active/inactive checks)
2784 resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()
2786 if not resource:
2787 raise ResourceNotFoundError(f"Resource not found: {subscription.uri}")
2789 if not resource.enabled:
2790 raise ResourceNotFoundError(f"Resource '{subscription.uri}' exists but is inactive")
2792 if not await self._check_resource_access(db, resource, user_email=user_email, token_teams=token_teams):
2793 raise PermissionError(f"Access denied for resource subscription: {subscription.uri}")
2795 # Create subscription
2796 db_sub = DbSubscription(resource_id=resource.id, subscriber_id=subscription.subscriber_id)
2797 db.add(db_sub)
2798 db.commit()
2800 logger.info(f"Added subscription for {subscription.uri} by {subscription.subscriber_id}")
2802 except PermissionError:
2803 db.rollback()
2804 raise
2805 except Exception as e:
2806 db.rollback()
2807 raise ResourceError(f"Failed to subscribe: {str(e)}")
2809 async def unsubscribe_resource(self, db: Session, subscription: ResourceSubscription) -> None:
2810 """
2811 Unsubscribe from a resource.
2813 Args:
2814 db: Database session
2815 subscription: Resource subscription object
2817 Raises:
2819 Examples:
2820 >>> from mcpgateway.services.resource_service import ResourceService
2821 >>> from unittest.mock import MagicMock
2822 >>> service = ResourceService()
2823 >>> db = MagicMock()
2824 >>> subscription = MagicMock()
2825 >>> import asyncio
2826 >>> asyncio.run(service.unsubscribe_resource(db, subscription))
2827 """
2828 try:
2829 # Find resource
2830 resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()
2832 if not resource:
2833 return
2835 # Remove subscription
2836 db.execute(select(DbSubscription).where(DbSubscription.resource_id == resource.id).where(DbSubscription.subscriber_id == subscription.subscriber_id)).delete()
2837 db.commit()
2839 logger.info(f"Removed subscription for {subscription.uri} by {subscription.subscriber_id}")
2841 except Exception as e:
2842 db.rollback()
2843 logger.error(f"Failed to unsubscribe: {str(e)}")
2845 async def update_resource(
2846 self,
2847 db: Session,
2848 resource_id: Union[int, str],
2849 resource_update: ResourceUpdate,
2850 modified_by: Optional[str] = None,
2851 modified_from_ip: Optional[str] = None,
2852 modified_via: Optional[str] = None,
2853 modified_user_agent: Optional[str] = None,
2854 user_email: Optional[str] = None,
2855 ) -> ResourceRead:
2856 """
2857 Update a resource.
2859 MIME Type Resolution Priority:
2860 1. **User-provided type** (highest priority) - Caller explicitly declares content type
2861 2. **URI-detected type** - Fallback when empty string provided
2862 3. **Content-based fallback** - If empty string and no URI detection
2863 4. **Preserve existing** - If None/not provided
2865 This ensures accuracy by preferring URL-detected types (e.g., .md → text/markdown)
2866 over potentially incorrect user input.
2868 Args:
2869 db: Database session
2870 resource_id: Resource ID
2871 resource_update: Resource update object
2872 modified_by: Username of the person modifying the resource
2873 modified_from_ip: IP address where the modification request originated
2874 modified_via: Source of modification (ui/api/import)
2875 modified_user_agent: User agent string from the modification request
2876 user_email: Email of user performing update (for ownership check)
2878 Returns:
2879 The updated ResourceRead object
2881 Raises:
2882 ResourceNotFoundError: If the resource is not found
2883 ResourceURIConflictError: If a resource with the same URI already exists.
2884 PermissionError: If user doesn't own the resource
2885 ResourceError: For other update errors
2886 IntegrityError: If a database integrity error occurs.
2887 Exception: For unexpected errors
2888 ContentSizeError: For content size exceed
2889 ContentTypeError: If the MIME type is not allowed
2891 Example:
2892 >>> from mcpgateway.services.resource_service import ResourceService
2893 >>> from unittest.mock import MagicMock, AsyncMock
2894 >>> from mcpgateway.schemas import ResourceRead, ResourceUpdate
2895 >>> service = ResourceService()
2896 >>> db = MagicMock()
2897 >>> resource = MagicMock()
2898 >>> resource.uri = "test://example"
2899 >>> resource.visibility = "private"
2900 >>> resource.team_id = None
2901 >>> db.get.return_value = resource
2902 >>> db.commit = MagicMock()
2903 >>> db.refresh = MagicMock()
2904 >>> service._notify_resource_updated = AsyncMock()
2905 >>> service._detect_mime_type_from_uri = MagicMock(return_value=None)
2906 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
2907 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
2908 >>> import asyncio
2909 >>> asyncio.run(service.update_resource(db, 'resource_id', ResourceUpdate()))
2910 'resource_read'
2911 """
2912 try:
2913 logger.info(f"Updating resource: {resource_id}")
2914 resource = get_for_update(db, DbResource, resource_id)
2915 if not resource:
2916 raise ResourceNotFoundError(f"Resource not found: {resource_id}")
2918 # # Check for uri conflict if uri is being changed and visibility is public
2919 if resource_update.uri and resource_update.uri != resource.uri:
2920 visibility = resource_update.visibility or resource.visibility
2921 team_id = resource_update.team_id or resource.team_id
2922 if visibility.lower() == "public":
2923 # Check for existing public resources with the same uri
2924 existing_resource = get_for_update(db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "public", DbResource.id != resource_id))
2925 if existing_resource:
2926 raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
2927 elif visibility.lower() == "team" and team_id:
2928 # Check for existing team resource with the same uri
2929 existing_resource = get_for_update(
2930 db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.id != resource_id)
2931 )
2932 if existing_resource:
2933 raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
2935 # Check ownership if user_email provided
2936 if user_email:
2937 # First-Party
2938 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
2940 permission_service = PermissionService(db)
2941 if not await permission_service.check_resource_ownership(user_email, resource):
2942 raise PermissionError("Only the owner can update this resource")
2944 # Update fields if provided
2945 if resource_update.uri is not None:
2946 resource.uri = resource_update.uri
2947 if resource_update.name is not None:
2948 resource.name = resource_update.name
2949 if resource_update.description is not None:
2950 resource.description = resource_update.description
2951 if resource_update.title is not None:
2952 resource.title = resource_update.title
2953 # Resolve the new MIME type into a local variable and validate
2954 # BEFORE writing to the tracked model to avoid dirty session state.
2955 # Priority: user-provided > URI-detected > content-based fallback.
2956 resolved_mime_type = None
2957 if resource_update.mime_type is not None:
2958 if resource_update.mime_type:
2959 # Non-empty: user explicitly provided a type — trust it
2960 resolved_mime_type = resource_update.mime_type
2961 else:
2962 # Empty string: auto-detect from URI/content
2963 content_for_detection = resource_update.content if resource_update.content is not None else (resource.text_content or resource.binary_content)
2964 uri_for_detection = resource_update.uri if resource_update.uri is not None else resource.uri
2965 resolved_mime_type = self._detect_mime_type(uri_for_detection, content_for_detection)
2966 logger.info(f"Auto-detected MIME type for resource {resource_id}: {resolved_mime_type}")
2967 elif resource_update.uri is not None:
2968 # URI changed but no MIME type provided — try URI detection as fallback
2969 url_detected_mime = self._detect_mime_type_from_uri(resource_update.uri)
2970 if url_detected_mime:
2971 resolved_mime_type = url_detected_mime
2973 # Validate the candidate MIME type BEFORE mutating the model
2974 content_security = get_content_security_service()
2975 if resolved_mime_type is not None:
2976 content_security.validate_resource_mime_type(
2977 mime_type=resolved_mime_type,
2978 uri=resource_update.uri or resource.uri,
2979 user_email=modified_by or user_email,
2980 ip_address=modified_from_ip,
2981 )
2982 # Validation passed — safe to assign
2983 resource.mime_type = resolved_mime_type
2985 if resource_update.uri_template is not None:
2986 resource.uri_template = resource_update.uri_template
2987 if resource_update.visibility is not None:
2988 # Validate visibility transitions
2989 if resource_update.visibility == "team":
2990 target_team_id = resource_update.team_id if resource_update.team_id is not None else resource.team_id
2991 _validate_resource_team_assignment(db, user_email, target_team_id)
2992 resource.visibility = resource_update.visibility
2994 # Update content if provided
2995 if resource_update.content is not None:
2996 # Validate content size before updating
2997 content_security.validate_resource_size(
2998 content=resource_update.content,
2999 uri=resource_update.uri or resource.uri,
3000 user_email=modified_by or user_email,
3001 ip_address=modified_from_ip,
3002 )
3004 # Determine content storage
3005 is_text = resource.mime_type and resource.mime_type.startswith("text/") or isinstance(resource_update.content, str)
3007 resource.text_content = resource_update.content if is_text else None
3008 resource.binary_content = (
3009 resource_update.content.encode() if is_text and isinstance(resource_update.content, str) else resource_update.content if isinstance(resource_update.content, bytes) else None
3010 )
3011 resource.size = len(resource_update.content)
3013 # Update tags if provided
3014 if resource_update.tags is not None:
3015 resource.tags = resource_update.tags
3017 # Update team assignment if provided, validating ownership
3018 if resource_update.team_id is not None:
3019 if resource_update.team_id != resource.team_id:
3020 _validate_resource_team_assignment(db, user_email, resource_update.team_id)
3021 resource.team_id = resource_update.team_id
3023 # Update metadata fields
3024 resource.updated_at = datetime.now(timezone.utc)
3025 if modified_by:
3026 resource.modified_by = modified_by
3027 if modified_from_ip:
3028 resource.modified_from_ip = modified_from_ip
3029 if modified_via:
3030 resource.modified_via = modified_via
3031 if modified_user_agent:
3032 resource.modified_user_agent = modified_user_agent
3033 if hasattr(resource, "version") and resource.version is not None:
3034 resource.version = resource.version + 1
3035 else:
3036 resource.version = 1
3037 db.commit()
3038 db.refresh(resource)
3040 # Invalidate cache after successful update
3041 cache = _get_registry_cache()
3042 await cache.invalidate_resources()
3043 # Also invalidate tags cache since resource tags may have changed
3044 # First-Party
3045 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
3047 await admin_stats_cache.invalidate_tags()
3048 # First-Party
3049 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
3051 metrics_cache.invalidate_prefix("top_resources:")
3052 metrics_cache.invalidate("resources")
3054 # Notify subscribers
3055 await self._notify_resource_updated(resource)
3057 logger.info(f"Updated resource: {resource.uri}")
3059 # Structured logging: Audit trail for resource update
3060 changes = []
3061 if resource_update.uri:
3062 changes.append(f"uri: {resource_update.uri}")
3063 if resource_update.visibility:
3064 changes.append(f"visibility: {resource_update.visibility}")
3065 if resource_update.description:
3066 changes.append("description updated")
3068 audit_trail.log_action(
3069 user_id=user_email or modified_by or "system",
3070 action="update_resource",
3071 resource_type="resource",
3072 resource_id=str(resource.id),
3073 resource_name=resource.name,
3074 user_email=user_email,
3075 team_id=resource.team_id,
3076 client_ip=modified_from_ip,
3077 user_agent=modified_user_agent,
3078 new_values={
3079 "uri": resource.uri,
3080 "name": resource.name,
3081 "version": resource.version,
3082 },
3083 context={
3084 "modified_via": modified_via,
3085 "changes": ", ".join(changes) if changes else "metadata only",
3086 },
3087 db=db,
3088 )
3090 # Structured logging: Log successful resource update
3091 structured_logger.log(
3092 level="INFO",
3093 message="Resource updated successfully",
3094 event_type="resource_updated",
3095 component="resource_service",
3096 user_id=modified_by,
3097 user_email=user_email,
3098 team_id=resource.team_id,
3099 resource_type="resource",
3100 resource_id=str(resource.id),
3101 custom_fields={
3102 "resource_uri": resource.uri,
3103 "version": resource.version,
3104 },
3105 )
3107 return self.convert_resource_to_read(resource)
3108 except PermissionError as pe:
3109 db.rollback()
3111 # Structured logging: Log permission error
3112 structured_logger.log(
3113 level="WARNING",
3114 message="Resource update failed due to permission error",
3115 event_type="resource_update_permission_denied",
3116 component="resource_service",
3117 user_email=user_email,
3118 resource_type="resource",
3119 resource_id=str(resource_id),
3120 error=pe,
3121 )
3122 raise
3123 except IntegrityError as ie:
3124 db.rollback()
3125 logger.error(f"IntegrityErrors in group: {ie}")
3127 # Structured logging: Log database integrity error
3128 structured_logger.log(
3129 level="ERROR",
3130 message="Resource update failed due to database integrity error",
3131 event_type="resource_update_failed",
3132 component="resource_service",
3133 user_id=modified_by,
3134 user_email=user_email,
3135 resource_type="resource",
3136 resource_id=str(resource_id),
3137 error=ie,
3138 )
3139 raise ie
3140 except ContentSizeError as cse:
3141 db.rollback()
3142 structured_logger.log(
3143 level="ERROR",
3144 message=f"Resource content size limit exceeded: {cse.actual_size} bytes (max: {cse.max_size} bytes)",
3145 event_type="resource_content_size_exceed",
3146 component="resource_service",
3147 resource_type="resource",
3148 user_id=modified_by,
3149 user_email=user_email,
3150 resource_id=str(resource_id),
3151 error=cse,
3152 )
3153 raise cse
3154 except ContentTypeError as cte:
3155 db.rollback()
3156 structured_logger.log(
3157 level="ERROR",
3158 message=f"Resource MIME type not allowed: {cte.mime_type}",
3159 event_type="resource_mime_type_rejected",
3160 component="resource_service",
3161 resource_type="resource",
3162 user_id=modified_by,
3163 user_email=user_email,
3164 resource_id=str(resource_id),
3165 error=cte,
3166 custom_fields={
3167 "mime_type": cte.mime_type,
3168 },
3169 )
3170 raise cte
3171 except ResourceURIConflictError as pe:
3172 logger.error(f"Resource URI conflict: {pe}")
3174 # Structured logging: Log URI conflict error
3175 structured_logger.log(
3176 level="WARNING",
3177 message="Resource update failed due to URI conflict",
3178 event_type="resource_uri_conflict",
3179 component="resource_service",
3180 user_id=modified_by,
3181 user_email=user_email,
3182 resource_type="resource",
3183 resource_id=str(resource_id),
3184 error=pe,
3185 )
3186 raise pe
3187 except Exception as e:
3188 db.rollback()
3189 if isinstance(e, ResourceNotFoundError):
3190 # Structured logging: Log not found error
3191 structured_logger.log(
3192 level="ERROR",
3193 message="Resource update failed - resource not found",
3194 event_type="resource_not_found",
3195 component="resource_service",
3196 user_email=user_email,
3197 resource_type="resource",
3198 resource_id=str(resource_id),
3199 error=e,
3200 )
3201 raise e
3203 # Structured logging: Log generic resource update failure
3204 structured_logger.log(
3205 level="ERROR",
3206 message="Resource update failed",
3207 event_type="resource_update_failed",
3208 component="resource_service",
3209 user_id=modified_by,
3210 user_email=user_email,
3211 resource_type="resource",
3212 resource_id=str(resource_id),
3213 error=e,
3214 )
3215 raise ResourceError(f"Failed to update resource: {str(e)}")
3217 async def delete_resource(self, db: Session, resource_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
3218 """
3219 Delete a resource.
3221 Args:
3222 db: Database session
3223 resource_id: Resource ID
3224 user_email: Email of user performing delete (for ownership check)
3225 purge_metrics: If True, delete raw + rollup metrics for this resource
3227 Raises:
3228 ResourceNotFoundError: If the resource is not found
3229 PermissionError: If user doesn't own the resource
3230 ResourceError: For other deletion errors
3232 Example:
3233 >>> from mcpgateway.services.resource_service import ResourceService
3234 >>> from unittest.mock import MagicMock, AsyncMock
3235 >>> service = ResourceService()
3236 >>> db = MagicMock()
3237 >>> resource = MagicMock()
3238 >>> db.get.return_value = resource
3239 >>> db.delete = MagicMock()
3240 >>> db.commit = MagicMock()
3241 >>> service._notify_resource_deleted = AsyncMock()
3242 >>> import asyncio
3243 >>> asyncio.run(service.delete_resource(db, 'resource_id'))
3244 """
3245 try:
3246 # Find resource by its URI.
3247 resource = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none()
3249 if not resource:
3250 # If resource doesn't exist, rollback and re-raise a ResourceNotFoundError.
3251 db.rollback()
3252 raise ResourceNotFoundError(f"Resource not found: {resource_id}")
3254 # Check ownership if user_email provided
3255 if user_email:
3256 # First-Party
3257 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
3259 permission_service = PermissionService(db)
3260 if not await permission_service.check_resource_ownership(user_email, resource):
3261 raise PermissionError("Only the owner can delete this resource")
3263 # Store resource info for notification before deletion.
3264 resource_info = {
3265 "id": resource.id,
3266 "uri": resource.uri,
3267 "name": resource.name,
3268 "visibility": getattr(resource, "visibility", "public"),
3269 "team_id": getattr(resource, "team_id", None),
3270 "owner_email": getattr(resource, "owner_email", None),
3271 }
3273 # Remove subscriptions using SQLAlchemy's delete() expression.
3274 db.execute(delete(DbSubscription).where(DbSubscription.resource_id == resource.id))
3276 if purge_metrics:
3277 with pause_rollup_during_purge(reason=f"purge_resource:{resource.id}"):
3278 delete_metrics_in_batches(db, ResourceMetric, ResourceMetric.resource_id, resource.id)
3279 delete_metrics_in_batches(db, ResourceMetricsHourly, ResourceMetricsHourly.resource_id, resource.id)
3281 # Hard delete the resource.
3282 resource_uri = resource.uri
3283 resource_name = resource.name
3284 resource_team_id = resource.team_id
3286 db.delete(resource)
3287 db.commit()
3289 # Invalidate cache after successful deletion
3290 cache = _get_registry_cache()
3291 await cache.invalidate_resources()
3292 # Also invalidate tags cache since resource tags may have changed
3293 # First-Party
3294 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
3296 await admin_stats_cache.invalidate_tags()
3298 # Notify subscribers.
3299 await self._notify_resource_deleted(resource_info)
3301 logger.info(f"Permanently deleted resource: {resource.uri}")
3303 # Structured logging: Audit trail for resource deletion
3304 audit_trail.log_action(
3305 user_id=user_email or "system",
3306 action="delete_resource",
3307 resource_type="resource",
3308 resource_id=str(resource_info["id"]),
3309 resource_name=resource_name,
3310 user_email=user_email,
3311 team_id=resource_team_id,
3312 old_values={
3313 "uri": resource_uri,
3314 "name": resource_name,
3315 },
3316 db=db,
3317 )
3319 # Structured logging: Log successful resource deletion
3320 structured_logger.log(
3321 level="INFO",
3322 message="Resource deleted successfully",
3323 event_type="resource_deleted",
3324 component="resource_service",
3325 user_email=user_email,
3326 team_id=resource_team_id,
3327 resource_type="resource",
3328 resource_id=str(resource_info["id"]),
3329 custom_fields={
3330 "resource_uri": resource_uri,
3331 "purge_metrics": purge_metrics,
3332 },
3333 )
3335 except PermissionError as pe:
3336 db.rollback()
3338 # Structured logging: Log permission error
3339 structured_logger.log(
3340 level="WARNING",
3341 message="Resource deletion failed due to permission error",
3342 event_type="resource_delete_permission_denied",
3343 component="resource_service",
3344 user_email=user_email,
3345 resource_type="resource",
3346 resource_id=str(resource_id),
3347 error=pe,
3348 )
3349 raise
3350 except ResourceNotFoundError as rnfe:
3351 # ResourceNotFoundError is re-raised to be handled in the endpoint.
3352 # Structured logging: Log not found error
3353 structured_logger.log(
3354 level="ERROR",
3355 message="Resource deletion failed - resource not found",
3356 event_type="resource_not_found",
3357 component="resource_service",
3358 user_email=user_email,
3359 resource_type="resource",
3360 resource_id=str(resource_id),
3361 error=rnfe,
3362 )
3363 raise
3364 except Exception as e:
3365 db.rollback()
3367 # Structured logging: Log generic resource deletion failure
3368 structured_logger.log(
3369 level="ERROR",
3370 message="Resource deletion failed",
3371 event_type="resource_deletion_failed",
3372 component="resource_service",
3373 user_email=user_email,
3374 resource_type="resource",
3375 resource_id=str(resource_id),
3376 error=e,
3377 )
3378 raise ResourceError(f"Failed to delete resource: {str(e)}")
3380 async def get_resource_by_id(self, db: Session, resource_id: str, include_inactive: bool = False) -> ResourceRead:
3381 """
3382 Get a resource by ID.
3384 Args:
3385 db: Database session
3386 resource_id: Resource ID
3387 include_inactive: Whether to include inactive resources
3389 Returns:
3390 ResourceRead: The resource object
3392 Raises:
3393 ResourceNotFoundError: If the resource is not found
3395 Example:
3396 >>> from mcpgateway.services.resource_service import ResourceService
3397 >>> from unittest.mock import MagicMock
3398 >>> service = ResourceService()
3399 >>> db = MagicMock()
3400 >>> resource = MagicMock()
3401 >>> db.execute.return_value.scalar_one_or_none.return_value = resource
3402 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
3403 >>> import asyncio
3404 >>> asyncio.run(service.get_resource_by_id(db, "39334ce0ed2644d79ede8913a66930c9"))
3405 'resource_read'
3406 """
3407 with create_span("resource.get", {"resource.id": resource_id, "include_inactive": include_inactive}):
3408 query = select(DbResource).where(DbResource.id == resource_id)
3410 if not include_inactive:
3411 query = query.where(DbResource.enabled)
3413 resource = db.execute(query).scalar_one_or_none()
3415 if not resource:
3416 if not include_inactive:
3417 # Check if inactive resource exists
3418 inactive_resource = db.execute(select(DbResource).where(DbResource.id == resource_id).where(not_(DbResource.enabled))).scalar_one_or_none()
3420 if inactive_resource:
3421 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")
3423 raise ResourceNotFoundError(f"Resource not found: {resource_id}")
3425 resource_read = self.convert_resource_to_read(resource)
3427 structured_logger.log(
3428 level="INFO",
3429 message="Resource retrieved successfully",
3430 event_type="resource_viewed",
3431 component="resource_service",
3432 team_id=getattr(resource, "team_id", None),
3433 resource_type="resource",
3434 resource_id=str(resource.id),
3435 custom_fields={
3436 "resource_uri": resource.uri,
3437 "include_inactive": include_inactive,
3438 },
3439 )
3441 return resource_read
3443 async def _notify_resource_activated(self, resource: DbResource) -> None:
3444 """
3445 Notify subscribers of resource activation.
3447 Args:
3448 resource: Resource to activate
3449 """
3450 event = {
3451 "type": "resource_activated",
3452 "data": {
3453 "id": resource.id,
3454 "uri": resource.uri,
3455 "name": resource.name,
3456 "enabled": True,
3457 "visibility": getattr(resource, "visibility", "public"),
3458 "team_id": getattr(resource, "team_id", None),
3459 "owner_email": getattr(resource, "owner_email", None),
3460 },
3461 "timestamp": datetime.now(timezone.utc).isoformat(),
3462 }
3463 await self._publish_event(event)
3465 async def _notify_resource_deactivated(self, resource: DbResource) -> None:
3466 """
3467 Notify subscribers of resource deactivation.
3469 Args:
3470 resource: Resource to deactivate
3471 """
3472 event = {
3473 "type": "resource_deactivated",
3474 "data": {
3475 "id": resource.id,
3476 "uri": resource.uri,
3477 "name": resource.name,
3478 "enabled": False,
3479 "visibility": getattr(resource, "visibility", "public"),
3480 "team_id": getattr(resource, "team_id", None),
3481 "owner_email": getattr(resource, "owner_email", None),
3482 },
3483 "timestamp": datetime.now(timezone.utc).isoformat(),
3484 }
3485 await self._publish_event(event)
3487 async def _notify_resource_deleted(self, resource_info: Dict[str, Any]) -> None:
3488 """
3489 Notify subscribers of resource deletion.
3491 Args:
3492 resource_info: Dictionary of resource to delete
3493 """
3494 event = {
3495 "type": "resource_deleted",
3496 "data": resource_info,
3497 "timestamp": datetime.now(timezone.utc).isoformat(),
3498 }
3499 await self._publish_event(event)
3501 async def _notify_resource_removed(self, resource: DbResource) -> None:
3502 """
3503 Notify subscribers of resource removal.
3505 Args:
3506 resource: Resource to remove
3507 """
3508 event = {
3509 "type": "resource_removed",
3510 "data": {
3511 "id": resource.id,
3512 "uri": resource.uri,
3513 "name": resource.name,
3514 "enabled": False,
3515 "visibility": getattr(resource, "visibility", "public"),
3516 "team_id": getattr(resource, "team_id", None),
3517 "owner_email": getattr(resource, "owner_email", None),
3518 },
3519 "timestamp": datetime.now(timezone.utc).isoformat(),
3520 }
3521 await self._publish_event(event)
3523 async def _event_visible_to_subscriber(self, event: Dict[str, Any], user_email: Optional[str], token_teams: Optional[List[str]]) -> bool:
3524 """Return whether a resource event is visible to a subscriber context.
3526 Args:
3527 event: Event payload emitted by the resource event stream.
3528 user_email: Subscriber email. ``None`` only for unrestricted admin context.
3529 token_teams: Subscriber token team scope.
3531 Returns:
3532 ``True`` when the event is visible to the subscriber, otherwise ``False``.
3533 """
3534 data = event.get("data") if isinstance(event, dict) else None
3535 if not isinstance(data, dict):
3536 return False
3538 visibility = data.get("visibility") or "public"
3539 team_id = data.get("team_id")
3540 owner_email = data.get("owner_email")
3541 event_resource = SimpleNamespace(visibility=visibility, team_id=team_id, owner_email=owner_email)
3543 effective_token_teams = token_teams
3544 if user_email and effective_token_teams is None:
3545 # Non-admin scoped flows should pass token_teams explicitly. If not,
3546 # fail closed to public-only for event filtering.
3547 effective_token_teams = []
3549 return await self._check_resource_access(
3550 db=None, # type: ignore[arg-type]
3551 resource=event_resource, # type: ignore[arg-type]
3552 user_email=user_email,
3553 token_teams=effective_token_teams,
3554 )
3556 async def subscribe_events(self, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> AsyncGenerator[Dict[str, Any], None]:
3557 """Subscribe to Resource events via the EventService.
3559 Args:
3560 user_email: Requesting user email. ``None`` with ``token_teams=None`` indicates unrestricted admin context.
3561 token_teams: Token team scope context:
3562 - ``None`` = unrestricted admin
3563 - ``[]`` = public-only
3564 - ``[...]`` = team-scoped access
3566 Yields:
3567 Resource event messages.
3568 """
3569 async for event in self._event_service.subscribe_events():
3570 if user_email is None and token_teams is None:
3571 yield event
3572 continue
3574 if await self._event_visible_to_subscriber(event, user_email, token_teams):
3575 yield event
3577 def _detect_mime_type_from_uri(self, uri: str) -> Optional[str]:
3578 """Detect MIME type from URI only (no fallback).
3580 Args:
3581 uri: Resource URI
3583 Returns:
3584 Detected MIME type from URI, or None if cannot be determined
3586 Examples:
3587 >>> service = ResourceService()
3588 >>> service._detect_mime_type_from_uri("https://example.com/file.md")
3589 'text/markdown'
3590 >>> service._detect_mime_type_from_uri("https://example.com/unknown")
3592 """
3593 mime_type, _ = mimetypes.guess_type(uri)
3594 return mime_type
3596 def _detect_mime_type(self, uri: str, content: Union[str, bytes]) -> str:
3597 """Detect mime type from URI and content with fallback.
3599 Priority:
3600 1. Try to detect from URI extension
3601 2. Fallback to content-based detection
3603 Args:
3604 uri: Resource URI
3605 content: Resource content
3607 Returns:
3608 Detected mime type (always returns a value)
3610 Examples:
3611 >>> service = ResourceService()
3612 >>> service._detect_mime_type("file.txt", "content")
3613 'text/plain'
3614 >>> service._detect_mime_type("unknown", "text content")
3615 'text/plain'
3616 """
3617 # Try from URI first
3618 mime_type = self._detect_mime_type_from_uri(uri)
3619 if mime_type:
3620 return mime_type
3622 # Fallback based on content type
3623 if isinstance(content, str):
3624 return "text/plain"
3626 return "application/octet-stream"
3628 async def _read_template_resource(self, db: Session, uri: str, include_inactive: Optional[bool] = False) -> ResourceContent:
3629 """
3630 Read a templated resource.
3632 Args:
3633 db: Database session.
3634 uri: Template URI with parameters.
3635 include_inactive: Whether to include inactive resources in DB lookups.
3637 Returns:
3638 ResourceContent: The resolved content from the matching template.
3640 Raises:
3641 ResourceNotFoundError: If no matching template is found.
3642 ResourceError: For other template resolution errors.
3643 NotImplementedError: If a binary template resource is encountered.
3644 """
3645 # Find matching template # DRT BREAKPOINT
3646 template = None
3647 if not self._template_cache:
3648 logger.info("_template_cache is empty, fetching exisitng resource templates")
3649 resource_templates = await self.list_resource_templates(db=db, include_inactive=include_inactive)
3650 for i in resource_templates:
3651 self._template_cache[i.name] = i
3652 for cached in self._template_cache.values():
3653 if self._uri_matches_template(uri, cached.uri_template):
3654 template = cached
3655 break
3657 if template:
3658 check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(template.id)).where(not_(DbResource.enabled))).scalar_one_or_none()
3659 if check_inactivity:
3660 raise ResourceNotFoundError(f"Resource '{template.id}' exists but is inactive")
3661 else:
3662 raise ResourceNotFoundError(f"No template matches URI: {uri}")
3664 try:
3665 # Extract parameters
3666 params = self._extract_template_params(uri, template.uri_template)
3667 # Generate content
3668 if template.mime_type and template.mime_type.startswith("text/"):
3669 content = template.uri_template.format(**params)
3670 return ResourceContent(type="resource", id=str(template.id) or None, uri=template.uri_template or None, mime_type=template.mime_type or None, text=content)
3671 # # Handle binary template
3672 raise NotImplementedError("Binary resource templates not yet supported")
3674 except ResourceNotFoundError:
3675 raise
3676 except Exception as e:
3677 raise ResourceError(f"Failed to process template: {str(e)}") from e
3679 @staticmethod
3680 @lru_cache(maxsize=256)
3681 def _build_regex(template: str) -> re.Pattern:
3682 """
3683 Convert a URI template into a compiled regular expression.
3685 This parser supports a subset of RFC 6570–style templates for path
3686 matching. It extracts path parameters and converts them into named
3687 regex groups.
3689 Supported template features:
3690 - `{var}`
3691 A simple path parameter. Matches a single URI segment
3692 (i.e., any characters except `/`).
3693 → Translates to `(?P<var>[^/]+)`
3694 - `{var*}`
3695 A wildcard parameter. Matches one or more URI segments,
3696 including `/`.
3697 → Translates to `(?P<var>.+)`
3698 - `{?var1,var2}`
3699 Query-parameter expressions. These are ignored when building
3700 the regex for path matching and are stripped from the template.
3702 Example:
3703 Template: "files://root/{path*}/meta/{id}{?expand,debug}"
3704 Regex: r"^files://root/(?P<path>.+)/meta/(?P<id>[^/]+)$"
3706 Args:
3707 template: The URI template string containing parameter expressions.
3709 Returns:
3710 A compiled regular expression (re.Pattern) that can be used to
3711 match URIs and extract parameter values.
3713 Note:
3714 Results are cached using LRU cache (maxsize=256) to avoid
3715 recompiling the same template pattern repeatedly.
3716 """
3717 # Remove query parameter syntax for path matching
3718 template_without_query = re.sub(r"\{\?[^}]+\}", "", template)
3720 parts = re.split(r"(\{[^}]+\})", template_without_query)
3721 pattern = ""
3722 for part in parts:
3723 if part.startswith("{") and part.endswith("}"):
3724 name = part[1:-1]
3725 if name.endswith("*"):
3726 name = name[:-1]
3727 pattern += f"(?P<{name}>.+)"
3728 else:
3729 pattern += f"(?P<{name}>[^/]+)"
3730 else:
3731 pattern += re.escape(part)
3732 return re.compile(f"^{pattern}$")
3734 @staticmethod
3735 @lru_cache(maxsize=256)
3736 def _compile_parse_pattern(template: str) -> parse.Parser:
3737 """
3738 Compile a parse pattern for URI template parameter extraction.
3740 Args:
3741 template: The template pattern (e.g. "file:///{name}/{id}").
3743 Returns:
3744 Compiled parse.Parser object.
3746 Note:
3747 Results are cached using LRU cache (maxsize=256) to avoid
3748 recompiling the same template pattern repeatedly.
3749 """
3750 return parse.compile(template)
3752 def _extract_template_params(self, uri: str, template: str) -> Dict[str, str]:
3753 """
3754 Extract parameters from a URI based on a template.
3756 Args:
3757 uri: The actual URI containing parameter values.
3758 template: The template pattern (e.g. "file:///{name}/{id}").
3760 Returns:
3761 Dict of parameter names and extracted values.
3763 Note:
3764 Uses cached compiled parse patterns for better performance.
3765 """
3766 parser = self._compile_parse_pattern(template)
3767 result = parser.parse(uri)
3768 return result.named if result else {}
3770 def _uri_matches_template(self, uri: str, template: str) -> bool:
3771 """
3772 Check whether a URI matches a given template pattern.
3774 Args:
3775 uri: The URI to check.
3776 template: The template pattern.
3778 Returns:
3779 True if the URI matches the template, otherwise False.
3781 Note:
3782 Uses cached compiled regex patterns for better performance.
3783 """
3784 uri_path, _, _ = uri.partition("?")
3785 regex = self._build_regex(template)
3786 return bool(regex.match(uri_path))
3788 async def _notify_resource_added(self, resource: DbResource) -> None:
3789 """
3790 Notify subscribers of resource addition.
3792 Args:
3793 resource: Resource to add
3794 """
3795 event = {
3796 "type": "resource_added",
3797 "data": {
3798 "id": resource.id,
3799 "uri": resource.uri,
3800 "name": resource.name,
3801 "description": resource.description,
3802 "enabled": resource.enabled,
3803 "visibility": getattr(resource, "visibility", "public"),
3804 "team_id": getattr(resource, "team_id", None),
3805 "owner_email": getattr(resource, "owner_email", None),
3806 },
3807 "timestamp": datetime.now(timezone.utc).isoformat(),
3808 }
3809 await self._publish_event(event)
3811 async def _notify_resource_updated(self, resource: DbResource) -> None:
3812 """
3813 Notify subscribers of resource update.
3815 Args:
3816 resource: Resource to update
3817 """
3818 event = {
3819 "type": "resource_updated",
3820 "data": {
3821 "id": resource.id,
3822 "uri": resource.uri,
3823 "enabled": resource.enabled,
3824 "visibility": getattr(resource, "visibility", "public"),
3825 "team_id": getattr(resource, "team_id", None),
3826 "owner_email": getattr(resource, "owner_email", None),
3827 },
3828 "timestamp": datetime.now(timezone.utc).isoformat(),
3829 }
3830 await self._publish_event(event)
3832 async def _publish_event(self, event: Dict[str, Any]) -> None:
3833 """
3834 Publish event to all subscribers via the EventService.
3836 Args:
3837 event: Event to publish
3838 """
3839 await self._event_service.publish_event(event)
3841 # --- Resource templates ---
3842 async def list_resource_templates(
3843 self,
3844 db: Session,
3845 include_inactive: bool = False,
3846 user_email: Optional[str] = None,
3847 token_teams: Optional[List[str]] = None,
3848 tags: Optional[List[str]] = None,
3849 visibility: Optional[str] = None,
3850 server_id: Optional[str] = None,
3851 ) -> List[ResourceTemplate]:
3852 """
3853 List resource templates with visibility-based access control.
3855 Args:
3856 db: Database session
3857 include_inactive: Whether to include inactive templates
3858 user_email: Email of requesting user (for private visibility check)
3859 token_teams: Teams from JWT. None = admin (no filtering),
3860 [] = public-only (no owner access), [...] = team-scoped
3861 tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
3862 visibility (Optional[str]): Filter by visibility (private, team, public).
3863 server_id (Optional[str]): Filter by server ID. If provided, only templates associated with this server will be returned.
3865 Returns:
3866 List of ResourceTemplate objects the user has access to
3868 Examples:
3869 >>> from mcpgateway.services.resource_service import ResourceService
3870 >>> from unittest.mock import MagicMock, patch
3871 >>> service = ResourceService()
3872 >>> db = MagicMock()
3873 >>> template_obj = MagicMock()
3874 >>> db.execute.return_value.scalars.return_value.all.return_value = [template_obj]
3875 >>> with patch('mcpgateway.services.resource_service.ResourceTemplate') as MockResourceTemplate:
3876 ... MockResourceTemplate.model_validate.return_value = 'resource_template'
3877 ... import asyncio
3878 ... result = asyncio.run(service.list_resource_templates(db))
3879 ... result == ['resource_template']
3880 True
3881 """
3882 with create_span(
3883 "resource_template.list",
3884 {
3885 "include_inactive": include_inactive,
3886 "server_id": server_id,
3887 "tags.count": len(tags) if tags else 0,
3888 "visibility": visibility,
3889 },
3890 ):
3891 query = select(DbResource).where(DbResource.uri_template.isnot(None))
3893 # Filter by server_id if provided (same pattern as list_server_resources)
3894 if server_id:
3895 query = query.join(server_resource_association, DbResource.id == server_resource_association.c.resource_id).where(server_resource_association.c.server_id == server_id)
3897 if not include_inactive:
3898 query = query.where(DbResource.enabled)
3900 # Apply visibility filtering when token_teams is set (non-admin access)
3901 if token_teams is not None:
3902 # Check if this is a public-only token (empty teams array)
3903 # Public-only tokens can ONLY see public templates - no owner access
3904 is_public_only_token = len(token_teams) == 0
3906 conditions = [DbResource.visibility == "public"]
3908 # Only include owner access for non-public-only tokens with user_email
3909 if not is_public_only_token and user_email:
3910 conditions.append(DbResource.owner_email == user_email)
3912 if token_teams:
3913 conditions.append(and_(DbResource.team_id.in_(token_teams), DbResource.visibility.in_(["team", "public"])))
3915 query = query.where(or_(*conditions))
3917 # Cursor-based pagination logic can be implemented here in the future.
3918 if visibility:
3919 query = query.where(DbResource.visibility == visibility)
3921 if tags:
3922 query = query.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))
3924 templates = db.execute(query).scalars().all()
3925 result = [ResourceTemplate.model_validate(t) for t in templates]
3926 return result
3928 # --- Metrics ---
3929 async def aggregate_metrics(self, db: Session) -> ResourceMetrics:
3930 """
3931 Aggregate metrics for all resource invocations across all resources.
3933 Combines recent raw metrics (within retention period) with historical
3934 hourly rollups for complete historical coverage. Uses in-memory caching
3935 (10s TTL) to reduce database load under high request rates.
3937 Args:
3938 db: Database session
3940 Returns:
3941 ResourceMetrics: Aggregated metrics from raw + hourly rollup tables.
3943 Examples:
3944 >>> from mcpgateway.services.resource_service import ResourceService
3945 >>> service = ResourceService()
3946 >>> # Method exists and is callable
3947 >>> callable(service.aggregate_metrics)
3948 True
3949 """
3950 # Check cache first (if enabled)
3951 # First-Party
3952 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
3954 if is_cache_enabled():
3955 cached = metrics_cache.get("resources")
3956 if cached is not None:
3957 return ResourceMetrics(**cached)
3959 # Use combined raw + rollup query for full historical coverage
3960 # First-Party
3961 from mcpgateway.services.metrics_query_service import aggregate_metrics_combined # pylint: disable=import-outside-toplevel
3963 result = aggregate_metrics_combined(db, "resource")
3965 metrics = ResourceMetrics(
3966 total_executions=result.total_executions,
3967 successful_executions=result.successful_executions,
3968 failed_executions=result.failed_executions,
3969 failure_rate=result.failure_rate,
3970 min_response_time=result.min_response_time,
3971 max_response_time=result.max_response_time,
3972 avg_response_time=result.avg_response_time,
3973 last_execution_time=result.last_execution_time,
3974 )
3976 # Cache the result as dict for serialization compatibility (if enabled)
3977 if is_cache_enabled():
3978 metrics_cache.set("resources", metrics.model_dump())
3980 return metrics
3982 async def reset_metrics(self, db: Session) -> None:
3983 """
3984 Reset all resource metrics by deleting raw and hourly rollup records.
3986 Args:
3987 db: Database session
3989 Examples:
3990 >>> from mcpgateway.services.resource_service import ResourceService
3991 >>> from unittest.mock import MagicMock
3992 >>> service = ResourceService()
3993 >>> db = MagicMock()
3994 >>> db.execute = MagicMock()
3995 >>> db.commit = MagicMock()
3996 >>> import asyncio
3997 >>> asyncio.run(service.reset_metrics(db))
3998 """
3999 db.execute(delete(ResourceMetric))
4000 db.execute(delete(ResourceMetricsHourly))
4001 db.commit()
4003 # Invalidate metrics cache
4004 # First-Party
4005 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
4007 metrics_cache.invalidate("resources")
4008 metrics_cache.invalidate_prefix("top_resources:")
4011# Lazy singleton - created on first access, not at module import time.
4012# This avoids instantiation when only exception classes are imported.
4013_resource_service_instance = None # pylint: disable=invalid-name
4016def __getattr__(name: str):
4017 """Module-level __getattr__ for lazy singleton creation.
4019 Args:
4020 name: The attribute name being accessed.
4022 Returns:
4023 The resource_service singleton instance if name is "resource_service".
4025 Raises:
4026 AttributeError: If the attribute name is not "resource_service".
4027 """
4028 global _resource_service_instance # pylint: disable=global-statement
4029 if name == "resource_service":
4030 if _resource_service_instance is None:
4031 _resource_service_instance = ResourceService()
4032 return _resource_service_instance
4033 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")