Coverage for mcpgateway / services / resource_service.py: 99%
1190 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/resource_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Resource Service Implementation.
8This module implements resource management according to the MCP specification.
9It handles:
10- Resource registration and retrieval
11- Resource templates and URI handling
12- Resource subscriptions and updates
13- Content type management
14- Active/inactive resource management
16Examples:
17 >>> from mcpgateway.services.resource_service import ResourceService, ResourceError
18 >>> service = ResourceService()
19 >>> isinstance(service._event_service, EventService)
20 True
21"""
23# Standard
24import binascii
25from datetime import datetime, timezone
26from functools import lru_cache
27import mimetypes
28import os
29import re
30import ssl
31import time
32from types import SimpleNamespace
33from typing import Any, AsyncGenerator, Dict, List, Optional, Union
34import uuid
36# Third-Party
37import httpx
38from mcp import ClientSession
39from mcp.client.sse import sse_client
40from mcp.client.streamable_http import streamablehttp_client
41import parse
42from pydantic import ValidationError
43from sqlalchemy import and_, delete, desc, not_, or_, select
44from sqlalchemy.exc import IntegrityError, OperationalError
45from sqlalchemy.orm import joinedload, Session
47# First-Party
48from mcpgateway.common.models import ResourceContent, ResourceContents, ResourceTemplate, TextContent
49from mcpgateway.common.validators import SecurityValidator
50from mcpgateway.config import settings
51from mcpgateway.db import EmailTeam, fresh_db_session
52from mcpgateway.db import Gateway as DbGateway
53from mcpgateway.db import get_for_update
54from mcpgateway.db import Resource as DbResource
55from mcpgateway.db import ResourceMetric, ResourceMetricsHourly
56from mcpgateway.db import ResourceSubscription as DbSubscription
57from mcpgateway.db import server_resource_association
58from mcpgateway.observability import create_span
59from mcpgateway.schemas import ResourceCreate, ResourceMetrics, ResourceRead, ResourceSubscription, ResourceUpdate, TopPerformer
60from mcpgateway.services.audit_trail_service import get_audit_trail_service
61from mcpgateway.services.base_service import BaseService
62from mcpgateway.services.event_service import EventService
63from mcpgateway.services.logging_service import LoggingService
64from mcpgateway.services.mcp_session_pool import get_mcp_session_pool, TransportType
65from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service
66from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
67from mcpgateway.services.oauth_manager import OAuthManager
68from mcpgateway.services.observability_service import current_trace_id, ObservabilityService
69from mcpgateway.services.structured_logger import get_structured_logger
70from mcpgateway.utils.gateway_access import build_gateway_auth_headers, check_gateway_access
71from mcpgateway.utils.metrics_common import build_top_performers
72from mcpgateway.utils.pagination import unified_paginate
73from mcpgateway.utils.services_auth import decode_auth
74from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
75from mcpgateway.utils.ssl_context_cache import get_cached_ssl_context
76from mcpgateway.utils.url_auth import apply_query_param_auth, sanitize_exception_message
77from mcpgateway.utils.validate_signature import validate_signature
79# Plugin support imports (conditional)
80try:
81 # First-Party
82 from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, PluginContextTable, ResourceHookType, ResourcePostFetchPayload, ResourcePreFetchPayload
84 PLUGINS_AVAILABLE = True
85except ImportError:
86 PLUGINS_AVAILABLE = False
88# Cache import (lazy to avoid circular dependencies)
89_REGISTRY_CACHE = None
92def _get_registry_cache():
93 """Get registry cache singleton lazily.
95 Returns:
96 RegistryCache instance.
97 """
98 global _REGISTRY_CACHE # pylint: disable=global-statement
99 if _REGISTRY_CACHE is None:
100 # First-Party
101 from mcpgateway.cache.registry_cache import registry_cache # pylint: disable=import-outside-toplevel
103 _REGISTRY_CACHE = registry_cache
104 return _REGISTRY_CACHE
107# Initialize logging service first
108logging_service = LoggingService()
109logger = logging_service.get_logger(__name__)
111# Initialize structured logger, audit trail, and metrics buffer for resource operations
112structured_logger = get_structured_logger("resource_service")
113audit_trail = get_audit_trail_service()
114metrics_buffer = get_metrics_buffer_service()
117class ResourceError(Exception):
118 """Base class for resource-related errors."""
121class ResourceNotFoundError(ResourceError):
122 """Raised when a requested resource is not found."""
125class ResourceURIConflictError(ResourceError):
126 """Raised when a resource URI conflicts with existing (active or inactive) resource."""
128 def __init__(self, uri: str, enabled: bool = True, resource_id: Optional[int] = None, visibility: str = "public") -> None:
129 """Initialize the error with resource information.
131 Args:
132 uri: The conflicting resource URI
133 enabled: Whether the existing resource is active
134 resource_id: ID of the existing resource if available
135 visibility: Visibility status of the resource
136 """
137 self.uri = uri
138 self.enabled = enabled
139 self.resource_id = resource_id
140 message = f"{visibility.capitalize()} Resource already exists with URI: {uri}"
141 logger.info(f"ResourceURIConflictError: {message}")
142 if not enabled:
143 message += f" (currently inactive, ID: {resource_id})"
144 super().__init__(message)
147class ResourceValidationError(ResourceError):
148 """Raised when resource validation fails."""
151class ResourceLockConflictError(ResourceError):
152 """Raised when a resource row is locked by another transaction.
154 Raises:
155 ResourceLockConflictError: When attempting to modify a resource that is
156 currently locked by another concurrent request.
157 """
160class ResourceService(BaseService):
161 """Service for managing resources.
163 Handles:
164 - Resource registration and retrieval
165 - Resource templates and URIs
166 - Resource subscriptions
167 - Content type detection
168 - Active/inactive status management
169 """
171 _visibility_model_cls = DbResource
173 def __init__(self) -> None:
174 """Initialize the resource service."""
175 self._event_service = EventService(channel_name="mcpgateway:resource_events")
176 self._template_cache: Dict[str, ResourceTemplate] = {}
177 self.oauth_manager = OAuthManager(request_timeout=int(os.getenv("OAUTH_REQUEST_TIMEOUT", "30")), max_retries=int(os.getenv("OAUTH_MAX_RETRIES", "3")))
179 # Initialize plugin manager if plugins are enabled in settings
180 self._plugin_manager = None
181 if PLUGINS_AVAILABLE:
182 try:
183 self._plugin_manager = get_plugin_manager()
184 if self._plugin_manager:
185 logger.info("Plugin manager initialized for ResourceService with config: %s", settings.plugins.config_file)
186 except Exception as e:
187 logger.warning("Plugin manager initialization failed in ResourceService: %s", e)
188 self._plugin_manager = None
190 # Initialize mime types
191 mimetypes.init()
193 async def initialize(self) -> None:
194 """Initialize the service."""
195 logger.info("Initializing resource service")
196 await self._event_service.initialize()
198 async def shutdown(self) -> None:
199 """Shutdown the service."""
200 # Clear subscriptions
201 await self._event_service.shutdown()
202 logger.info("Resource service shutdown complete")
204 async def get_top_resources(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
205 """Retrieve the top-performing resources based on execution count.
207 Queries the database to get resources with their metrics, ordered by the number of executions
208 in descending order. Combines recent raw metrics with historical hourly rollups for complete
209 historical coverage. Uses the resource URI as the name field for TopPerformer objects.
210 Returns a list of TopPerformer objects containing resource details and performance metrics.
211 Results are cached for performance.
213 Args:
214 db (Session): Database session for querying resource metrics.
215 limit (Optional[int]): Maximum number of resources to return. Defaults to 5.
216 include_deleted (bool): Whether to include deleted resources from rollups.
218 Returns:
219 List[TopPerformer]: A list of TopPerformer objects, each containing:
220 - id: Resource ID.
221 - name: Resource URI (used as the name field).
222 - execution_count: Total number of executions.
223 - avg_response_time: Average response time in seconds, or None if no metrics.
224 - success_rate: Success rate percentage, or None if no metrics.
225 - last_execution: Timestamp of the last execution, or None if no metrics.
226 """
227 # Check cache first (if enabled)
228 # First-Party
229 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
231 effective_limit = limit or 5
232 cache_key = f"top_resources:{effective_limit}:include_deleted={include_deleted}"
234 if is_cache_enabled():
235 cached = metrics_cache.get(cache_key)
236 if cached is not None:
237 return cached
239 # Use combined query that includes both raw metrics and rollup data
240 # Use name_column="uri" to maintain backward compatibility (resources show URI as name)
241 # First-Party
242 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel
244 results = get_top_performers_combined(
245 db=db,
246 metric_type="resource",
247 entity_model=DbResource,
248 limit=effective_limit,
249 name_column="uri", # Resources use URI as display name
250 include_deleted=include_deleted,
251 )
252 top_performers = build_top_performers(results)
254 # Cache the result (if enabled)
255 if is_cache_enabled():
256 metrics_cache.set(cache_key, top_performers)
258 return top_performers
260 def convert_resource_to_read(self, resource: DbResource, include_metrics: bool = False) -> ResourceRead:
261 """
262 Converts a DbResource instance into a ResourceRead model, optionally including aggregated metrics.
264 Args:
265 resource (DbResource): The ORM instance of the resource.
266 include_metrics (bool): Whether to include metrics in the result. Defaults to False.
267 Set to False for list operations to avoid N+1 query issues.
269 Returns:
270 ResourceRead: The Pydantic model representing the resource, optionally including aggregated metrics.
272 Examples:
273 >>> from types import SimpleNamespace
274 >>> from datetime import datetime, timezone
275 >>> svc = ResourceService()
276 >>> now = datetime.now(timezone.utc)
277 >>> # Fake metrics
278 >>> m1 = SimpleNamespace(is_success=True, response_time=0.1, timestamp=now)
279 >>> m2 = SimpleNamespace(is_success=False, response_time=0.3, timestamp=now)
280 >>> r = SimpleNamespace(
281 ... id="ca627760127d409080fdefc309147e08", uri='res://x', name='R', description=None, mime_type='text/plain', size=123,
282 ... created_at=now, updated_at=now, enabled=True, tags=[{"id": "t", "label": "T"}], metrics=[m1, m2]
283 ... )
284 >>> out = svc.convert_resource_to_read(r, include_metrics=True)
285 >>> out.metrics.total_executions
286 2
287 >>> out.metrics.successful_executions
288 1
289 """
290 resource_dict = resource.__dict__.copy()
291 # Remove SQLAlchemy state and any pre-existing 'metrics' attribute
292 resource_dict.pop("_sa_instance_state", None)
293 resource_dict.pop("metrics", None)
295 # Ensure required base fields are present even if SQLAlchemy hasn't loaded them into __dict__ yet
296 resource_dict["id"] = getattr(resource, "id", resource_dict.get("id"))
297 resource_dict["uri"] = getattr(resource, "uri", resource_dict.get("uri"))
298 resource_dict["name"] = getattr(resource, "name", resource_dict.get("name"))
299 resource_dict["description"] = getattr(resource, "description", resource_dict.get("description"))
300 resource_dict["mime_type"] = getattr(resource, "mime_type", resource_dict.get("mime_type"))
301 resource_dict["size"] = getattr(resource, "size", resource_dict.get("size"))
302 resource_dict["created_at"] = getattr(resource, "created_at", resource_dict.get("created_at"))
303 resource_dict["updated_at"] = getattr(resource, "updated_at", resource_dict.get("updated_at"))
304 resource_dict["is_active"] = getattr(resource, "is_active", resource_dict.get("is_active"))
305 resource_dict["enabled"] = getattr(resource, "enabled", resource_dict.get("enabled"))
307 # Compute aggregated metrics from the resource's metrics list (only if requested)
308 if include_metrics:
309 total = len(resource.metrics) if hasattr(resource, "metrics") and resource.metrics is not None else 0
310 successful = sum(1 for m in resource.metrics if m.is_success) if total > 0 else 0
311 failed = sum(1 for m in resource.metrics if not m.is_success) if total > 0 else 0
312 failure_rate = (failed / total) if total > 0 else 0.0
313 min_rt = min((m.response_time for m in resource.metrics), default=None) if total > 0 else None
314 max_rt = max((m.response_time for m in resource.metrics), default=None) if total > 0 else None
315 avg_rt = (sum(m.response_time for m in resource.metrics) / total) if total > 0 else None
316 last_time = max((m.timestamp for m in resource.metrics), default=None) if total > 0 else None
318 resource_dict["metrics"] = {
319 "total_executions": total,
320 "successful_executions": successful,
321 "failed_executions": failed,
322 "failure_rate": failure_rate,
323 "min_response_time": min_rt,
324 "max_response_time": max_rt,
325 "avg_response_time": avg_rt,
326 "last_execution_time": last_time,
327 }
328 else:
329 resource_dict["metrics"] = None
331 raw_tags = resource.tags or []
332 normalized_tags = []
333 for tag in raw_tags:
334 if isinstance(tag, str):
335 normalized_tags.append(tag)
336 continue
337 if isinstance(tag, dict):
338 label = tag.get("label") or tag.get("name")
339 if label:
340 normalized_tags.append(label)
341 continue
342 label = getattr(tag, "label", None) or getattr(tag, "name", None)
343 if label:
344 normalized_tags.append(label)
345 resource_dict["tags"] = normalized_tags
346 resource_dict["team"] = getattr(resource, "team", None)
348 # Include metadata fields for proper API response
349 resource_dict["created_by"] = getattr(resource, "created_by", None)
350 resource_dict["modified_by"] = getattr(resource, "modified_by", None)
351 resource_dict["created_at"] = getattr(resource, "created_at", None)
352 resource_dict["updated_at"] = getattr(resource, "updated_at", None)
353 resource_dict["version"] = getattr(resource, "version", None)
354 return ResourceRead.model_validate(resource_dict)
356 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]:
357 """Retrieve the team name given a team ID.
359 Args:
360 db (Session): Database session for querying teams.
361 team_id (Optional[str]): The ID of the team.
363 Returns:
364 Optional[str]: The name of the team if found, otherwise None.
365 """
366 if not team_id:
367 return None
368 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
369 db.commit() # Release transaction to avoid idle-in-transaction
370 return team.name if team else None
372 async def register_resource(
373 self,
374 db: Session,
375 resource: ResourceCreate,
376 created_by: Optional[str] = None,
377 created_from_ip: Optional[str] = None,
378 created_via: Optional[str] = None,
379 created_user_agent: Optional[str] = None,
380 import_batch_id: Optional[str] = None,
381 federation_source: Optional[str] = None,
382 team_id: Optional[str] = None,
383 owner_email: Optional[str] = None,
384 visibility: Optional[str] = "public",
385 ) -> ResourceRead:
386 """Register a new resource.
388 Args:
389 db: Database session
390 resource: Resource creation schema
391 created_by: User who created the resource
392 created_from_ip: IP address of the creator
393 created_via: Method used to create the resource (e.g., API, UI)
394 created_user_agent: User agent of the creator
395 import_batch_id: Optional batch ID for bulk imports
396 federation_source: Optional source of the resource if federated
397 team_id (Optional[str]): Team ID to assign the resource to.
398 owner_email (Optional[str]): Email of the user who owns this resource.
399 visibility (str): Resource visibility level (private, team, public).
401 Returns:
402 Created resource information
404 Raises:
405 IntegrityError: If a database integrity error occurs.
406 ResourceURIConflictError: If a resource with the same URI already exists.
407 ResourceError: For other resource registration errors
409 Examples:
410 >>> from mcpgateway.services.resource_service import ResourceService
411 >>> from unittest.mock import MagicMock, AsyncMock
412 >>> from mcpgateway.schemas import ResourceRead
413 >>> service = ResourceService()
414 >>> db = MagicMock()
415 >>> resource = MagicMock()
416 >>> db.execute.return_value.scalar_one_or_none.return_value = None
417 >>> db.add = MagicMock()
418 >>> db.commit = MagicMock()
419 >>> db.refresh = MagicMock()
420 >>> service._notify_resource_added = AsyncMock()
421 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
422 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
423 >>> import asyncio
424 >>> asyncio.run(service.register_resource(db, resource))
425 'resource_read'
426 """
427 try:
428 logger.info(f"Registering resource: {resource.uri}")
430 # Extract gateway_id from resource if present
431 gateway_id = getattr(resource, "gateway_id", None)
433 # Check for existing server with the same uri
434 if visibility.lower() == "public":
435 logger.info(f"visibility:: {visibility}")
436 # Check for existing public resource with the same uri and gateway_id
437 existing_resource = db.execute(select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "public", DbResource.gateway_id == gateway_id)).scalar_one_or_none()
438 if existing_resource:
439 raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
440 elif visibility.lower() == "team" and team_id:
441 # Check for existing team resource with the same uri and gateway_id
442 existing_resource = db.execute(
443 select(DbResource).where(DbResource.uri == resource.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.gateway_id == gateway_id)
444 ).scalar_one_or_none()
445 if existing_resource:
446 raise ResourceURIConflictError(resource.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
448 # Detect mime type if not provided
449 mime_type = resource.mime_type
450 if not mime_type:
451 mime_type = self._detect_mime_type(resource.uri, resource.content)
453 # Determine content storage
454 is_text = mime_type and mime_type.startswith("text/") or isinstance(resource.content, str)
456 # Create DB model
457 db_resource = DbResource(
458 uri=resource.uri,
459 name=resource.name,
460 description=resource.description,
461 mime_type=mime_type,
462 uri_template=resource.uri_template,
463 text_content=resource.content if is_text else None,
464 binary_content=(resource.content.encode() if is_text and isinstance(resource.content, str) else resource.content if isinstance(resource.content, bytes) else None),
465 size=len(resource.content) if resource.content else 0,
466 tags=resource.tags or [],
467 created_by=created_by,
468 created_from_ip=created_from_ip,
469 created_via=created_via,
470 created_user_agent=created_user_agent,
471 import_batch_id=import_batch_id,
472 federation_source=federation_source,
473 version=1,
474 # Team scoping fields - use schema values if provided, otherwise fallback to parameters
475 team_id=getattr(resource, "team_id", None) or team_id,
476 owner_email=getattr(resource, "owner_email", None) or owner_email or created_by,
477 # Endpoint visibility parameter takes precedence over schema default
478 visibility=visibility if visibility is not None else getattr(resource, "visibility", "public"),
479 gateway_id=gateway_id,
480 )
482 # Add to DB
483 db.add(db_resource)
484 db.commit()
485 db.refresh(db_resource)
487 # Notify subscribers
488 await self._notify_resource_added(db_resource)
490 logger.info(f"Registered resource: {resource.uri}")
492 # Structured logging: Audit trail for resource creation
493 audit_trail.log_action(
494 user_id=created_by or "system",
495 action="create_resource",
496 resource_type="resource",
497 resource_id=str(db_resource.id),
498 resource_name=db_resource.name,
499 user_email=owner_email,
500 team_id=team_id,
501 client_ip=created_from_ip,
502 user_agent=created_user_agent,
503 new_values={
504 "uri": db_resource.uri,
505 "name": db_resource.name,
506 "visibility": visibility,
507 "mime_type": db_resource.mime_type,
508 },
509 context={
510 "created_via": created_via,
511 "import_batch_id": import_batch_id,
512 "federation_source": federation_source,
513 },
514 db=db,
515 )
517 # Structured logging: Log successful resource creation
518 structured_logger.log(
519 level="INFO",
520 message="Resource created successfully",
521 event_type="resource_created",
522 component="resource_service",
523 user_id=created_by,
524 user_email=owner_email,
525 team_id=team_id,
526 resource_type="resource",
527 resource_id=str(db_resource.id),
528 custom_fields={
529 "resource_uri": db_resource.uri,
530 "resource_name": db_resource.name,
531 "visibility": visibility,
532 },
533 )
535 db_resource.team = self._get_team_name(db, db_resource.team_id)
536 return self.convert_resource_to_read(db_resource)
537 except IntegrityError as ie:
538 logger.error(f"IntegrityErrors in group: {ie}")
540 # Structured logging: Log database integrity error
541 structured_logger.log(
542 level="ERROR",
543 message="Resource creation failed due to database integrity error",
544 event_type="resource_creation_failed",
545 component="resource_service",
546 user_id=created_by,
547 user_email=owner_email,
548 error=ie,
549 custom_fields={
550 "resource_uri": resource.uri,
551 },
552 )
553 raise ie
554 except ResourceURIConflictError as rce:
555 logger.error(f"ResourceURIConflictError in group: {resource.uri}")
557 # Structured logging: Log URI conflict error
558 structured_logger.log(
559 level="WARNING",
560 message="Resource creation failed due to URI conflict",
561 event_type="resource_uri_conflict",
562 component="resource_service",
563 user_id=created_by,
564 user_email=owner_email,
565 custom_fields={
566 "resource_uri": resource.uri,
567 "visibility": visibility,
568 },
569 )
570 raise rce
571 except Exception as e:
572 db.rollback()
574 # Structured logging: Log generic resource creation failure
575 structured_logger.log(
576 level="ERROR",
577 message="Resource creation failed",
578 event_type="resource_creation_failed",
579 component="resource_service",
580 user_id=created_by,
581 user_email=owner_email,
582 error=e,
583 custom_fields={
584 "resource_uri": resource.uri,
585 },
586 )
587 raise ResourceError(f"Failed to register resource: {str(e)}")
589 async def register_resources_bulk(
590 self,
591 db: Session,
592 resources: List[ResourceCreate],
593 created_by: Optional[str] = None,
594 created_from_ip: Optional[str] = None,
595 created_via: Optional[str] = None,
596 created_user_agent: Optional[str] = None,
597 import_batch_id: Optional[str] = None,
598 federation_source: Optional[str] = None,
599 team_id: Optional[str] = None,
600 owner_email: Optional[str] = None,
601 visibility: Optional[str] = "public",
602 conflict_strategy: str = "skip",
603 ) -> Dict[str, Any]:
604 """Register multiple resources in bulk with a single commit.
606 This method provides significant performance improvements over individual
607 resource registration by:
608 - Using db.add_all() instead of individual db.add() calls
609 - Performing a single commit for all resources
610 - Batch conflict detection
611 - Chunking for very large imports (>500 items)
613 Args:
614 db: Database session
615 resources: List of resource creation schemas
616 created_by: Username who created these resources
617 created_from_ip: IP address of creator
618 created_via: Creation method (ui, api, import, federation)
619 created_user_agent: User agent of creation request
620 import_batch_id: UUID for bulk import operations
621 federation_source: Source gateway for federated resources
622 team_id: Team ID to assign the resources to
623 owner_email: Email of the user who owns these resources
624 visibility: Resource visibility level (private, team, public)
625 conflict_strategy: How to handle conflicts (skip, update, rename, fail)
627 Returns:
628 Dict with statistics:
629 - created: Number of resources created
630 - updated: Number of resources updated
631 - skipped: Number of resources skipped
632 - failed: Number of resources that failed
633 - errors: List of error messages
635 Raises:
636 ResourceError: If bulk registration fails critically
638 Examples:
639 >>> from mcpgateway.services.resource_service import ResourceService
640 >>> from unittest.mock import MagicMock
641 >>> service = ResourceService()
642 >>> db = MagicMock()
643 >>> resources = [MagicMock(), MagicMock()]
644 >>> import asyncio
645 >>> try:
646 ... result = asyncio.run(service.register_resources_bulk(db, resources))
647 ... except Exception:
648 ... pass
649 """
650 if not resources:
651 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
653 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
655 # Process in chunks to avoid memory issues and SQLite parameter limits
656 chunk_size = 500
658 for chunk_start in range(0, len(resources), chunk_size):
659 chunk = resources[chunk_start : chunk_start + chunk_size]
661 try:
662 # Batch check for existing resources to detect conflicts
663 resource_uris = [resource.uri for resource in chunk]
665 # Build base query conditions
666 if visibility.lower() == "public":
667 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "public"]
668 elif visibility.lower() == "team" and team_id:
669 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "team", DbResource.team_id == team_id]
670 else:
671 # Private resources - check by owner
672 base_conditions = [DbResource.uri.in_(resource_uris), DbResource.visibility == "private", DbResource.owner_email == (owner_email or created_by)]
674 existing_resources_query = select(DbResource).where(*base_conditions)
675 existing_resources = db.execute(existing_resources_query).scalars().all()
676 # Use (uri, gateway_id) tuple as key for proper conflict detection with gateway_id scoping
677 existing_resources_map = {(r.uri, r.gateway_id): r for r in existing_resources}
679 resources_to_add = []
680 resources_to_update = []
682 for resource in chunk:
683 try:
684 # Use provided parameters or schema values
685 resource_team_id = team_id if team_id is not None else getattr(resource, "team_id", None)
686 resource_owner_email = owner_email or getattr(resource, "owner_email", None) or created_by
687 resource_visibility = visibility if visibility is not None else getattr(resource, "visibility", "public")
688 resource_gateway_id = getattr(resource, "gateway_id", None)
690 # Look up existing resource by (uri, gateway_id) tuple
691 existing_resource = existing_resources_map.get((resource.uri, resource_gateway_id))
693 if existing_resource:
694 # Handle conflict based on strategy
695 if conflict_strategy == "skip":
696 stats["skipped"] += 1
697 continue
698 if conflict_strategy == "update":
699 # Update existing resource
700 existing_resource.name = resource.name
701 existing_resource.description = resource.description
702 existing_resource.mime_type = resource.mime_type
703 existing_resource.size = getattr(resource, "size", None)
704 existing_resource.uri_template = resource.uri_template
705 existing_resource.tags = resource.tags or []
706 existing_resource.modified_by = created_by
707 existing_resource.modified_from_ip = created_from_ip
708 existing_resource.modified_via = created_via
709 existing_resource.modified_user_agent = created_user_agent
710 existing_resource.updated_at = datetime.now(timezone.utc)
711 existing_resource.version = (existing_resource.version or 1) + 1
713 resources_to_update.append(existing_resource)
714 stats["updated"] += 1
715 elif conflict_strategy == "rename":
716 # Create with renamed resource
717 new_uri = f"{resource.uri}_imported_{int(datetime.now().timestamp())}"
718 db_resource = DbResource(
719 uri=new_uri,
720 name=resource.name,
721 description=resource.description,
722 mime_type=resource.mime_type,
723 size=getattr(resource, "size", None),
724 uri_template=resource.uri_template,
725 gateway_id=getattr(resource, "gateway_id", None),
726 tags=resource.tags or [],
727 created_by=created_by,
728 created_from_ip=created_from_ip,
729 created_via=created_via,
730 created_user_agent=created_user_agent,
731 import_batch_id=import_batch_id,
732 federation_source=federation_source,
733 version=1,
734 team_id=resource_team_id,
735 owner_email=resource_owner_email,
736 visibility=resource_visibility,
737 )
738 resources_to_add.append(db_resource)
739 stats["created"] += 1
740 elif conflict_strategy == "fail":
741 stats["failed"] += 1
742 stats["errors"].append(f"Resource URI conflict: {resource.uri}")
743 continue
744 else:
745 # Create new resource
746 db_resource = DbResource(
747 uri=resource.uri,
748 name=resource.name,
749 description=resource.description,
750 mime_type=resource.mime_type,
751 size=getattr(resource, "size", None),
752 uri_template=resource.uri_template,
753 gateway_id=getattr(resource, "gateway_id", None),
754 tags=resource.tags or [],
755 created_by=created_by,
756 created_from_ip=created_from_ip,
757 created_via=created_via,
758 created_user_agent=created_user_agent,
759 import_batch_id=import_batch_id,
760 federation_source=federation_source,
761 version=1,
762 team_id=resource_team_id,
763 owner_email=resource_owner_email,
764 visibility=resource_visibility,
765 )
766 resources_to_add.append(db_resource)
767 stats["created"] += 1
769 except Exception as e:
770 stats["failed"] += 1
771 stats["errors"].append(f"Failed to process resource {resource.uri}: {str(e)}")
772 logger.warning(f"Failed to process resource {resource.uri} in bulk operation: {str(e)}")
773 continue
775 # Bulk add new resources
776 if resources_to_add:
777 db.add_all(resources_to_add)
779 # Commit the chunk
780 db.commit()
782 # Refresh resources for notifications and audit trail
783 for db_resource in resources_to_add:
784 db.refresh(db_resource)
785 # Notify subscribers
786 await self._notify_resource_added(db_resource)
788 # Log bulk audit trail entry
789 if resources_to_add or resources_to_update:
790 audit_trail.log_action(
791 user_id=created_by or "system",
792 action="bulk_create_resources" if resources_to_add else "bulk_update_resources",
793 resource_type="resource",
794 resource_id=import_batch_id or "bulk_operation",
795 resource_name=f"Bulk operation: {len(resources_to_add)} created, {len(resources_to_update)} updated",
796 user_email=owner_email,
797 team_id=team_id,
798 client_ip=created_from_ip,
799 user_agent=created_user_agent,
800 new_values={
801 "resources_created": len(resources_to_add),
802 "resources_updated": len(resources_to_update),
803 "visibility": visibility,
804 },
805 context={
806 "created_via": created_via,
807 "import_batch_id": import_batch_id,
808 "federation_source": federation_source,
809 "conflict_strategy": conflict_strategy,
810 },
811 db=db,
812 )
814 logger.info(f"Bulk registered {len(resources_to_add)} resources, updated {len(resources_to_update)} resources in chunk")
816 except Exception as e:
817 db.rollback()
818 logger.error(f"Failed to process chunk in bulk resource registration: {str(e)}")
819 stats["failed"] += len(chunk)
820 stats["errors"].append(f"Chunk processing failed: {str(e)}")
821 continue
823 # Final structured logging
824 structured_logger.log(
825 level="INFO",
826 message="Bulk resource registration completed",
827 event_type="resources_bulk_created",
828 component="resource_service",
829 user_id=created_by,
830 user_email=owner_email,
831 team_id=team_id,
832 resource_type="resource",
833 custom_fields={
834 "resources_created": stats["created"],
835 "resources_updated": stats["updated"],
836 "resources_skipped": stats["skipped"],
837 "resources_failed": stats["failed"],
838 "total_resources": len(resources),
839 "visibility": visibility,
840 "conflict_strategy": conflict_strategy,
841 },
842 )
844 return stats
846 async def _check_resource_access(
847 self,
848 db: Session,
849 resource: DbResource,
850 user_email: Optional[str],
851 token_teams: Optional[List[str]],
852 ) -> bool:
853 """Check if user has access to a resource based on visibility rules.
855 Implements the same access control logic as list_resources() for consistency.
857 Args:
858 db: Database session for team membership lookup if needed.
859 resource: Resource ORM object with visibility, team_id, owner_email.
860 user_email: Email of the requesting user (None = unauthenticated).
861 token_teams: List of team IDs from token.
862 - None = unrestricted admin access
863 - [] = public-only token
864 - [...] = team-scoped token
866 Returns:
867 True if access is allowed, False otherwise.
868 """
869 visibility = getattr(resource, "visibility", "public")
870 resource_team_id = getattr(resource, "team_id", None)
871 resource_owner_email = getattr(resource, "owner_email", None)
873 # Public resources are accessible by everyone
874 if visibility == "public":
875 return True
877 # Admin bypass: token_teams=None AND user_email=None means unrestricted admin
878 # This happens when is_admin=True and no team scoping in token
879 if token_teams is None and user_email is None:
880 return True
882 # No user context (but not admin) = deny access to non-public resources
883 if not user_email:
884 return False
886 # Public-only tokens (empty teams array) can ONLY access public resources
887 is_public_only_token = token_teams is not None and len(token_teams) == 0
888 if is_public_only_token:
889 return False # Already checked public above
891 # Owner can access their own private resources
892 if visibility == "private" and resource_owner_email and resource_owner_email == user_email:
893 return True
895 # Team resources: check team membership (matches list_resources behavior)
896 if resource_team_id:
897 # Use token_teams if provided, otherwise look up from DB
898 if token_teams is not None:
899 team_ids = token_teams
900 else:
901 if db is None:
902 logger.warning("Missing database session for team-scoped resource access check")
903 return False
904 # First-Party
905 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
907 team_service = TeamManagementService(db)
908 user_teams = await team_service.get_user_teams(user_email)
909 team_ids = [team.id for team in user_teams]
911 # Team/public visibility allows access if user is in the team
912 if visibility in ["team", "public"] and resource_team_id in team_ids:
913 return True
915 return False
917 async def list_resources(
918 self,
919 db: Session,
920 include_inactive: bool = False,
921 cursor: Optional[str] = None,
922 tags: Optional[List[str]] = None,
923 limit: Optional[int] = None,
924 page: Optional[int] = None,
925 per_page: Optional[int] = None,
926 user_email: Optional[str] = None,
927 team_id: Optional[str] = None,
928 visibility: Optional[str] = None,
929 token_teams: Optional[List[str]] = None,
930 ) -> Union[tuple[List[ResourceRead], Optional[str]], Dict[str, Any]]:
931 """
932 Retrieve a list of registered resources from the database with pagination support.
934 This method retrieves resources from the database and converts them into a list
935 of ResourceRead objects. It supports filtering out inactive resources based on the
936 include_inactive parameter and cursor-based pagination.
938 Args:
939 db (Session): The SQLAlchemy database session.
940 include_inactive (bool): If True, include inactive resources in the result.
941 Defaults to False.
942 cursor (Optional[str], optional): An opaque cursor token for pagination.
943 Opaque base64-encoded string containing last item's ID and created_at.
944 tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
945 limit (Optional[int]): Maximum number of resources to return. Use 0 for all resources (no limit).
946 If not specified, uses pagination_default_page_size.
947 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
948 per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
949 user_email (Optional[str]): User email for team-based access control. If None, no access control is applied.
950 team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation.
951 visibility (Optional[str]): Filter by visibility (private, team, public).
952 token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access
953 where the token scope should be respected instead of the user's full team memberships.
955 Returns:
956 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
957 If cursor is provided or neither: tuple of (list of ResourceRead objects, next_cursor).
959 Examples:
960 >>> from mcpgateway.services.resource_service import ResourceService
961 >>> from unittest.mock import MagicMock
962 >>> service = ResourceService()
963 >>> db = MagicMock()
964 >>> resource_read = MagicMock()
965 >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
966 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
967 >>> import asyncio
968 >>> resources, next_cursor = asyncio.run(service.list_resources(db))
969 >>> isinstance(resources, list)
970 True
972 With tags filter:
973 >>> db2 = MagicMock()
974 >>> bind = MagicMock()
975 >>> bind.dialect = MagicMock()
976 >>> bind.dialect.name = "sqlite" # or "postgresql" / "mysql"
977 >>> db2.get_bind.return_value = bind
978 >>> db2.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
979 >>> result2, _ = asyncio.run(service.list_resources(db2, tags=['api']))
980 >>> isinstance(result2, list)
981 True
982 """
983 # Check cache for first page only (cursor=None)
984 # Skip caching when:
985 # - user_email is provided (team-filtered results are user-specific)
986 # - token_teams is set (scoped access, e.g., public-only or team-scoped tokens)
987 # - page-based pagination is used
988 # This prevents cache poisoning where admin results could leak to public-only requests
989 cache = _get_registry_cache()
990 if cursor is None and user_email is None and token_teams is None and page is None:
991 filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, limit=limit)
992 cached = await cache.get("resources", filters_hash)
993 if cached is not None:
994 # Reconstruct ResourceRead objects from cached dicts
995 cached_resources = [ResourceRead.model_validate(r) for r in cached["resources"]]
996 return (cached_resources, cached.get("next_cursor"))
998 # Build base query with ordering
999 query = select(DbResource).where(DbResource.uri_template.is_(None)).order_by(desc(DbResource.created_at), desc(DbResource.id))
1001 # Apply active/inactive filter
1002 if not include_inactive:
1003 query = query.where(DbResource.enabled)
1005 query = await self._apply_access_control(query, db, user_email, token_teams, team_id)
1007 if visibility:
1008 query = query.where(DbResource.visibility == visibility)
1010 # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
1011 if tags:
1012 query = query.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))
1014 # Use unified pagination helper - handles both page and cursor pagination
1015 pag_result = await unified_paginate(
1016 db=db,
1017 query=query,
1018 page=page,
1019 per_page=per_page,
1020 cursor=cursor,
1021 limit=limit,
1022 base_url="/admin/resources", # Used for page-based links
1023 query_params={"include_inactive": include_inactive} if include_inactive else {},
1024 )
1026 next_cursor = None
1027 # Extract servers based on pagination type
1028 if page is not None:
1029 # Page-based: pag_result is a dict
1030 resources_db = pag_result["data"]
1031 else:
1032 # Cursor-based: pag_result is a tuple
1033 resources_db, next_cursor = pag_result
1035 # Fetch team names for the resources (common for both pagination types)
1036 team_ids_set = {s.team_id for s in resources_db if s.team_id}
1037 team_map = {}
1038 if team_ids_set:
1039 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
1040 team_map = {team.id: team.name for team in teams}
1042 db.commit() # Release transaction to avoid idle-in-transaction
1044 # Convert to ResourceRead (common for both pagination types)
1045 result = []
1046 for s in resources_db:
1047 try:
1048 s.team = team_map.get(s.team_id) if s.team_id else None
1049 result.append(self.convert_resource_to_read(s, include_metrics=False))
1050 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1051 logger.exception(f"Failed to convert resource {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
1052 # Continue with remaining resources instead of failing completely
1053 # Return appropriate format based on pagination type
1054 if page is not None:
1055 # Page-based format
1056 return {
1057 "data": result,
1058 "pagination": pag_result["pagination"],
1059 "links": pag_result["links"],
1060 }
1062 # Cursor-based format
1064 # Cache first page results - only for non-user-specific/non-scoped queries
1065 # Must match the same conditions as cache lookup to prevent cache poisoning
1066 if cursor is None and user_email is None and token_teams is None:
1067 try:
1068 cache_data = {"resources": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
1069 await cache.set("resources", cache_data, filters_hash)
1070 except AttributeError:
1071 pass # Skip caching if result objects don't support model_dump (e.g., in doctests)
1073 return (result, next_cursor)
1075 async def list_resources_for_user(
1076 self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
1077 ) -> List[ResourceRead]:
1078 """
1079 DEPRECATED: Use list_resources() with user_email parameter instead.
1081 List resources user has access to with team filtering.
1083 This method is maintained for backward compatibility but is no longer used.
1084 New code should call list_resources() with user_email, team_id, and visibility parameters.
1086 Args:
1087 db: Database session
1088 user_email: Email of the user requesting resources
1089 team_id: Optional team ID to filter by specific team
1090 visibility: Optional visibility filter (private, team, public)
1091 include_inactive: Whether to include inactive resources
1092 skip: Number of resources to skip for pagination
1093 limit: Maximum number of resources to return
1095 Returns:
1096 List[ResourceRead]: Resources the user has access to
1098 Examples:
1099 >>> from unittest.mock import MagicMock
1100 >>> import asyncio
1101 >>> service = ResourceService()
1102 >>> db = MagicMock()
1103 >>> # Patch out TeamManagementService so it doesn't run real logic
1104 >>> import mcpgateway.services.resource_service as _rs
1105 >>> class FakeTeamService:
1106 ... def __init__(self, db): pass
1107 ... async def get_user_teams(self, email): return []
1108 >>> _rs.TeamManagementService = FakeTeamService
1109 >>> # Force DB to return one fake row with a 'team' attribute
1110 >>> class FakeResource:
1111 ... team_id = None
1112 >>> fake_resource = FakeResource()
1113 >>> db.execute.return_value.scalars.return_value.all.return_value = [fake_resource]
1114 >>> service.convert_resource_to_read = MagicMock(return_value="converted")
1115 >>> asyncio.run(service.list_resources_for_user(db, "user@example.com"))
1116 ['converted']
1118 Without team_id (default/public access):
1119 >>> db2 = MagicMock()
1120 >>> class FakeResource2:
1121 ... team_id = None
1122 >>> fake_resource2 = FakeResource2()
1123 >>> db2.execute.return_value.scalars.return_value.all.return_value = [fake_resource2]
1124 >>> service.convert_resource_to_read = MagicMock(return_value="converted2")
1125 >>> out2 = asyncio.run(service.list_resources_for_user(db2, "user@example.com"))
1126 >>> out2
1127 ['converted2']
1128 """
1129 # First-Party
1130 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
1132 # Build query following existing patterns from list_resources()
1133 team_service = TeamManagementService(db)
1134 user_teams = await team_service.get_user_teams(user_email)
1135 team_ids = [team.id for team in user_teams]
1137 # Build query following existing patterns from list_resources()
1138 query = select(DbResource)
1140 # Apply active/inactive filter
1141 if not include_inactive:
1142 query = query.where(DbResource.enabled)
1144 if team_id:
1145 if team_id not in team_ids:
1146 return [] # No access to team
1148 access_conditions = []
1149 # Filter by specific team
1150 access_conditions.append(and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"])))
1152 access_conditions.append(and_(DbResource.team_id == team_id, DbResource.owner_email == user_email))
1154 query = query.where(or_(*access_conditions))
1155 else:
1156 # Get user's accessible teams
1157 # Build access conditions following existing patterns
1158 access_conditions = []
1159 # 1. User's personal resources (owner_email matches)
1160 access_conditions.append(DbResource.owner_email == user_email)
1161 # 2. Team resources where user is member
1162 if team_ids:
1163 access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"])))
1164 # 3. Public resources (if visibility allows)
1165 access_conditions.append(DbResource.visibility == "public")
1167 query = query.where(or_(*access_conditions))
1169 # Apply visibility filter if specified
1170 if visibility:
1171 query = query.where(DbResource.visibility == visibility)
1173 # Apply pagination following existing patterns
1174 query = query.offset(skip).limit(limit)
1176 resources = db.execute(query).scalars().all()
1178 # Batch fetch team names to avoid N+1 queries
1179 resource_team_ids = {r.team_id for r in resources if r.team_id}
1180 team_map = {}
1181 if resource_team_ids:
1182 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all()
1183 team_map = {str(team.id): team.name for team in teams}
1185 db.commit() # Release transaction to avoid idle-in-transaction
1187 result = []
1188 for t in resources:
1189 try:
1190 t.team = team_map.get(str(t.team_id)) if t.team_id else None
1191 result.append(self.convert_resource_to_read(t, include_metrics=False))
1192 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1193 logger.exception(f"Failed to convert resource {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
1194 # Continue with remaining resources instead of failing completely
1195 return result
1197 async def list_server_resources(
1198 self,
1199 db: Session,
1200 server_id: str,
1201 include_inactive: bool = False,
1202 user_email: Optional[str] = None,
1203 token_teams: Optional[List[str]] = None,
1204 ) -> List[ResourceRead]:
1205 """
1206 Retrieve a list of registered resources from the database.
1208 This method retrieves resources from the database and converts them into a list
1209 of ResourceRead objects. It supports filtering out inactive resources based on the
1210 include_inactive parameter. The cursor parameter is reserved for future pagination support
1211 but is currently not implemented.
1213 Args:
1214 db (Session): The SQLAlchemy database session.
1215 server_id (str): Server ID
1216 include_inactive (bool): If True, include inactive resources in the result.
1217 Defaults to False.
1218 user_email (Optional[str]): User email for visibility filtering. If None, no filtering applied.
1219 token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API
1220 token access where the token scope should be respected.
1222 Returns:
1223 List[ResourceRead]: A list of resources represented as ResourceRead objects.
1225 Examples:
1226 >>> from mcpgateway.services.resource_service import ResourceService
1227 >>> from unittest.mock import MagicMock
1228 >>> service = ResourceService()
1229 >>> db = MagicMock()
1230 >>> resource_read = MagicMock()
1231 >>> service.convert_resource_to_read = MagicMock(return_value=resource_read)
1232 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
1233 >>> import asyncio
1234 >>> result = asyncio.run(service.list_server_resources(db, 'server1'))
1235 >>> isinstance(result, list)
1236 True
1237 >>> # Include inactive branch
1238 >>> result = asyncio.run(service.list_server_resources(db, 'server1', include_inactive=True))
1239 >>> isinstance(result, list)
1240 True
1241 """
1242 logger.debug(f"Listing resources for server_id: {server_id}, include_inactive: {include_inactive}")
1243 query = (
1244 select(DbResource)
1245 .join(server_resource_association, DbResource.id == server_resource_association.c.resource_id)
1246 .where(DbResource.uri_template.is_(None))
1247 .where(server_resource_association.c.server_id == server_id)
1248 )
1249 if not include_inactive:
1250 query = query.where(DbResource.enabled)
1252 # Add visibility filtering if user context OR token_teams provided
1253 # This ensures unauthenticated requests with token_teams=[] only see public resources
1254 if user_email is not None or token_teams is not None: # empty-string user_email -> public-only filtering (secure default)
1255 # Use token_teams if provided (for MCP/API token access), otherwise look up from DB
1256 if token_teams is not None:
1257 team_ids = token_teams
1258 elif user_email:
1259 # First-Party
1260 from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel
1262 team_service = TeamManagementService(db)
1263 user_teams = await team_service.get_user_teams(user_email)
1264 team_ids = [team.id for team in user_teams]
1265 else:
1266 team_ids = []
1268 # Check if this is a public-only token (empty teams array)
1269 # Public-only tokens can ONLY see public resources - no owner access
1270 is_public_only_token = token_teams is not None and len(token_teams) == 0
1272 access_conditions = [
1273 DbResource.visibility == "public",
1274 ]
1275 # Only include owner access for non-public-only tokens with user_email
1276 if not is_public_only_token and user_email:
1277 access_conditions.append(DbResource.owner_email == user_email)
1278 if team_ids:
1279 access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"])))
1280 query = query.where(or_(*access_conditions))
1282 # Cursor-based pagination logic can be implemented here in the future.
1283 resources = db.execute(query).scalars().all()
1285 # Batch fetch team names to avoid N+1 queries
1286 resource_team_ids = {r.team_id for r in resources if r.team_id}
1287 team_map = {}
1288 if resource_team_ids:
1289 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(resource_team_ids), EmailTeam.is_active.is_(True))).all()
1290 team_map = {str(team.id): team.name for team in teams}
1292 db.commit() # Release transaction to avoid idle-in-transaction
1294 result = []
1295 for t in resources:
1296 try:
1297 t.team = team_map.get(str(t.team_id)) if t.team_id else None
1298 result.append(self.convert_resource_to_read(t, include_metrics=False))
1299 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1300 logger.exception(f"Failed to convert resource {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
1301 # Continue with remaining resources instead of failing completely
1302 return result
1304 async def _record_resource_metric(self, db: Session, resource: DbResource, start_time: float, success: bool, error_message: Optional[str]) -> None:
1305 """
1306 Records a metric for a resource access.
1308 Args:
1309 db: Database session
1310 resource: The resource that was accessed
1311 start_time: Monotonic start time of the access
1312 success: True if successful, False otherwise
1313 error_message: Error message if failed, None otherwise
1314 """
1315 end_time = time.monotonic()
1316 response_time = end_time - start_time
1318 metric = ResourceMetric(
1319 resource_id=resource.id,
1320 response_time=response_time,
1321 is_success=success,
1322 error_message=error_message,
1323 )
1324 db.add(metric)
1325 db.commit()
1327 async def _record_invoke_resource_metric(self, db: Session, resource_id: str, start_time: float, success: bool, error_message: Optional[str]) -> None:
1328 """
1329 Records a metric for invoking resource.
1331 Args:
1332 db: Database Session
1333 resource_id: unique identifier to access & invoke resource
1334 start_time: Monotonic start time of the access
1335 success: True if successful, False otherwise
1336 error_message: Error message if failed, None otherwise
1337 """
1338 end_time = time.monotonic()
1339 response_time = end_time - start_time
1341 metric = ResourceMetric(
1342 resource_id=resource_id,
1343 response_time=response_time,
1344 is_success=success,
1345 error_message=error_message,
1346 )
1347 db.add(metric)
1348 db.commit()
1350 def create_ssl_context(self, ca_certificate: str) -> ssl.SSLContext:
1351 """Create an SSL context with the provided CA certificate.
1353 Uses caching to avoid repeated SSL context creation for the same certificate.
1355 Args:
1356 ca_certificate: CA certificate in PEM format
1358 Returns:
1359 ssl.SSLContext: Configured SSL context
1360 """
1361 return get_cached_ssl_context(ca_certificate)
1363 async def invoke_resource( # pylint: disable=unused-argument
1364 self,
1365 db: Session,
1366 resource_id: str,
1367 resource_uri: str,
1368 resource_template_uri: Optional[str] = None,
1369 user_identity: Optional[Union[str, Dict[str, Any]]] = None,
1370 meta_data: Optional[Dict[str, Any]] = None, # Reserved for future MCP SDK support
1371 resource_obj: Optional[Any] = None,
1372 gateway_obj: Optional[Any] = None,
1373 server_id: Optional[str] = None,
1374 ) -> Any:
1375 """
1376 Invoke a resource via its configured gateway using SSE or StreamableHTTP transport.
1378 This method determines the correct URI to invoke, loads the associated resource
1379 and gateway from the database, validates certificates if applicable, prepares
1380 authentication headers (OAuth, header-based, or none), and then connects to
1381 the gateway to read the resource using the appropriate transport.
1383 The function supports:
1384 - CA certificate validation / SSL context creation
1385 - OAuth client-credentials and authorization-code flow
1386 - Header-based auth
1387 - SSE transport gateways
1388 - StreamableHTTP transport gateways
1390 Args:
1391 db (Session):
1392 SQLAlchemy session for retrieving resource and gateway information.
1393 resource_id (str):
1394 ID of the resource to invoke.
1395 resource_uri (str):
1396 Direct resource URI configured for the resource.
1397 resource_template_uri (Optional[str]):
1398 URI from the template. Overrides `resource_uri` when provided.
1399 user_identity (Optional[Union[str, Dict[str, Any]]]):
1400 Identity of the user making the request, used for session pool isolation.
1401 Can be a string (email) or a dict with an 'email' key.
1402 Defaults to "anonymous" for pool isolation if not provided.
1403 OAuth Authorization Code token lookup uses this user identity.
1404 meta_data (Optional[Dict[str, Any]]):
1405 Additional metadata to pass to the gateway during invocation.
1406 resource_obj (Optional[Any]):
1407 Pre-fetched DbResource object to skip the internal resource lookup query.
1408 gateway_obj (Optional[Any]):
1409 Pre-fetched DbGateway object to skip the internal gateway lookup query.
1410 server_id (Optional[str]):
1411 Virtual server ID for server metrics recording. When provided, indicates
1412 the resource was invoked through a specific virtual server endpoint.
1413 Direct resource calls (e.g., from admin UI) should pass None.
1415 Returns:
1416 Any: The text content returned by the remote resource, or ``None`` if the
1417 gateway could not be contacted or an error occurred.
1419 Raises:
1420 Exception: Any unhandled internal errors (e.g., DB issues).
1422 ---
1423 Doctest Examples
1424 ----------------
1426 >>> class FakeDB:
1427 ... "Simple DB stub returning fake resource and gateway rows."
1428 ... def execute(self, query):
1429 ... class Result:
1430 ... def scalar_one_or_none(self):
1431 ... # Return fake objects with the needed attributes
1432 ... class FakeResource:
1433 ... id = "res123"
1434 ... name = "Demo Resource"
1435 ... gateway_id = "gw1"
1436 ... return FakeResource()
1437 ... return Result()
1439 >>> class FakeGateway:
1440 ... id = "gw1"
1441 ... name = "Fake Gateway"
1442 ... url = "https://fake.gateway"
1443 ... ca_certificate = None
1444 ... ca_certificate_sig = None
1445 ... transport = "sse"
1446 ... auth_type = None
1447 ... auth_value = {}
1449 >>> # Monkeypatch the DB lookup for gateway
1450 >>> def fake_execute_gateway(self, query):
1451 ... class Result:
1452 ... def scalar_one_or_none(self_inner):
1453 ... return FakeGateway()
1454 ... return Result()
1456 >>> FakeDB.execute_gateway = fake_execute_gateway
1458 >>> class FakeService:
1459 ... "Service stub replacing network calls with predictable outputs."
1460 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None):
1461 ... # Represent the behavior of a successful SSE response.
1462 ... return "hello from gateway"
1464 >>> svc = FakeService()
1465 >>> import asyncio
1466 >>> asyncio.run(svc.invoke_resource(FakeDB(), "res123", "/test"))
1467 'hello from gateway'
1469 ---
1470 Example: Template URI overrides resource URI
1471 --------------------------------------------
1473 >>> class FakeService2(FakeService):
1474 ... async def invoke_resource(self, db, resource_id, resource_uri, resource_template_uri=None):
1475 ... if resource_template_uri:
1476 ... return f"using template: {resource_template_uri}"
1477 ... return f"using direct: {resource_uri}"
1479 >>> svc2 = FakeService2()
1480 >>> asyncio.run(svc2.invoke_resource(FakeDB(), "res123", "/direct", "/template"))
1481 'using template: /template'
1483 """
1484 uri = None
1485 if resource_uri and resource_template_uri:
1486 uri = resource_template_uri
1487 elif resource_uri:
1488 uri = resource_uri
1490 logger.info(f"Invoking the resource: {uri}")
1491 gateway_id = None
1492 # Use pre-fetched resource if provided (avoids Q5 re-fetch)
1493 resource_info = resource_obj
1494 if resource_info is None:
1495 resource_info = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none()
1497 # Release transaction immediately after resource lookup to prevent idle-in-transaction
1498 # This is especially important when resource isn't found - we don't want to hold the transaction
1499 db.commit()
1501 # Normalize user_identity to string for session pool isolation.
1502 if isinstance(user_identity, dict):
1503 pool_user_identity = user_identity.get("email") or "anonymous"
1504 elif isinstance(user_identity, str):
1505 pool_user_identity = user_identity
1506 else:
1507 pool_user_identity = "anonymous"
1509 oauth_user_email: Optional[str] = None
1510 if isinstance(user_identity, dict):
1511 user_email_value = user_identity.get("email")
1512 if isinstance(user_email_value, str):
1513 oauth_user_email = user_email_value.strip().lower() or None
1514 elif isinstance(user_identity, str):
1515 oauth_user_email = user_identity.strip().lower() or None
1517 if resource_info:
1518 gateway_id = getattr(resource_info, "gateway_id", None)
1519 resource_name = getattr(resource_info, "name", None)
1520 # Use pre-fetched gateway if provided (avoids Q6 re-fetch)
1521 gateway = gateway_obj
1522 if gateway is None and gateway_id:
1523 gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none()
1525 # ═══════════════════════════════════════════════════════════════════════════
1526 # CRITICAL: Release DB transaction immediately after fetching resource/gateway
1527 # This ensures the transaction doesn't stay open if there's no gateway
1528 # or if the function returns early for any reason.
1529 # ═══════════════════════════════════════════════════════════════════════════
1530 db.commit()
1532 if gateway:
1534 start_time = time.monotonic()
1535 success = False
1536 error_message = None
1538 # Create database span for observability dashboard
1539 trace_id = current_trace_id.get()
1540 db_span_id = None
1541 db_span_ended = False
1542 observability_service = ObservabilityService() if trace_id else None
1544 if trace_id and observability_service:
1545 try:
1546 db_span_id = observability_service.start_span(
1547 db=db,
1548 trace_id=trace_id,
1549 name="invoke.resource",
1550 attributes={
1551 "resource.name": resource_name if resource_name else "unknown",
1552 "resource.id": str(resource_id) if resource_id else "unknown",
1553 "resource.uri": str(uri) or "unknown",
1554 "gateway.transport": getattr(gateway, "transport") or "uknown",
1555 "gateway.url": getattr(gateway, "url") or "unknown",
1556 },
1557 )
1558 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {resource_id} & {uri}")
1559 except Exception as e:
1560 logger.warning(f"Failed to start the observability span for invoking resource: {e}")
1561 db_span_id = None
1563 with create_span(
1564 "invoke.resource",
1565 {
1566 "resource.name": resource_name if resource_name else "unknown",
1567 "resource.id": str(resource_id) if resource_id else "unknown",
1568 "resource.uri": str(uri) or "unknown",
1569 "gateway.transport": getattr(gateway, "transport") or "uknown",
1570 "gateway.url": getattr(gateway, "url") or "unknown",
1571 },
1572 ) as span:
1573 valid = False
1574 if gateway.ca_certificate:
1575 if settings.enable_ed25519_signing:
1576 public_key_pem = settings.ed25519_public_key
1577 valid = validate_signature(gateway.ca_certificate.encode(), gateway.ca_certificate_sig, public_key_pem)
1578 else:
1579 valid = True
1581 if valid:
1582 ssl_context = self.create_ssl_context(gateway.ca_certificate)
1583 else:
1584 ssl_context = None
1586 def _get_httpx_client_factory(
1587 headers: dict[str, str] | None = None,
1588 timeout: httpx.Timeout | None = None,
1589 auth: httpx.Auth | None = None,
1590 ) -> httpx.AsyncClient:
1591 """Factory function to create httpx.AsyncClient with optional CA certificate.
1593 Args:
1594 headers: Optional headers for the client
1595 timeout: Optional timeout for the client
1596 auth: Optional auth for the client
1598 Returns:
1599 httpx.AsyncClient: Configured HTTPX async client
1600 """
1601 # First-Party
1602 from mcpgateway.services.http_client_service import get_default_verify, get_http_timeout # pylint: disable=import-outside-toplevel
1604 return httpx.AsyncClient(
1605 verify=ssl_context if ssl_context else get_default_verify(), # pylint: disable=cell-var-from-loop
1606 follow_redirects=True,
1607 headers=headers,
1608 timeout=timeout if timeout else get_http_timeout(),
1609 auth=auth,
1610 limits=httpx.Limits(
1611 max_connections=settings.httpx_max_connections,
1612 max_keepalive_connections=settings.httpx_max_keepalive_connections,
1613 keepalive_expiry=settings.httpx_keepalive_expiry,
1614 ),
1615 )
1617 try:
1618 # ═══════════════════════════════════════════════════════════════════════════
1619 # Extract gateway data to local variables BEFORE OAuth handling
1620 # OAuth client_credentials flow makes network calls, so we must release
1621 # the DB transaction first to prevent idle-in-transaction during network I/O
1622 # ═══════════════════════════════════════════════════════════════════════════
1623 gateway_url = gateway.url
1624 gateway_transport = gateway.transport
1625 gateway_auth_type = gateway.auth_type
1626 gateway_auth_value = gateway.auth_value
1627 gateway_oauth_config = gateway.oauth_config
1628 gateway_name = gateway.name
1629 gateway_auth_query_params = getattr(gateway, "auth_query_params", None)
1631 # Apply query param auth to URL if applicable
1632 auth_query_params_decrypted: Optional[Dict[str, str]] = None
1633 if gateway_auth_type == "query_param" and gateway_auth_query_params:
1634 auth_query_params_decrypted = {}
1635 for param_key, encrypted_value in gateway_auth_query_params.items():
1636 if encrypted_value:
1637 try:
1638 decrypted = decode_auth(encrypted_value)
1639 auth_query_params_decrypted[param_key] = decrypted.get(param_key, "")
1640 except Exception: # noqa: S110 - intentionally skip failed decryptions
1641 # Silently skip params that fail decryption (corrupted or old key)
1642 logger.debug(f"Failed to decrypt query param '{param_key}' for resource")
1643 if auth_query_params_decrypted:
1644 gateway_url = apply_query_param_auth(gateway_url, auth_query_params_decrypted)
1646 # ═══════════════════════════════════════════════════════════════════════════
1647 # CRITICAL: Release DB connection back to pool BEFORE making HTTP/OAuth calls
1648 # This prevents connection pool exhaustion during slow upstream requests.
1649 # OAuth client_credentials flow makes network calls to token endpoints.
1650 # All needed data has been extracted to local variables above.
1651 # ═══════════════════════════════════════════════════════════════════════════
1652 db.commit() # End read-only transaction cleanly
1653 db.close()
1655 # Handle different authentication types (AFTER DB release)
1656 headers = {}
1657 if gateway_auth_type == "oauth" and gateway_oauth_config:
1658 grant_type = gateway_oauth_config.get("grant_type", "client_credentials")
1660 if grant_type == "authorization_code":
1661 # For Authorization Code flow, try to get stored tokens
1662 try:
1663 # First-Party
1664 from mcpgateway.services.token_storage_service import TokenStorageService # pylint: disable=import-outside-toplevel
1666 # Use fresh DB session for token lookup (original db was closed)
1667 access_token = None
1668 if oauth_user_email:
1669 with fresh_db_session() as token_db:
1670 token_storage = TokenStorageService(token_db)
1671 access_token = await token_storage.get_user_token(gateway_id, oauth_user_email)
1673 if access_token:
1674 headers["Authorization"] = f"Bearer {access_token}"
1675 else:
1676 if span:
1677 span.set_attribute("health.status", "unhealthy")
1678 span.set_attribute("error.message", "No valid OAuth token for user")
1680 except Exception as e:
1681 logger.error(f"Failed to obtain stored OAuth token for gateway {gateway_name}: {e}")
1682 if span:
1683 span.set_attribute("health.status", "unhealthy")
1684 span.set_attribute("error.message", "Failed to obtain stored OAuth token")
1685 else:
1686 # For Client Credentials flow, get token directly (makes network calls)
1687 try:
1688 access_token: str = await self.oauth_manager.get_access_token(gateway_oauth_config)
1689 headers["Authorization"] = f"Bearer {access_token}"
1690 except Exception as e:
1691 if span:
1692 span.set_attribute("health.status", "unhealthy")
1693 span.set_attribute("error.message", str(e))
1694 else:
1695 # Handle non-OAuth authentication (existing logic)
1696 auth_data = gateway_auth_value or {}
1697 if isinstance(auth_data, str):
1698 headers = decode_auth(auth_data)
1699 elif isinstance(auth_data, dict):
1700 headers = {str(k): str(v) for k, v in auth_data.items()}
1701 else:
1702 headers = {}
1704 async def connect_to_sse_session(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
1705 """
1706 Connect to an SSE-based gateway and retrieve the text content of a resource.
1708 This helper establishes an SSE (Server-Sent Events) session with the remote
1709 gateway, initializes a `ClientSession`, invokes `read_resource()` for the
1710 given URI, and returns the textual content from the first item in the
1711 response's `contents` list.
1713 If any error occurs (network failure, unexpected response format, session
1714 initialization failure, etc.), the method logs the exception and returns
1715 ``None`` instead of raising.
1717 Note:
1718 MCP SDK 1.25.0 read_resource() does not support meta parameter.
1719 When the SDK adds support, meta_data can be added back here.
1721 Args:
1722 server_url (str):
1723 The base URL of the SSE gateway to connect to.
1724 uri (str):
1725 The resource URI that should be requested from the gateway.
1726 authentication (Optional[Dict[str, str]]):
1727 Optional dictionary of headers (e.g., OAuth Bearer tokens) to
1728 include in the SSE connection request. Defaults to an empty
1729 dictionary when not provided.
1731 Returns:
1732 str | None:
1733 The text content returned by the remote resource, or ``None`` if the
1734 SSE connection fails or the response is invalid.
1736 Notes:
1737 - This function assumes the SSE client context manager yields:
1738 ``(read_stream, write_stream, get_session_id)``.
1739 - The expected response object from `session.read_resource()` must have a
1740 `contents` attribute containing a list, where the first element has a
1741 `text` attribute.
1742 """
1743 if authentication is None:
1744 authentication = {}
1745 try:
1746 # Use session pool if enabled for 10-20x latency improvement
1747 use_pool = False
1748 pool = None
1749 if settings.mcp_session_pool_enabled:
1750 try:
1751 pool = get_mcp_session_pool()
1752 use_pool = True
1753 except RuntimeError:
1754 # Pool not initialized (e.g., in tests), fall back to per-call sessions
1755 pass
1757 if use_pool and pool is not None:
1758 async with pool.session(
1759 url=server_url,
1760 headers=authentication,
1761 transport_type=TransportType.SSE,
1762 httpx_client_factory=_get_httpx_client_factory,
1763 user_identity=pool_user_identity,
1764 gateway_id=gateway_id,
1765 ) as pooled:
1766 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1767 resource_response = await pooled.session.read_resource(uri=uri)
1768 return getattr(getattr(resource_response, "contents")[0], "text")
1769 else:
1770 # Fallback to per-call sessions when pool disabled or not initialized
1771 async with sse_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as (
1772 read_stream,
1773 write_stream,
1774 ):
1775 async with ClientSession(read_stream, write_stream) as session:
1776 _ = await session.initialize()
1777 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1778 resource_response = await session.read_resource(uri=uri)
1779 return getattr(getattr(resource_response, "contents")[0], "text")
1780 except Exception as e:
1781 # Sanitize error message to prevent URL secrets from leaking in logs
1782 sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted)
1783 logger.debug(f"Exception while connecting to sse gateway: {sanitized_error}")
1784 return None
1786 async def connect_to_streamablehttp_server(server_url: str, uri: str, authentication: Optional[Dict[str, str]] = None) -> str | None:
1787 """
1788 Connect to a StreamableHTTP gateway and retrieve the text content of a resource.
1790 This helper establishes a StreamableHTTP client session with the specified
1791 gateway, initializes a `ClientSession`, invokes `read_resource()` for the
1792 given URI, and returns the textual content from the first element in the
1793 response's `contents` list.
1795 If any exception occurs during connection, session initialization, or
1796 resource reading, the function logs the error and returns ``None`` instead
1797 of propagating the exception.
1799 Note:
1800 MCP SDK 1.25.0 read_resource() does not support meta parameter.
1801 When the SDK adds support, meta_data can be added back here.
1803 Args:
1804 server_url (str):
1805 The endpoint URL of the StreamableHTTP gateway.
1806 uri (str):
1807 The resource URI to request from the gateway.
1808 authentication (Optional[Dict[str, str]]):
1809 Optional dictionary of authentication headers (e.g., API keys or
1810 Bearer tokens). Defaults to an empty dictionary when not provided.
1812 Returns:
1813 str | None:
1814 The text content returned by the StreamableHTTP resource, or ``None``
1815 if the connection fails or the response format is invalid.
1817 Notes:
1818 - The `streamablehttp_client` context manager must yield a tuple:
1819 ``(read_stream, write_stream, get_session_id)``.
1820 - The expected `resource_response` returned by ``session.read_resource()``
1821 must contain a `contents` list, whose first element exposes a `text`
1822 attribute.
1823 """
1824 if authentication is None:
1825 authentication = {}
1826 try:
1827 # Use session pool if enabled for 10-20x latency improvement
1828 use_pool = False
1829 pool = None
1830 if settings.mcp_session_pool_enabled:
1831 try:
1832 pool = get_mcp_session_pool()
1833 use_pool = True
1834 except RuntimeError:
1835 # Pool not initialized (e.g., in tests), fall back to per-call sessions
1836 pass
1838 if use_pool and pool is not None:
1839 async with pool.session(
1840 url=server_url,
1841 headers=authentication,
1842 transport_type=TransportType.STREAMABLE_HTTP,
1843 httpx_client_factory=_get_httpx_client_factory,
1844 user_identity=pool_user_identity,
1845 gateway_id=gateway_id,
1846 ) as pooled:
1847 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1848 resource_response = await pooled.session.read_resource(uri=uri)
1849 return getattr(getattr(resource_response, "contents")[0], "text")
1850 else:
1851 # Fallback to per-call sessions when pool disabled or not initialized
1852 async with streamablehttp_client(url=server_url, headers=authentication, timeout=settings.health_check_timeout, httpx_client_factory=_get_httpx_client_factory) as (
1853 read_stream,
1854 write_stream,
1855 _get_session_id,
1856 ):
1857 async with ClientSession(read_stream, write_stream) as session:
1858 _ = await session.initialize()
1859 # Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
1860 resource_response = await session.read_resource(uri=uri)
1861 return getattr(getattr(resource_response, "contents")[0], "text")
1862 except Exception as e:
1863 # Sanitize error message to prevent URL secrets from leaking in logs
1864 sanitized_error = sanitize_exception_message(str(e), auth_query_params_decrypted)
1865 logger.debug(f"Exception while connecting to streamablehttp gateway: {sanitized_error}")
1866 return None
1868 if span:
1869 span.set_attribute("success", True)
1870 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000)
1872 resource_text = ""
1873 if (gateway_transport).lower() == "sse":
1874 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
1875 resource_text = await connect_to_sse_session(server_url=gateway_url, authentication=headers, uri=uri)
1876 else:
1877 # Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
1878 resource_text = await connect_to_streamablehttp_server(server_url=gateway_url, authentication=headers, uri=uri)
1879 success = True # Mark as successful before returning
1880 return resource_text
1881 except Exception as e:
1882 success = False
1883 error_message = str(e)
1884 raise
1885 finally:
1886 # Metrics are now recorded only in read_resource finally block
1887 # This eliminates duplicate metrics and provides a single source of truth
1888 # End Invoke resource span for Observability dashboard
1889 # NOTE: Use fresh_db_session() since the original db was released
1890 # before making HTTP calls to prevent connection pool exhaustion
1891 if db_span_id and observability_service and not db_span_ended:
1892 try:
1893 with fresh_db_session() as fresh_db:
1894 observability_service.end_span(
1895 db=fresh_db,
1896 span_id=db_span_id,
1897 status="ok" if success else "error",
1898 status_message=error_message if error_message else None,
1899 )
1900 db_span_ended = True
1901 logger.debug(f"✓ Ended invoke.resource span: {db_span_id}")
1902 except Exception as e:
1903 logger.warning(f"Failed to end observability span for invoking resource: {e}")
1905 async def read_resource(
1906 self,
1907 db: Session,
1908 resource_id: Optional[Union[int, str]] = None,
1909 resource_uri: Optional[str] = None,
1910 request_id: Optional[str] = None,
1911 user: Optional[str] = None,
1912 server_id: Optional[str] = None,
1913 include_inactive: bool = False,
1914 token_teams: Optional[List[str]] = None,
1915 plugin_context_table: Optional[PluginContextTable] = None,
1916 plugin_global_context: Optional[GlobalContext] = None,
1917 meta_data: Optional[Dict[str, Any]] = None,
1918 ) -> Union[ResourceContent, ResourceContents]:
1919 """Read a resource's content with plugin hook support.
1921 Args:
1922 db: Database session.
1923 resource_id: Optional ID of the resource to read.
1924 resource_uri: Optional URI of the resource to read.
1925 request_id: Optional request ID for tracing.
1926 user: Optional user email for authorization checks.
1927 server_id: Optional server ID for server scoping enforcement.
1928 include_inactive: Whether to include inactive resources. Defaults to False.
1929 token_teams: Optional list of team IDs from token for authorization.
1930 None = unrestricted admin, [] = public-only, [...] = team-scoped.
1931 plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing.
1932 plugin_global_context: Optional global context from middleware for consistency across hooks.
1933 meta_data: Optional metadata dictionary to pass to the gateway during resource reading.
1935 Returns:
1936 Resource content object
1938 Raises:
1939 ResourceNotFoundError: If resource not found or access denied
1940 ResourceError: If blocked by plugin
1941 PluginError: If encounters issue with plugin
1942 PluginViolationError: If plugin violated the request. Example - In case of OPA plugin, if the request is denied by policy.
1943 ValueError: If neither resource_id nor resource_uri is provided
1945 Examples:
1946 >>> from mcpgateway.common.models import ResourceContent
1947 >>> from mcpgateway.services.resource_service import ResourceService
1948 >>> from unittest.mock import MagicMock, PropertyMock
1949 >>> service = ResourceService()
1950 >>> db = MagicMock()
1951 >>> uri = 'http://example.com/resource.txt'
1952 >>> mock_resource = MagicMock()
1953 >>> mock_resource.id = 123
1954 >>> mock_resource.uri = uri
1955 >>> type(mock_resource).content = PropertyMock(return_value='test')
1956 >>> db.execute.return_value.scalar_one_or_none.return_value = mock_resource
1957 >>> db.get.return_value = mock_resource
1958 >>> import asyncio
1959 >>> result = asyncio.run(service.read_resource(db, resource_uri=uri))
1960 >>> result.__class__.__name__ == 'ResourceContent'
1961 True
1963 Not found case returns ResourceNotFoundError:
1965 >>> db2 = MagicMock()
1966 >>> db2.execute.return_value.scalar_one_or_none.return_value = None
1967 >>> db2.get.return_value = None
1968 >>> import asyncio
1969 >>> # Disable path validation for doctest
1970 >>> import mcpgateway.config
1971 >>> old_val = getattr(mcpgateway.config.settings, 'experimental_validate_io', False)
1972 >>> mcpgateway.config.settings.experimental_validate_io = False
1973 >>> def _nf():
1974 ... try:
1975 ... asyncio.run(service.read_resource(db2, resource_uri='abc'))
1976 ... except ResourceNotFoundError:
1977 ... return True
1978 >>> result = _nf()
1979 >>> mcpgateway.config.settings.experimental_validate_io = old_val
1980 >>> result
1981 True
1982 """
1983 start_time = time.monotonic()
1984 success = False
1985 error_message = None
1986 resource_db = None
1987 server_scoped = False
1988 resource_db_gateway = None # Only set when eager-loaded via Q2's joinedload
1989 content = None
1990 uri = resource_uri or "unknown"
1991 if resource_id:
1992 resource_db = db.get(DbResource, resource_id, options=[joinedload(DbResource.gateway)])
1993 if resource_db:
1994 uri = resource_db.uri
1995 resource_db_gateway = resource_db.gateway # Eager-loaded, safe to access
1996 # Check enabled status in Python (avoids redundant Q3/Q4 re-fetches)
1997 if not include_inactive and not resource_db.enabled:
1998 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")
1999 content = resource_db.content
2000 else:
2001 uri = None
2003 # Create database span for observability dashboard
2004 trace_id = current_trace_id.get()
2005 db_span_id = None
2006 db_span_ended = False
2007 observability_service = ObservabilityService() if trace_id else None
2009 if trace_id and observability_service:
2010 try:
2011 db_span_id = observability_service.start_span(
2012 db=db,
2013 trace_id=trace_id,
2014 name="resource.read",
2015 attributes={
2016 "resource.uri": str(resource_uri) if resource_uri else "unknown",
2017 "user": user or "anonymous",
2018 "server_id": server_id,
2019 "request_id": request_id,
2020 "http.url": uri if uri is not None and uri.startswith("http") else None,
2021 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static",
2022 },
2023 )
2024 logger.debug(f"✓ Created resource.read span: {db_span_id} for resource: {uri}")
2025 except Exception as e:
2026 logger.warning(f"Failed to start observability span for resource reading: {e}")
2027 db_span_id = None
2029 with create_span(
2030 "resource.read",
2031 {
2032 "resource.uri": resource_uri or "unknown",
2033 "user": user or "anonymous",
2034 "server_id": server_id,
2035 "request_id": request_id,
2036 "http.url": uri if uri is not None and uri.startswith("http") else None,
2037 "resource.type": "template" if (uri is not None and "{" in uri and "}" in uri) else "static",
2038 },
2039 ) as span:
2040 try:
2041 # Generate request ID if not provided
2042 if not request_id:
2043 request_id = str(uuid.uuid4())
2045 original_uri = uri
2046 contexts = None
2048 # Check if plugin manager is available and eligible for this request
2049 plugin_eligible = bool(self._plugin_manager and PLUGINS_AVAILABLE and uri and ("://" in uri))
2051 # Initialize plugin manager if needed (lazy init must happen before has_hooks_for check)
2052 # pylint: disable=protected-access
2053 if plugin_eligible and not self._plugin_manager._initialized:
2054 await self._plugin_manager.initialize()
2055 # pylint: enable=protected-access
2057 # Check if any resource hooks are registered to avoid unnecessary context creation
2058 has_pre_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_PRE_FETCH)
2059 has_post_fetch = plugin_eligible and self._plugin_manager.has_hooks_for(ResourceHookType.RESOURCE_POST_FETCH)
2061 # Initialize plugin context variables only if hooks are registered
2062 global_context = None
2063 if has_pre_fetch or has_post_fetch:
2064 # Create plugin context
2065 # Normalize user to an identifier string if provided
2066 user_id = None
2067 if user is not None:
2068 if isinstance(user, dict) and "email" in user:
2069 user_id = user.get("email")
2070 elif isinstance(user, str):
2071 user_id = user
2072 else:
2073 # Attempt to fallback to attribute access
2074 user_id = getattr(user, "email", None)
2076 # Use existing global_context from middleware or create new one
2077 if plugin_global_context:
2078 global_context = plugin_global_context
2079 # Update fields with resource-specific information
2080 if user_id:
2081 global_context.user = user_id
2082 if server_id:
2083 global_context.server_id = server_id
2084 else:
2085 # Create new context (fallback when middleware didn't run)
2086 global_context = GlobalContext(request_id=request_id, user=user_id, server_id=server_id)
2088 # Call pre-fetch hooks if registered
2089 if has_pre_fetch:
2090 # Create pre-fetch payload
2091 pre_payload = ResourcePreFetchPayload(uri=uri, metadata={})
2093 # Execute pre-fetch hooks with context from previous hooks
2094 pre_result, contexts = await self._plugin_manager.invoke_hook(
2095 ResourceHookType.RESOURCE_PRE_FETCH,
2096 pre_payload,
2097 global_context,
2098 local_contexts=plugin_context_table, # Pass context from previous hooks
2099 violations_as_exceptions=True,
2100 )
2101 # Use modified URI if plugin changed it
2102 if pre_result.modified_payload:
2103 uri = pre_result.modified_payload.uri
2104 logger.debug(f"Resource URI modified by plugin: {original_uri} -> {uri}")
2106 # Validate resource path if experimental validation is enabled
2107 if getattr(settings, "experimental_validate_io", False) and uri and isinstance(uri, str):
2108 try:
2109 SecurityValidator.validate_path(uri, getattr(settings, "allowed_roots", None))
2110 except ValueError as e:
2111 raise ResourceError(f"Path validation failed: {e}")
2113 # Original resource fetching logic
2114 logger.info(f"Fetching resource: {resource_id} (URI: {uri})")
2116 # Check if resource's gateway is in direct_proxy mode
2117 # First, try to find the resource to get its gateway
2118 # Check for template
2120 if resource_db is None and uri is not None: # and "{" in uri and "}" in uri:
2121 # Matches uri (modified value from pluggins if applicable)
2122 # with uri from resource DB
2123 # if uri is of type resource template then resource is retreived from DB
2124 query = select(DbResource).where(DbResource.uri == str(uri)).where(DbResource.enabled)
2125 if include_inactive:
2126 query = select(DbResource).where(DbResource.uri == str(uri))
2127 resource_db = db.execute(query).scalar_one_or_none()
2129 # Check for direct_proxy mode
2130 if resource_db and resource_db.gateway and getattr(resource_db.gateway, "gateway_mode", "cache") == "direct_proxy" and settings.mcpgateway_direct_proxy_enabled:
2131 # SECURITY: Check gateway access before allowing direct proxy
2132 if not await check_gateway_access(db, resource_db.gateway, user, token_teams):
2133 raise ResourceNotFoundError(f"Resource not found: {uri}")
2135 logger.info(f"Using direct_proxy mode for resource '{uri}' via gateway {resource_db.gateway.id}")
2137 try: # First-Party
2138 # First-Party
2139 from mcpgateway.common.models import BlobResourceContents, TextResourceContents # pylint: disable=import-outside-toplevel
2141 gateway = resource_db.gateway
2143 # Prepare headers with gateway auth
2144 headers = build_gateway_auth_headers(gateway)
2146 # Use MCP SDK to connect and read resource
2147 async with streamablehttp_client(url=gateway.url, headers=headers, timeout=settings.mcpgateway_direct_proxy_timeout) as (read_stream, write_stream, _get_session_id):
2148 async with ClientSession(read_stream, write_stream) as session:
2149 await session.initialize()
2151 # Note: MCP SDK read_resource() only accepts uri; _meta is not supported
2152 result = await session.read_resource(uri=uri)
2154 # Convert MCP result to MCP-compliant content models
2155 # result.contents is a list of TextResourceContents or BlobResourceContents
2156 if result.contents:
2157 first_content = result.contents[0]
2158 if hasattr(first_content, "text"):
2159 content = TextResourceContents(uri=uri, mimeType=first_content.mimeType if hasattr(first_content, "mimeType") else "text/plain", text=first_content.text)
2160 elif hasattr(first_content, "blob"):
2161 content = BlobResourceContents(
2162 uri=uri, mimeType=first_content.mimeType if hasattr(first_content, "mimeType") else "application/octet-stream", blob=first_content.blob
2163 )
2164 else:
2165 content = TextResourceContents(uri=uri, text="")
2166 else:
2167 content = TextResourceContents(uri=uri, text="")
2169 success = True
2170 logger.info(f"[READ RESOURCE] Using direct_proxy mode for gateway {gateway.id} (from X-Context-Forge-Gateway-Id header). Meta Attached: {meta_data is not None}")
2171 # Skip the rest of the DB lookup logic
2173 except Exception as e:
2174 logger.exception(f"Error in direct_proxy mode for resource '{uri}': {e}")
2175 raise ResourceError(f"Direct proxy resource read failed: {str(e)}")
2177 elif resource_db:
2178 # Normal cache mode - resource found in DB
2179 content = resource_db.content
2180 else:
2181 # Check the inactivity first
2182 check_inactivity = db.execute(select(DbResource).where(DbResource.uri == str(resource_uri)).where(not_(DbResource.enabled))).scalar_one_or_none()
2183 if check_inactivity:
2184 raise ResourceNotFoundError(f"Resource '{resource_uri}' exists but is inactive")
2186 if resource_db is None:
2187 if resource_uri:
2188 # if resource_uri is provided
2189 # modified uri have templatized resource with prefilled value
2190 # triggers _read_template_resource
2191 # it internally checks which uri matches the pattern of modified uri and fetches
2192 # the one which matches else raises ResourceNotFoundError
2193 try:
2194 content = await self._read_template_resource(db, uri) or None
2195 # ═══════════════════════════════════════════════════════════════════════════
2196 # SECURITY: Fetch the template's DbResource record for access checking
2197 # _read_template_resource returns ResourceContent with the template's ID
2198 # ═══════════════════════════════════════════════════════════════════════════
2199 if content is not None and hasattr(content, "id") and content.id:
2200 template_query = select(DbResource).where(DbResource.id == str(content.id))
2201 if not include_inactive:
2202 template_query = template_query.where(DbResource.enabled)
2203 resource_db = db.execute(template_query).scalar_one_or_none()
2204 except Exception as e:
2205 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'") from e
2207 if resource_uri:
2208 if content is None and resource_db is None:
2209 raise ResourceNotFoundError(f"Resource template not found for '{resource_uri}'")
2211 if resource_id and resource_db is None:
2212 # if resource_id provided but not found by Q2 (shouldn't normally happen,
2213 # but handles race conditions where resource is deleted between requests)
2214 query = select(DbResource).where(DbResource.id == str(resource_id)).where(DbResource.enabled)
2215 if include_inactive:
2216 query = select(DbResource).where(DbResource.id == str(resource_id))
2217 resource_db = db.execute(query).scalar_one_or_none()
2218 if resource_db:
2219 original_uri = resource_db.uri or None
2220 content = resource_db.content
2221 else:
2222 check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(resource_id)).where(not_(DbResource.enabled))).scalar_one_or_none()
2223 if check_inactivity:
2224 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")
2225 raise ResourceNotFoundError(f"Resource not found for the resource id: {resource_id}")
2227 # ═══════════════════════════════════════════════════════════════════════════
2228 # SECURITY: Check resource access based on visibility and team membership
2229 # ═══════════════════════════════════════════════════════════════════════════
2230 if resource_db:
2231 if not await self._check_resource_access(db, resource_db, user, token_teams):
2232 # Don't reveal resource existence - return generic "not found"
2233 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}")
2235 # ═══════════════════════════════════════════════════════════════════════════
2236 # SECURITY: Enforce server scoping if server_id is provided
2237 # Resource must be attached to the specified virtual server
2238 # ═══════════════════════════════════════════════════════════════════════════
2239 if server_id:
2240 server_match = db.execute(
2241 select(server_resource_association.c.resource_id).where(
2242 server_resource_association.c.server_id == server_id,
2243 server_resource_association.c.resource_id == resource_db.id,
2244 )
2245 ).first()
2246 if not server_match:
2247 raise ResourceNotFoundError(f"Resource not found: {resource_uri or resource_id}")
2248 server_scoped = True
2250 # Set success attributes on span
2251 if span:
2252 span.set_attribute("success", True)
2253 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000)
2254 if content:
2255 span.set_attribute("content.size", len(str(content)))
2257 success = True
2258 # Return standardized content without breaking callers that expect passthrough
2259 # Prefer returning first-class content models or objects with content-like attributes.
2260 # ResourceContent and TextContent already imported at top level
2262 # Release transaction before network calls to avoid idle-in-transaction during invoke_resource
2263 db.commit()
2265 # ═══════════════════════════════════════════════════════════════════════════
2266 # RESOLVE CONTENT: Fetch actual content from gateway if needed
2267 # ═══════════════════════════════════════════════════════════════════════════
2268 # If content is a Pydantic content model, invoke gateway
2270 # ResourceContents covers TextResourceContents and BlobResourceContents (MCP-compliant)
2271 # ResourceContent is the legacy model for backwards compatibility
2273 if isinstance(content, (ResourceContent, ResourceContents, TextContent)):
2274 # Metrics are recorded in read_resource finally block for all resources
2275 resource_response = await self.invoke_resource(
2276 db=db,
2277 resource_id=getattr(content, "id"),
2278 resource_uri=getattr(content, "uri") or None,
2279 resource_template_uri=getattr(content, "text") or None,
2280 user_identity=user,
2281 meta_data=meta_data,
2282 resource_obj=resource_db,
2283 gateway_obj=resource_db_gateway,
2284 server_id=server_id,
2285 )
2286 if resource_response:
2287 setattr(content, "text", resource_response)
2288 # If content is any object that quacks like content
2289 elif hasattr(content, "text") or hasattr(content, "blob"):
2290 # Metrics are recorded in read_resource finally block for all resources
2291 if hasattr(content, "blob"):
2292 resource_response = await self.invoke_resource(
2293 db=db,
2294 resource_id=getattr(content, "id"),
2295 resource_uri=getattr(content, "uri") or None,
2296 resource_template_uri=getattr(content, "blob") or None,
2297 user_identity=user,
2298 meta_data=meta_data,
2299 resource_obj=resource_db,
2300 gateway_obj=resource_db_gateway,
2301 server_id=server_id,
2302 )
2303 if resource_response:
2304 setattr(content, "blob", resource_response)
2305 elif hasattr(content, "text"):
2306 resource_response = await self.invoke_resource(
2307 db=db,
2308 resource_id=getattr(content, "id"),
2309 resource_uri=getattr(content, "uri") or None,
2310 resource_template_uri=getattr(content, "text") or None,
2311 user_identity=user,
2312 meta_data=meta_data,
2313 resource_obj=resource_db,
2314 gateway_obj=resource_db_gateway,
2315 server_id=server_id,
2316 )
2317 if resource_response:
2318 setattr(content, "text", resource_response)
2319 # Normalize primitive types to ResourceContent
2320 elif isinstance(content, bytes):
2321 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, blob=content)
2322 elif isinstance(content, str):
2323 content = ResourceContent(type="resource", id=str(resource_id), uri=original_uri, text=content)
2324 else:
2325 # Fallback to stringified content
2326 content = ResourceContent(type="resource", id=str(resource_id) or str(content.id), uri=original_uri or content.uri, text=str(content))
2328 # ═══════════════════════════════════════════════════════════════════════════
2329 # POST-FETCH HOOKS: Now called AFTER content is resolved from gateway
2330 # ═══════════════════════════════════════════════════════════════════════════
2331 if has_post_fetch:
2332 post_payload = ResourcePostFetchPayload(uri=original_uri, content=content)
2333 post_result, _ = await self._plugin_manager.invoke_hook(ResourceHookType.RESOURCE_POST_FETCH, post_payload, global_context, contexts, violations_as_exceptions=True)
2334 if post_result.modified_payload:
2335 content = post_result.modified_payload.content
2337 return content
2338 except Exception as e:
2339 success = False
2340 error_message = str(e)
2341 raise
2342 finally:
2343 # Record metrics only if we found a resource (not for templates)
2344 logger.debug(f"read_resource finally block: resource_db={'present' if resource_db else None}, resource_id={resource_db.id if resource_db else None}, server_id={server_id}")
2346 if resource_db:
2347 try:
2348 metrics_buffer.record_resource_metric(
2349 resource_id=resource_db.id,
2350 start_time=start_time,
2351 success=success,
2352 error_message=error_message,
2353 )
2354 except Exception as metrics_error:
2355 logger.warning(f"Failed to record resource metric: {metrics_error}")
2357 # Record server metrics ONLY when the server scoping check passed.
2358 # This prevents recording metrics with unvalidated server_id values
2359 # from admin API headers (X-Server-ID) or RPC params.
2360 if resource_db and server_scoped:
2361 try:
2362 logger.debug(f"Recording server metric for server_id={server_id}, resource_id={resource_db.id}, success={success}")
2363 # Record server metric only for the specific virtual server being accessed
2364 metrics_buffer.record_server_metric(
2365 server_id=server_id,
2366 start_time=start_time,
2367 success=success,
2368 error_message=error_message,
2369 )
2370 except Exception as metrics_error:
2371 logger.warning(f"Failed to record server metric: {metrics_error}")
2373 # End database span for observability dashboard
2374 # NOTE: Use fresh_db_session() since db may have been closed by invoke_resource
2375 if db_span_id and observability_service and not db_span_ended:
2376 try:
2377 with fresh_db_session() as fresh_db:
2378 observability_service.end_span(
2379 db=fresh_db,
2380 span_id=db_span_id,
2381 status="ok" if success else "error",
2382 status_message=error_message if error_message else None,
2383 )
2384 db_span_ended = True
2385 logger.debug(f"✓ Ended resource.read span: {db_span_id}")
2386 except Exception as e:
2387 logger.warning(f"Failed to end observability span for resource reading: {e}")
2389 async def set_resource_state(self, db: Session, resource_id: int, activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> ResourceRead:
2390 """
2391 Set the activation status of a resource.
2393 Args:
2394 db: Database session
2395 resource_id: Resource ID
2396 activate: True to activate, False to deactivate
2397 user_email: Optional[str] The email of the user to check if the user has permission to modify.
2398 skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).
2400 Returns:
2401 The updated ResourceRead object
2403 Raises:
2404 ResourceNotFoundError: If the resource is not found.
2405 ResourceLockConflictError: If the resource is locked by another transaction.
2406 ResourceError: For other errors.
2407 PermissionError: If user doesn't own the resource.
2409 Examples:
2410 >>> from mcpgateway.services.resource_service import ResourceService
2411 >>> from unittest.mock import MagicMock, AsyncMock
2412 >>> from mcpgateway.schemas import ResourceRead
2413 >>> service = ResourceService()
2414 >>> db = MagicMock()
2415 >>> resource = MagicMock()
2416 >>> db.get.return_value = resource
2417 >>> db.commit = MagicMock()
2418 >>> db.refresh = MagicMock()
2419 >>> service._notify_resource_activated = AsyncMock()
2420 >>> service._notify_resource_deactivated = AsyncMock()
2421 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
2422 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
2423 >>> import asyncio
2424 >>> asyncio.run(service.set_resource_state(db, 1, True))
2425 'resource_read'
2426 """
2427 try:
2428 # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
2429 try:
2430 resource = get_for_update(db, DbResource, resource_id, nowait=True)
2431 except OperationalError as lock_err:
2432 # Row is locked by another transaction - fail fast with 409
2433 db.rollback()
2434 raise ResourceLockConflictError(f"Resource {resource_id} is currently being modified by another request") from lock_err
2435 if not resource:
2436 raise ResourceNotFoundError(f"Resource not found: {resource_id}")
2438 if user_email:
2439 # First-Party
2440 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
2442 permission_service = PermissionService(db)
2443 if not await permission_service.check_resource_ownership(user_email, resource):
2444 raise PermissionError("Only the owner can activate the Resource" if activate else "Only the owner can deactivate the Resource")
2446 # Update status if it's different
2447 if resource.enabled != activate:
2448 resource.enabled = activate
2449 resource.updated_at = datetime.now(timezone.utc)
2450 db.commit()
2451 db.refresh(resource)
2453 # Invalidate cache after status change (skip for batch operations)
2454 if not skip_cache_invalidation:
2455 cache = _get_registry_cache()
2456 await cache.invalidate_resources()
2458 # Notify subscribers
2459 if activate:
2460 await self._notify_resource_activated(resource)
2461 else:
2462 await self._notify_resource_deactivated(resource)
2464 logger.info(f"Resource {resource.uri} {'activated' if activate else 'deactivated'}")
2466 # Structured logging: Audit trail for resource state change
2467 audit_trail.log_action(
2468 user_id=user_email or "system",
2469 action="set_resource_state",
2470 resource_type="resource",
2471 resource_id=str(resource.id),
2472 resource_name=resource.name,
2473 user_email=user_email,
2474 team_id=resource.team_id,
2475 new_values={
2476 "enabled": resource.enabled,
2477 },
2478 context={
2479 "action": "activate" if activate else "deactivate",
2480 },
2481 db=db,
2482 )
2484 # Structured logging: Log successful resource state change
2485 structured_logger.log(
2486 level="INFO",
2487 message=f"Resource {'activated' if activate else 'deactivated'} successfully",
2488 event_type="resource_state_changed",
2489 component="resource_service",
2490 user_email=user_email,
2491 team_id=resource.team_id,
2492 resource_type="resource",
2493 resource_id=str(resource.id),
2494 custom_fields={
2495 "resource_uri": resource.uri,
2496 "enabled": resource.enabled,
2497 },
2498 )
2500 resource.team = self._get_team_name(db, resource.team_id)
2501 return self.convert_resource_to_read(resource)
2502 except PermissionError as e:
2503 # Structured logging: Log permission error
2504 structured_logger.log(
2505 level="WARNING",
2506 message="Resource state change failed due to permission error",
2507 event_type="resource_state_change_permission_denied",
2508 component="resource_service",
2509 user_email=user_email,
2510 resource_type="resource",
2511 resource_id=str(resource_id),
2512 error=e,
2513 )
2514 raise e
2515 except ResourceLockConflictError:
2516 # Re-raise lock conflicts without wrapping - allows 409 response
2517 raise
2518 except ResourceNotFoundError:
2519 # Re-raise not found without wrapping - allows 404 response
2520 raise
2521 except Exception as e:
2522 db.rollback()
2524 # Structured logging: Log generic resource state change failure
2525 structured_logger.log(
2526 level="ERROR",
2527 message="Resource state change failed",
2528 event_type="resource_state_change_failed",
2529 component="resource_service",
2530 user_email=user_email,
2531 resource_type="resource",
2532 resource_id=str(resource_id),
2533 error=e,
2534 )
2535 raise ResourceError(f"Failed to set resource state: {str(e)}")
2537 async def subscribe_resource(
2538 self,
2539 db: Session,
2540 subscription: ResourceSubscription,
2541 *,
2542 user_email: Optional[str] = None,
2543 token_teams: Optional[List[str]] = None,
2544 ) -> None:
2545 """
2546 Subscribe to a resource.
2548 Args:
2549 db: Database session
2550 subscription: Resource subscription object
2551 user_email: Requester email used for visibility checks.
2552 token_teams: Token team scope used for visibility checks.
2554 Raises:
2555 ResourceNotFoundError: If the resource is not found or is inactive
2556 PermissionError: If the requester is not authorized for the resource
2557 ResourceError: For other subscription errors
2559 Examples:
2560 >>> from mcpgateway.services.resource_service import ResourceService
2561 >>> from unittest.mock import MagicMock
2562 >>> service = ResourceService()
2563 >>> db = MagicMock()
2564 >>> subscription = MagicMock()
2565 >>> import asyncio
2566 >>> asyncio.run(service.subscribe_resource(db, subscription))
2567 """
2568 try:
2569 # Verify resource exists (single query to avoid TOCTOU between active/inactive checks)
2570 resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()
2572 if not resource:
2573 raise ResourceNotFoundError(f"Resource not found: {subscription.uri}")
2575 if not resource.enabled:
2576 raise ResourceNotFoundError(f"Resource '{subscription.uri}' exists but is inactive")
2578 if not await self._check_resource_access(db, resource, user_email=user_email, token_teams=token_teams):
2579 raise PermissionError(f"Access denied for resource subscription: {subscription.uri}")
2581 # Create subscription
2582 db_sub = DbSubscription(resource_id=resource.id, subscriber_id=subscription.subscriber_id)
2583 db.add(db_sub)
2584 db.commit()
2586 logger.info(f"Added subscription for {subscription.uri} by {subscription.subscriber_id}")
2588 except PermissionError:
2589 db.rollback()
2590 raise
2591 except Exception as e:
2592 db.rollback()
2593 raise ResourceError(f"Failed to subscribe: {str(e)}")
2595 async def unsubscribe_resource(self, db: Session, subscription: ResourceSubscription) -> None:
2596 """
2597 Unsubscribe from a resource.
2599 Args:
2600 db: Database session
2601 subscription: Resource subscription object
2603 Raises:
2605 Examples:
2606 >>> from mcpgateway.services.resource_service import ResourceService
2607 >>> from unittest.mock import MagicMock
2608 >>> service = ResourceService()
2609 >>> db = MagicMock()
2610 >>> subscription = MagicMock()
2611 >>> import asyncio
2612 >>> asyncio.run(service.unsubscribe_resource(db, subscription))
2613 """
2614 try:
2615 # Find resource
2616 resource = db.execute(select(DbResource).where(DbResource.uri == subscription.uri)).scalar_one_or_none()
2618 if not resource:
2619 return
2621 # Remove subscription
2622 db.execute(select(DbSubscription).where(DbSubscription.resource_id == resource.id).where(DbSubscription.subscriber_id == subscription.subscriber_id)).delete()
2623 db.commit()
2625 logger.info(f"Removed subscription for {subscription.uri} by {subscription.subscriber_id}")
2627 except Exception as e:
2628 db.rollback()
2629 logger.error(f"Failed to unsubscribe: {str(e)}")
2631 async def update_resource(
2632 self,
2633 db: Session,
2634 resource_id: Union[int, str],
2635 resource_update: ResourceUpdate,
2636 modified_by: Optional[str] = None,
2637 modified_from_ip: Optional[str] = None,
2638 modified_via: Optional[str] = None,
2639 modified_user_agent: Optional[str] = None,
2640 user_email: Optional[str] = None,
2641 ) -> ResourceRead:
2642 """
2643 Update a resource.
2645 Args:
2646 db: Database session
2647 resource_id: Resource ID
2648 resource_update: Resource update object
2649 modified_by: Username of the person modifying the resource
2650 modified_from_ip: IP address where the modification request originated
2651 modified_via: Source of modification (ui/api/import)
2652 modified_user_agent: User agent string from the modification request
2653 user_email: Email of user performing update (for ownership check)
2655 Returns:
2656 The updated ResourceRead object
2658 Raises:
2659 ResourceNotFoundError: If the resource is not found
2660 ResourceURIConflictError: If a resource with the same URI already exists.
2661 PermissionError: If user doesn't own the resource
2662 ResourceError: For other update errors
2663 IntegrityError: If a database integrity error occurs.
2664 Exception: For unexpected errors
2666 Example:
2667 >>> from mcpgateway.services.resource_service import ResourceService
2668 >>> from unittest.mock import MagicMock, AsyncMock
2669 >>> from mcpgateway.schemas import ResourceRead
2670 >>> service = ResourceService()
2671 >>> db = MagicMock()
2672 >>> resource = MagicMock()
2673 >>> db.get.return_value = resource
2674 >>> db.commit = MagicMock()
2675 >>> db.refresh = MagicMock()
2676 >>> service._notify_resource_updated = AsyncMock()
2677 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
2678 >>> ResourceRead.model_validate = MagicMock(return_value='resource_read')
2679 >>> import asyncio
2680 >>> asyncio.run(service.update_resource(db, 'resource_id', MagicMock()))
2681 'resource_read'
2682 """
2683 try:
2684 logger.info(f"Updating resource: {resource_id}")
2685 resource = get_for_update(db, DbResource, resource_id)
2686 if not resource:
2687 raise ResourceNotFoundError(f"Resource not found: {resource_id}")
2689 # # Check for uri conflict if uri is being changed and visibility is public
2690 if resource_update.uri and resource_update.uri != resource.uri:
2691 visibility = resource_update.visibility or resource.visibility
2692 team_id = resource_update.team_id or resource.team_id
2693 if visibility.lower() == "public":
2694 # Check for existing public resources with the same uri
2695 existing_resource = get_for_update(db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "public", DbResource.id != resource_id))
2696 if existing_resource:
2697 raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
2698 elif visibility.lower() == "team" and team_id:
2699 # Check for existing team resource with the same uri
2700 existing_resource = get_for_update(
2701 db, DbResource, where=and_(DbResource.uri == resource_update.uri, DbResource.visibility == "team", DbResource.team_id == team_id, DbResource.id != resource_id)
2702 )
2703 if existing_resource:
2704 raise ResourceURIConflictError(resource_update.uri, enabled=existing_resource.enabled, resource_id=existing_resource.id, visibility=existing_resource.visibility)
2706 # Check ownership if user_email provided
2707 if user_email:
2708 # First-Party
2709 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
2711 permission_service = PermissionService(db)
2712 if not await permission_service.check_resource_ownership(user_email, resource):
2713 raise PermissionError("Only the owner can update this resource")
2715 # Update fields if provided
2716 if resource_update.uri is not None:
2717 resource.uri = resource_update.uri
2718 if resource_update.name is not None:
2719 resource.name = resource_update.name
2720 if resource_update.description is not None:
2721 resource.description = resource_update.description
2722 if resource_update.mime_type is not None:
2723 resource.mime_type = resource_update.mime_type
2724 if resource_update.uri_template is not None:
2725 resource.uri_template = resource_update.uri_template
2726 if resource_update.visibility is not None:
2727 resource.visibility = resource_update.visibility
2729 # Update content if provided
2730 if resource_update.content is not None:
2731 # Determine content storage
2732 is_text = resource.mime_type and resource.mime_type.startswith("text/") or isinstance(resource_update.content, str)
2734 resource.text_content = resource_update.content if is_text else None
2735 resource.binary_content = (
2736 resource_update.content.encode() if is_text and isinstance(resource_update.content, str) else resource_update.content if isinstance(resource_update.content, bytes) else None
2737 )
2738 resource.size = len(resource_update.content)
2740 # Update tags if provided
2741 if resource_update.tags is not None:
2742 resource.tags = resource_update.tags
2744 # Update metadata fields
2745 resource.updated_at = datetime.now(timezone.utc)
2746 if modified_by:
2747 resource.modified_by = modified_by
2748 if modified_from_ip:
2749 resource.modified_from_ip = modified_from_ip
2750 if modified_via:
2751 resource.modified_via = modified_via
2752 if modified_user_agent:
2753 resource.modified_user_agent = modified_user_agent
2754 if hasattr(resource, "version") and resource.version is not None:
2755 resource.version = resource.version + 1
2756 else:
2757 resource.version = 1
2758 db.commit()
2759 db.refresh(resource)
2761 # Invalidate cache after successful update
2762 cache = _get_registry_cache()
2763 await cache.invalidate_resources()
2764 # Also invalidate tags cache since resource tags may have changed
2765 # First-Party
2766 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
2768 await admin_stats_cache.invalidate_tags()
2769 # First-Party
2770 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
2772 metrics_cache.invalidate_prefix("top_resources:")
2773 metrics_cache.invalidate("resources")
2775 # Notify subscribers
2776 await self._notify_resource_updated(resource)
2778 logger.info(f"Updated resource: {resource.uri}")
2780 # Structured logging: Audit trail for resource update
2781 changes = []
2782 if resource_update.uri:
2783 changes.append(f"uri: {resource_update.uri}")
2784 if resource_update.visibility:
2785 changes.append(f"visibility: {resource_update.visibility}")
2786 if resource_update.description:
2787 changes.append("description updated")
2789 audit_trail.log_action(
2790 user_id=user_email or modified_by or "system",
2791 action="update_resource",
2792 resource_type="resource",
2793 resource_id=str(resource.id),
2794 resource_name=resource.name,
2795 user_email=user_email,
2796 team_id=resource.team_id,
2797 client_ip=modified_from_ip,
2798 user_agent=modified_user_agent,
2799 new_values={
2800 "uri": resource.uri,
2801 "name": resource.name,
2802 "version": resource.version,
2803 },
2804 context={
2805 "modified_via": modified_via,
2806 "changes": ", ".join(changes) if changes else "metadata only",
2807 },
2808 db=db,
2809 )
2811 # Structured logging: Log successful resource update
2812 structured_logger.log(
2813 level="INFO",
2814 message="Resource updated successfully",
2815 event_type="resource_updated",
2816 component="resource_service",
2817 user_id=modified_by,
2818 user_email=user_email,
2819 team_id=resource.team_id,
2820 resource_type="resource",
2821 resource_id=str(resource.id),
2822 custom_fields={
2823 "resource_uri": resource.uri,
2824 "version": resource.version,
2825 },
2826 )
2828 return self.convert_resource_to_read(resource)
2829 except PermissionError as pe:
2830 db.rollback()
2832 # Structured logging: Log permission error
2833 structured_logger.log(
2834 level="WARNING",
2835 message="Resource update failed due to permission error",
2836 event_type="resource_update_permission_denied",
2837 component="resource_service",
2838 user_email=user_email,
2839 resource_type="resource",
2840 resource_id=str(resource_id),
2841 error=pe,
2842 )
2843 raise
2844 except IntegrityError as ie:
2845 db.rollback()
2846 logger.error(f"IntegrityErrors in group: {ie}")
2848 # Structured logging: Log database integrity error
2849 structured_logger.log(
2850 level="ERROR",
2851 message="Resource update failed due to database integrity error",
2852 event_type="resource_update_failed",
2853 component="resource_service",
2854 user_id=modified_by,
2855 user_email=user_email,
2856 resource_type="resource",
2857 resource_id=str(resource_id),
2858 error=ie,
2859 )
2860 raise ie
2861 except ResourceURIConflictError as pe:
2862 logger.error(f"Resource URI conflict: {pe}")
2864 # Structured logging: Log URI conflict error
2865 structured_logger.log(
2866 level="WARNING",
2867 message="Resource update failed due to URI conflict",
2868 event_type="resource_uri_conflict",
2869 component="resource_service",
2870 user_id=modified_by,
2871 user_email=user_email,
2872 resource_type="resource",
2873 resource_id=str(resource_id),
2874 error=pe,
2875 )
2876 raise pe
2877 except Exception as e:
2878 db.rollback()
2879 if isinstance(e, ResourceNotFoundError):
2880 # Structured logging: Log not found error
2881 structured_logger.log(
2882 level="ERROR",
2883 message="Resource update failed - resource not found",
2884 event_type="resource_not_found",
2885 component="resource_service",
2886 user_email=user_email,
2887 resource_type="resource",
2888 resource_id=str(resource_id),
2889 error=e,
2890 )
2891 raise e
2893 # Structured logging: Log generic resource update failure
2894 structured_logger.log(
2895 level="ERROR",
2896 message="Resource update failed",
2897 event_type="resource_update_failed",
2898 component="resource_service",
2899 user_id=modified_by,
2900 user_email=user_email,
2901 resource_type="resource",
2902 resource_id=str(resource_id),
2903 error=e,
2904 )
2905 raise ResourceError(f"Failed to update resource: {str(e)}")
2907 async def delete_resource(self, db: Session, resource_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
2908 """
2909 Delete a resource.
2911 Args:
2912 db: Database session
2913 resource_id: Resource ID
2914 user_email: Email of user performing delete (for ownership check)
2915 purge_metrics: If True, delete raw + rollup metrics for this resource
2917 Raises:
2918 ResourceNotFoundError: If the resource is not found
2919 PermissionError: If user doesn't own the resource
2920 ResourceError: For other deletion errors
2922 Example:
2923 >>> from mcpgateway.services.resource_service import ResourceService
2924 >>> from unittest.mock import MagicMock, AsyncMock
2925 >>> service = ResourceService()
2926 >>> db = MagicMock()
2927 >>> resource = MagicMock()
2928 >>> db.get.return_value = resource
2929 >>> db.delete = MagicMock()
2930 >>> db.commit = MagicMock()
2931 >>> service._notify_resource_deleted = AsyncMock()
2932 >>> import asyncio
2933 >>> asyncio.run(service.delete_resource(db, 'resource_id'))
2934 """
2935 try:
2936 # Find resource by its URI.
2937 resource = db.execute(select(DbResource).where(DbResource.id == resource_id)).scalar_one_or_none()
2939 if not resource:
2940 # If resource doesn't exist, rollback and re-raise a ResourceNotFoundError.
2941 db.rollback()
2942 raise ResourceNotFoundError(f"Resource not found: {resource_id}")
2944 # Check ownership if user_email provided
2945 if user_email:
2946 # First-Party
2947 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
2949 permission_service = PermissionService(db)
2950 if not await permission_service.check_resource_ownership(user_email, resource):
2951 raise PermissionError("Only the owner can delete this resource")
2953 # Store resource info for notification before deletion.
2954 resource_info = {
2955 "id": resource.id,
2956 "uri": resource.uri,
2957 "name": resource.name,
2958 "visibility": getattr(resource, "visibility", "public"),
2959 "team_id": getattr(resource, "team_id", None),
2960 "owner_email": getattr(resource, "owner_email", None),
2961 }
2963 # Remove subscriptions using SQLAlchemy's delete() expression.
2964 db.execute(delete(DbSubscription).where(DbSubscription.resource_id == resource.id))
2966 if purge_metrics:
2967 with pause_rollup_during_purge(reason=f"purge_resource:{resource.id}"):
2968 delete_metrics_in_batches(db, ResourceMetric, ResourceMetric.resource_id, resource.id)
2969 delete_metrics_in_batches(db, ResourceMetricsHourly, ResourceMetricsHourly.resource_id, resource.id)
2971 # Hard delete the resource.
2972 resource_uri = resource.uri
2973 resource_name = resource.name
2974 resource_team_id = resource.team_id
2976 db.delete(resource)
2977 db.commit()
2979 # Invalidate cache after successful deletion
2980 cache = _get_registry_cache()
2981 await cache.invalidate_resources()
2982 # Also invalidate tags cache since resource tags may have changed
2983 # First-Party
2984 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
2986 await admin_stats_cache.invalidate_tags()
2988 # Notify subscribers.
2989 await self._notify_resource_deleted(resource_info)
2991 logger.info(f"Permanently deleted resource: {resource.uri}")
2993 # Structured logging: Audit trail for resource deletion
2994 audit_trail.log_action(
2995 user_id=user_email or "system",
2996 action="delete_resource",
2997 resource_type="resource",
2998 resource_id=str(resource_info["id"]),
2999 resource_name=resource_name,
3000 user_email=user_email,
3001 team_id=resource_team_id,
3002 old_values={
3003 "uri": resource_uri,
3004 "name": resource_name,
3005 },
3006 db=db,
3007 )
3009 # Structured logging: Log successful resource deletion
3010 structured_logger.log(
3011 level="INFO",
3012 message="Resource deleted successfully",
3013 event_type="resource_deleted",
3014 component="resource_service",
3015 user_email=user_email,
3016 team_id=resource_team_id,
3017 resource_type="resource",
3018 resource_id=str(resource_info["id"]),
3019 custom_fields={
3020 "resource_uri": resource_uri,
3021 "purge_metrics": purge_metrics,
3022 },
3023 )
3025 except PermissionError as pe:
3026 db.rollback()
3028 # Structured logging: Log permission error
3029 structured_logger.log(
3030 level="WARNING",
3031 message="Resource deletion failed due to permission error",
3032 event_type="resource_delete_permission_denied",
3033 component="resource_service",
3034 user_email=user_email,
3035 resource_type="resource",
3036 resource_id=str(resource_id),
3037 error=pe,
3038 )
3039 raise
3040 except ResourceNotFoundError as rnfe:
3041 # ResourceNotFoundError is re-raised to be handled in the endpoint.
3042 # Structured logging: Log not found error
3043 structured_logger.log(
3044 level="ERROR",
3045 message="Resource deletion failed - resource not found",
3046 event_type="resource_not_found",
3047 component="resource_service",
3048 user_email=user_email,
3049 resource_type="resource",
3050 resource_id=str(resource_id),
3051 error=rnfe,
3052 )
3053 raise
3054 except Exception as e:
3055 db.rollback()
3057 # Structured logging: Log generic resource deletion failure
3058 structured_logger.log(
3059 level="ERROR",
3060 message="Resource deletion failed",
3061 event_type="resource_deletion_failed",
3062 component="resource_service",
3063 user_email=user_email,
3064 resource_type="resource",
3065 resource_id=str(resource_id),
3066 error=e,
3067 )
3068 raise ResourceError(f"Failed to delete resource: {str(e)}")
3070 async def get_resource_by_id(self, db: Session, resource_id: str, include_inactive: bool = False) -> ResourceRead:
3071 """
3072 Get a resource by ID.
3074 Args:
3075 db: Database session
3076 resource_id: Resource ID
3077 include_inactive: Whether to include inactive resources
3079 Returns:
3080 ResourceRead: The resource object
3082 Raises:
3083 ResourceNotFoundError: If the resource is not found
3085 Example:
3086 >>> from mcpgateway.services.resource_service import ResourceService
3087 >>> from unittest.mock import MagicMock
3088 >>> service = ResourceService()
3089 >>> db = MagicMock()
3090 >>> resource = MagicMock()
3091 >>> db.execute.return_value.scalar_one_or_none.return_value = resource
3092 >>> service.convert_resource_to_read = MagicMock(return_value='resource_read')
3093 >>> import asyncio
3094 >>> asyncio.run(service.get_resource_by_id(db, "39334ce0ed2644d79ede8913a66930c9"))
3095 'resource_read'
3096 """
3097 query = select(DbResource).where(DbResource.id == resource_id)
3099 if not include_inactive:
3100 query = query.where(DbResource.enabled)
3102 resource = db.execute(query).scalar_one_or_none()
3104 if not resource:
3105 if not include_inactive:
3106 # Check if inactive resource exists
3107 inactive_resource = db.execute(select(DbResource).where(DbResource.id == resource_id).where(not_(DbResource.enabled))).scalar_one_or_none()
3109 if inactive_resource:
3110 raise ResourceNotFoundError(f"Resource '{resource_id}' exists but is inactive")
3112 raise ResourceNotFoundError(f"Resource not found: {resource_id}")
3114 resource_read = self.convert_resource_to_read(resource)
3116 structured_logger.log(
3117 level="INFO",
3118 message="Resource retrieved successfully",
3119 event_type="resource_viewed",
3120 component="resource_service",
3121 team_id=getattr(resource, "team_id", None),
3122 resource_type="resource",
3123 resource_id=str(resource.id),
3124 custom_fields={
3125 "resource_uri": resource.uri,
3126 "include_inactive": include_inactive,
3127 },
3128 )
3130 return resource_read
3132 async def _notify_resource_activated(self, resource: DbResource) -> None:
3133 """
3134 Notify subscribers of resource activation.
3136 Args:
3137 resource: Resource to activate
3138 """
3139 event = {
3140 "type": "resource_activated",
3141 "data": {
3142 "id": resource.id,
3143 "uri": resource.uri,
3144 "name": resource.name,
3145 "enabled": True,
3146 "visibility": getattr(resource, "visibility", "public"),
3147 "team_id": getattr(resource, "team_id", None),
3148 "owner_email": getattr(resource, "owner_email", None),
3149 },
3150 "timestamp": datetime.now(timezone.utc).isoformat(),
3151 }
3152 await self._publish_event(event)
3154 async def _notify_resource_deactivated(self, resource: DbResource) -> None:
3155 """
3156 Notify subscribers of resource deactivation.
3158 Args:
3159 resource: Resource to deactivate
3160 """
3161 event = {
3162 "type": "resource_deactivated",
3163 "data": {
3164 "id": resource.id,
3165 "uri": resource.uri,
3166 "name": resource.name,
3167 "enabled": False,
3168 "visibility": getattr(resource, "visibility", "public"),
3169 "team_id": getattr(resource, "team_id", None),
3170 "owner_email": getattr(resource, "owner_email", None),
3171 },
3172 "timestamp": datetime.now(timezone.utc).isoformat(),
3173 }
3174 await self._publish_event(event)
3176 async def _notify_resource_deleted(self, resource_info: Dict[str, Any]) -> None:
3177 """
3178 Notify subscribers of resource deletion.
3180 Args:
3181 resource_info: Dictionary of resource to delete
3182 """
3183 event = {
3184 "type": "resource_deleted",
3185 "data": resource_info,
3186 "timestamp": datetime.now(timezone.utc).isoformat(),
3187 }
3188 await self._publish_event(event)
3190 async def _notify_resource_removed(self, resource: DbResource) -> None:
3191 """
3192 Notify subscribers of resource removal.
3194 Args:
3195 resource: Resource to remove
3196 """
3197 event = {
3198 "type": "resource_removed",
3199 "data": {
3200 "id": resource.id,
3201 "uri": resource.uri,
3202 "name": resource.name,
3203 "enabled": False,
3204 "visibility": getattr(resource, "visibility", "public"),
3205 "team_id": getattr(resource, "team_id", None),
3206 "owner_email": getattr(resource, "owner_email", None),
3207 },
3208 "timestamp": datetime.now(timezone.utc).isoformat(),
3209 }
3210 await self._publish_event(event)
3212 async def _event_visible_to_subscriber(self, event: Dict[str, Any], user_email: Optional[str], token_teams: Optional[List[str]]) -> bool:
3213 """Return whether a resource event is visible to a subscriber context.
3215 Args:
3216 event: Event payload emitted by the resource event stream.
3217 user_email: Subscriber email. ``None`` only for unrestricted admin context.
3218 token_teams: Subscriber token team scope.
3220 Returns:
3221 ``True`` when the event is visible to the subscriber, otherwise ``False``.
3222 """
3223 data = event.get("data") if isinstance(event, dict) else None
3224 if not isinstance(data, dict):
3225 return False
3227 visibility = data.get("visibility") or "public"
3228 team_id = data.get("team_id")
3229 owner_email = data.get("owner_email")
3230 event_resource = SimpleNamespace(visibility=visibility, team_id=team_id, owner_email=owner_email)
3232 effective_token_teams = token_teams
3233 if user_email and effective_token_teams is None:
3234 # Non-admin scoped flows should pass token_teams explicitly. If not,
3235 # fail closed to public-only for event filtering.
3236 effective_token_teams = []
3238 return await self._check_resource_access(
3239 db=None, # type: ignore[arg-type]
3240 resource=event_resource, # type: ignore[arg-type]
3241 user_email=user_email,
3242 token_teams=effective_token_teams,
3243 )
3245 async def subscribe_events(self, user_email: Optional[str] = None, token_teams: Optional[List[str]] = None) -> AsyncGenerator[Dict[str, Any], None]:
3246 """Subscribe to Resource events via the EventService.
3248 Args:
3249 user_email: Requesting user email. ``None`` with ``token_teams=None`` indicates unrestricted admin context.
3250 token_teams: Token team scope context:
3251 - ``None`` = unrestricted admin
3252 - ``[]`` = public-only
3253 - ``[...]`` = team-scoped access
3255 Yields:
3256 Resource event messages.
3257 """
3258 async for event in self._event_service.subscribe_events():
3259 if user_email is None and token_teams is None:
3260 yield event
3261 continue
3263 if await self._event_visible_to_subscriber(event, user_email, token_teams):
3264 yield event
3266 def _detect_mime_type(self, uri: str, content: Union[str, bytes]) -> str:
3267 """Detect mime type from URI and content.
3269 Args:
3270 uri: Resource URI
3271 content: Resource content
3273 Returns:
3274 Detected mime type
3275 """
3276 # Try from URI first
3277 mime_type, _ = mimetypes.guess_type(uri)
3278 if mime_type:
3279 return mime_type
3281 # Check content type
3282 if isinstance(content, str):
3283 return "text/plain"
3285 return "application/octet-stream"
3287 async def _read_template_resource(self, db: Session, uri: str, include_inactive: Optional[bool] = False) -> ResourceContent:
3288 """
3289 Read a templated resource.
3291 Args:
3292 db: Database session.
3293 uri: Template URI with parameters.
3294 include_inactive: Whether to include inactive resources in DB lookups.
3296 Returns:
3297 ResourceContent: The resolved content from the matching template.
3299 Raises:
3300 ResourceNotFoundError: If no matching template is found.
3301 ResourceError: For other template resolution errors.
3302 NotImplementedError: If a binary template resource is encountered.
3303 """
3304 # Find matching template # DRT BREAKPOINT
3305 template = None
3306 if not self._template_cache:
3307 logger.info("_template_cache is empty, fetching exisitng resource templates")
3308 resource_templates = await self.list_resource_templates(db=db, include_inactive=include_inactive)
3309 for i in resource_templates:
3310 self._template_cache[i.name] = i
3311 for cached in self._template_cache.values():
3312 if self._uri_matches_template(uri, cached.uri_template):
3313 template = cached
3314 break
3316 if template:
3317 check_inactivity = db.execute(select(DbResource).where(DbResource.id == str(template.id)).where(not_(DbResource.enabled))).scalar_one_or_none()
3318 if check_inactivity:
3319 raise ResourceNotFoundError(f"Resource '{template.id}' exists but is inactive")
3320 else:
3321 raise ResourceNotFoundError(f"No template matches URI: {uri}")
3323 try:
3324 # Extract parameters
3325 params = self._extract_template_params(uri, template.uri_template)
3326 # Generate content
3327 if template.mime_type and template.mime_type.startswith("text/"):
3328 content = template.uri_template.format(**params)
3329 return ResourceContent(type="resource", id=str(template.id) or None, uri=template.uri_template or None, mime_type=template.mime_type or None, text=content)
3330 # # Handle binary template
3331 raise NotImplementedError("Binary resource templates not yet supported")
3333 except ResourceNotFoundError:
3334 raise
3335 except Exception as e:
3336 raise ResourceError(f"Failed to process template: {str(e)}") from e
3338 @staticmethod
3339 @lru_cache(maxsize=256)
3340 def _build_regex(template: str) -> re.Pattern:
3341 """
3342 Convert a URI template into a compiled regular expression.
3344 This parser supports a subset of RFC 6570–style templates for path
3345 matching. It extracts path parameters and converts them into named
3346 regex groups.
3348 Supported template features:
3349 - `{var}`
3350 A simple path parameter. Matches a single URI segment
3351 (i.e., any characters except `/`).
3352 → Translates to `(?P<var>[^/]+)`
3353 - `{var*}`
3354 A wildcard parameter. Matches one or more URI segments,
3355 including `/`.
3356 → Translates to `(?P<var>.+)`
3357 - `{?var1,var2}`
3358 Query-parameter expressions. These are ignored when building
3359 the regex for path matching and are stripped from the template.
3361 Example:
3362 Template: "files://root/{path*}/meta/{id}{?expand,debug}"
3363 Regex: r"^files://root/(?P<path>.+)/meta/(?P<id>[^/]+)$"
3365 Args:
3366 template: The URI template string containing parameter expressions.
3368 Returns:
3369 A compiled regular expression (re.Pattern) that can be used to
3370 match URIs and extract parameter values.
3372 Note:
3373 Results are cached using LRU cache (maxsize=256) to avoid
3374 recompiling the same template pattern repeatedly.
3375 """
3376 # Remove query parameter syntax for path matching
3377 template_without_query = re.sub(r"\{\?[^}]+\}", "", template)
3379 parts = re.split(r"(\{[^}]+\})", template_without_query)
3380 pattern = ""
3381 for part in parts:
3382 if part.startswith("{") and part.endswith("}"):
3383 name = part[1:-1]
3384 if name.endswith("*"):
3385 name = name[:-1]
3386 pattern += f"(?P<{name}>.+)"
3387 else:
3388 pattern += f"(?P<{name}>[^/]+)"
3389 else:
3390 pattern += re.escape(part)
3391 return re.compile(f"^{pattern}$")
3393 @staticmethod
3394 @lru_cache(maxsize=256)
3395 def _compile_parse_pattern(template: str) -> parse.Parser:
3396 """
3397 Compile a parse pattern for URI template parameter extraction.
3399 Args:
3400 template: The template pattern (e.g. "file:///{name}/{id}").
3402 Returns:
3403 Compiled parse.Parser object.
3405 Note:
3406 Results are cached using LRU cache (maxsize=256) to avoid
3407 recompiling the same template pattern repeatedly.
3408 """
3409 return parse.compile(template)
3411 def _extract_template_params(self, uri: str, template: str) -> Dict[str, str]:
3412 """
3413 Extract parameters from a URI based on a template.
3415 Args:
3416 uri: The actual URI containing parameter values.
3417 template: The template pattern (e.g. "file:///{name}/{id}").
3419 Returns:
3420 Dict of parameter names and extracted values.
3422 Note:
3423 Uses cached compiled parse patterns for better performance.
3424 """
3425 parser = self._compile_parse_pattern(template)
3426 result = parser.parse(uri)
3427 return result.named if result else {}
3429 def _uri_matches_template(self, uri: str, template: str) -> bool:
3430 """
3431 Check whether a URI matches a given template pattern.
3433 Args:
3434 uri: The URI to check.
3435 template: The template pattern.
3437 Returns:
3438 True if the URI matches the template, otherwise False.
3440 Note:
3441 Uses cached compiled regex patterns for better performance.
3442 """
3443 uri_path, _, _ = uri.partition("?")
3444 regex = self._build_regex(template)
3445 return bool(regex.match(uri_path))
3447 async def _notify_resource_added(self, resource: DbResource) -> None:
3448 """
3449 Notify subscribers of resource addition.
3451 Args:
3452 resource: Resource to add
3453 """
3454 event = {
3455 "type": "resource_added",
3456 "data": {
3457 "id": resource.id,
3458 "uri": resource.uri,
3459 "name": resource.name,
3460 "description": resource.description,
3461 "enabled": resource.enabled,
3462 "visibility": getattr(resource, "visibility", "public"),
3463 "team_id": getattr(resource, "team_id", None),
3464 "owner_email": getattr(resource, "owner_email", None),
3465 },
3466 "timestamp": datetime.now(timezone.utc).isoformat(),
3467 }
3468 await self._publish_event(event)
3470 async def _notify_resource_updated(self, resource: DbResource) -> None:
3471 """
3472 Notify subscribers of resource update.
3474 Args:
3475 resource: Resource to update
3476 """
3477 event = {
3478 "type": "resource_updated",
3479 "data": {
3480 "id": resource.id,
3481 "uri": resource.uri,
3482 "enabled": resource.enabled,
3483 "visibility": getattr(resource, "visibility", "public"),
3484 "team_id": getattr(resource, "team_id", None),
3485 "owner_email": getattr(resource, "owner_email", None),
3486 },
3487 "timestamp": datetime.now(timezone.utc).isoformat(),
3488 }
3489 await self._publish_event(event)
3491 async def _publish_event(self, event: Dict[str, Any]) -> None:
3492 """
3493 Publish event to all subscribers via the EventService.
3495 Args:
3496 event: Event to publish
3497 """
3498 await self._event_service.publish_event(event)
3500 # --- Resource templates ---
3501 async def list_resource_templates(
3502 self,
3503 db: Session,
3504 include_inactive: bool = False,
3505 user_email: Optional[str] = None,
3506 token_teams: Optional[List[str]] = None,
3507 tags: Optional[List[str]] = None,
3508 visibility: Optional[str] = None,
3509 server_id: Optional[str] = None,
3510 ) -> List[ResourceTemplate]:
3511 """
3512 List resource templates with visibility-based access control.
3514 Args:
3515 db: Database session
3516 include_inactive: Whether to include inactive templates
3517 user_email: Email of requesting user (for private visibility check)
3518 token_teams: Teams from JWT. None = admin (no filtering),
3519 [] = public-only (no owner access), [...] = team-scoped
3520 tags (Optional[List[str]]): Filter resources by tags. If provided, only resources with at least one matching tag will be returned.
3521 visibility (Optional[str]): Filter by visibility (private, team, public).
3522 server_id (Optional[str]): Filter by server ID. If provided, only templates associated with this server will be returned.
3524 Returns:
3525 List of ResourceTemplate objects the user has access to
3527 Examples:
3528 >>> from mcpgateway.services.resource_service import ResourceService
3529 >>> from unittest.mock import MagicMock, patch
3530 >>> service = ResourceService()
3531 >>> db = MagicMock()
3532 >>> template_obj = MagicMock()
3533 >>> db.execute.return_value.scalars.return_value.all.return_value = [template_obj]
3534 >>> with patch('mcpgateway.services.resource_service.ResourceTemplate') as MockResourceTemplate:
3535 ... MockResourceTemplate.model_validate.return_value = 'resource_template'
3536 ... import asyncio
3537 ... result = asyncio.run(service.list_resource_templates(db))
3538 ... result == ['resource_template']
3539 True
3540 """
3541 query = select(DbResource).where(DbResource.uri_template.isnot(None))
3543 # Filter by server_id if provided (same pattern as list_server_resources)
3544 if server_id:
3545 query = query.join(server_resource_association, DbResource.id == server_resource_association.c.resource_id).where(server_resource_association.c.server_id == server_id)
3547 if not include_inactive:
3548 query = query.where(DbResource.enabled)
3550 # Apply visibility filtering when token_teams is set (non-admin access)
3551 if token_teams is not None:
3552 # Check if this is a public-only token (empty teams array)
3553 # Public-only tokens can ONLY see public templates - no owner access
3554 is_public_only_token = len(token_teams) == 0
3556 conditions = [DbResource.visibility == "public"]
3558 # Only include owner access for non-public-only tokens with user_email
3559 if not is_public_only_token and user_email:
3560 conditions.append(DbResource.owner_email == user_email)
3562 if token_teams:
3563 conditions.append(and_(DbResource.team_id.in_(token_teams), DbResource.visibility.in_(["team", "public"])))
3565 query = query.where(or_(*conditions))
3567 # Cursor-based pagination logic can be implemented here in the future.
3568 if visibility:
3569 query = query.where(DbResource.visibility == visibility)
3571 if tags:
3572 query = query.where(json_contains_tag_expr(db, DbResource.tags, tags, match_any=True))
3574 templates = db.execute(query).scalars().all()
3575 result = [ResourceTemplate.model_validate(t) for t in templates]
3576 return result
3578 # --- Metrics ---
3579 async def aggregate_metrics(self, db: Session) -> ResourceMetrics:
3580 """
3581 Aggregate metrics for all resource invocations across all resources.
3583 Combines recent raw metrics (within retention period) with historical
3584 hourly rollups for complete historical coverage. Uses in-memory caching
3585 (10s TTL) to reduce database load under high request rates.
3587 Args:
3588 db: Database session
3590 Returns:
3591 ResourceMetrics: Aggregated metrics from raw + hourly rollup tables.
3593 Examples:
3594 >>> from mcpgateway.services.resource_service import ResourceService
3595 >>> service = ResourceService()
3596 >>> # Method exists and is callable
3597 >>> callable(service.aggregate_metrics)
3598 True
3599 """
3600 # Check cache first (if enabled)
3601 # First-Party
3602 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
3604 if is_cache_enabled():
3605 cached = metrics_cache.get("resources")
3606 if cached is not None:
3607 return ResourceMetrics(**cached)
3609 # Use combined raw + rollup query for full historical coverage
3610 # First-Party
3611 from mcpgateway.services.metrics_query_service import aggregate_metrics_combined # pylint: disable=import-outside-toplevel
3613 result = aggregate_metrics_combined(db, "resource")
3615 metrics = ResourceMetrics(
3616 total_executions=result.total_executions,
3617 successful_executions=result.successful_executions,
3618 failed_executions=result.failed_executions,
3619 failure_rate=result.failure_rate,
3620 min_response_time=result.min_response_time,
3621 max_response_time=result.max_response_time,
3622 avg_response_time=result.avg_response_time,
3623 last_execution_time=result.last_execution_time,
3624 )
3626 # Cache the result as dict for serialization compatibility (if enabled)
3627 if is_cache_enabled():
3628 metrics_cache.set("resources", metrics.model_dump())
3630 return metrics
3632 async def reset_metrics(self, db: Session) -> None:
3633 """
3634 Reset all resource metrics by deleting raw and hourly rollup records.
3636 Args:
3637 db: Database session
3639 Examples:
3640 >>> from mcpgateway.services.resource_service import ResourceService
3641 >>> from unittest.mock import MagicMock
3642 >>> service = ResourceService()
3643 >>> db = MagicMock()
3644 >>> db.execute = MagicMock()
3645 >>> db.commit = MagicMock()
3646 >>> import asyncio
3647 >>> asyncio.run(service.reset_metrics(db))
3648 """
3649 db.execute(delete(ResourceMetric))
3650 db.execute(delete(ResourceMetricsHourly))
3651 db.commit()
3653 # Invalidate metrics cache
3654 # First-Party
3655 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
3657 metrics_cache.invalidate("resources")
3658 metrics_cache.invalidate_prefix("top_resources:")
3661# Lazy singleton - created on first access, not at module import time.
3662# This avoids instantiation when only exception classes are imported.
3663_resource_service_instance = None # pylint: disable=invalid-name
3666def __getattr__(name: str):
3667 """Module-level __getattr__ for lazy singleton creation.
3669 Args:
3670 name: The attribute name being accessed.
3672 Returns:
3673 The resource_service singleton instance if name is "resource_service".
3675 Raises:
3676 AttributeError: If the attribute name is not "resource_service".
3677 """
3678 global _resource_service_instance # pylint: disable=global-statement
3679 if name == "resource_service":
3680 if _resource_service_instance is None:
3681 _resource_service_instance = ResourceService()
3682 return _resource_service_instance
3683 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")