Coverage for mcpgateway / main.py: 99%
2855 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2# pylint: disable=wrong-import-position, import-outside-toplevel, no-name-in-module
3"""Location: ./mcpgateway/main.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti
8MCP Gateway - Main FastAPI Application.
10This module defines the core FastAPI application for the Model Context Protocol (MCP) Gateway.
11It serves as the entry point for handling all HTTP and WebSocket traffic.
13Features and Responsibilities:
14- Initializes and orchestrates services for tools, resources, prompts, servers, gateways, and roots.
15- Supports full MCP protocol operations: initialize, ping, notify, complete, and sample.
16- Integrates authentication (JWT and basic), CORS, caching, and middleware.
17- Serves a rich Admin UI for managing gateway entities via HTMX-based frontend.
18- Exposes routes for JSON-RPC, SSE, and WebSocket transports.
19- Manages application lifecycle including startup and graceful shutdown of all services.
21Structure:
22- Declares routers for MCP protocol operations and administration.
23- Registers dependencies (e.g., DB sessions, auth handlers).
24- Applies middleware including custom documentation protection.
25- Configures resource caching and session registry using pluggable backends.
26- Provides OpenAPI metadata and redirect handling depending on UI feature flags.
27"""
29# Standard
30import asyncio
31from contextlib import asynccontextmanager, suppress
32from datetime import datetime, timezone
33from functools import lru_cache
34import hashlib
35import os as _os # local alias to avoid collisions
36import sys
37from typing import Any, AsyncIterator, Dict, List, Optional, Union
38from urllib.parse import urlparse, urlunparse
39import uuid
40import warnings
42# Third-Party
43from fastapi import APIRouter, Body, Depends, FastAPI, HTTPException, Query, Request, status, WebSocket, WebSocketDisconnect
44from fastapi.background import BackgroundTasks
45from fastapi.exception_handlers import request_validation_exception_handler as fastapi_default_validation_handler
46from fastapi.exceptions import RequestValidationError
47from fastapi.middleware.cors import CORSMiddleware
48from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse
49from fastapi.staticfiles import StaticFiles
50from fastapi.templating import Jinja2Templates
51from jinja2 import Environment, FileSystemLoader
52from jsonpath_ng.ext import parse
53from jsonpath_ng.jsonpath import JSONPath
54import orjson
55from pydantic import ValidationError
56from sqlalchemy import text
57from sqlalchemy.exc import IntegrityError
58from sqlalchemy.orm import Session
59from starlette.middleware.base import BaseHTTPMiddleware
60from starlette.requests import Request as starletteRequest
61from starlette.responses import Response as starletteResponse
62from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
64# First-Party
65from mcpgateway import __version__
66from mcpgateway.admin import admin_router, set_logging_service
67from mcpgateway.auth import _check_token_revoked_sync, _lookup_api_token_sync, get_current_user, get_user_team_roles, normalize_token_teams
68from mcpgateway.bootstrap_db import main as bootstrap_db
69from mcpgateway.cache import ResourceCache, SessionRegistry
70from mcpgateway.common.models import InitializeResult
71from mcpgateway.common.models import JSONRPCError as PydanticJSONRPCError
72from mcpgateway.common.models import ListResourceTemplatesResult, LogLevel, Root
73from mcpgateway.config import settings
74from mcpgateway.db import refresh_slugs_on_startup, SessionLocal
75from mcpgateway.db import Tool as DbTool
76from mcpgateway.handlers.sampling import SamplingHandler
77from mcpgateway.middleware.compression import SSEAwareCompressMiddleware
78from mcpgateway.middleware.correlation_id import CorrelationIDMiddleware
79from mcpgateway.middleware.http_auth_middleware import HttpAuthMiddleware
80from mcpgateway.middleware.protocol_version import MCPProtocolVersionMiddleware
81from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
82from mcpgateway.middleware.request_logging_middleware import RequestLoggingMiddleware
83from mcpgateway.middleware.security_headers import SecurityHeadersMiddleware
84from mcpgateway.middleware.token_scoping import token_scoping_middleware
85from mcpgateway.middleware.validation_middleware import ValidationMiddleware
86from mcpgateway.observability import init_telemetry
87from mcpgateway.plugins.framework import PluginError, PluginManager, PluginViolationError
88from mcpgateway.routers.server_well_known import router as server_well_known_router
89from mcpgateway.routers.well_known import router as well_known_router
90from mcpgateway.schemas import (
91 A2AAgentCreate,
92 A2AAgentRead,
93 A2AAgentUpdate,
94 CursorPaginatedA2AAgentsResponse,
95 CursorPaginatedGatewaysResponse,
96 CursorPaginatedPromptsResponse,
97 CursorPaginatedResourcesResponse,
98 CursorPaginatedServersResponse,
99 CursorPaginatedToolsResponse,
100 GatewayCreate,
101 GatewayRead,
102 GatewayRefreshResponse,
103 GatewayUpdate,
104 JsonPathModifier,
105 PromptCreate,
106 PromptExecuteArgs,
107 PromptRead,
108 PromptUpdate,
109 ResourceCreate,
110 ResourceRead,
111 ResourceSubscription,
112 ResourceUpdate,
113 RPCRequest,
114 ServerCreate,
115 ServerRead,
116 ServerUpdate,
117 TaggedEntity,
118 TagInfo,
119 ToolCreate,
120 ToolRead,
121 ToolUpdate,
122)
123from mcpgateway.services.a2a_service import A2AAgentError, A2AAgentNameConflictError, A2AAgentNotFoundError, A2AAgentService
124from mcpgateway.services.cancellation_service import cancellation_service
125from mcpgateway.services.completion_service import CompletionService
126from mcpgateway.services.email_auth_service import EmailAuthService
127from mcpgateway.services.export_service import ExportError, ExportService
128from mcpgateway.services.gateway_service import GatewayConnectionError, GatewayDuplicateConflictError, GatewayError, GatewayNameConflictError, GatewayNotFoundError
129from mcpgateway.services.import_service import ConflictStrategy, ImportConflictError
130from mcpgateway.services.import_service import ImportError as ImportServiceError
131from mcpgateway.services.import_service import ImportService, ImportValidationError
132from mcpgateway.services.log_aggregator import get_log_aggregator
133from mcpgateway.services.logging_service import LoggingService
134from mcpgateway.services.metrics import setup_metrics
135from mcpgateway.services.permission_service import PermissionService
136from mcpgateway.services.prompt_service import PromptError, PromptLockConflictError, PromptNameConflictError, PromptNotFoundError
137from mcpgateway.services.resource_service import ResourceError, ResourceLockConflictError, ResourceNotFoundError, ResourceURIConflictError
138from mcpgateway.services.server_service import ServerError, ServerLockConflictError, ServerNameConflictError, ServerNotFoundError
139from mcpgateway.services.tag_service import TagService
140from mcpgateway.services.tool_service import ToolError, ToolLockConflictError, ToolNameConflictError, ToolNotFoundError
141from mcpgateway.transports.sse_transport import SSETransport
142from mcpgateway.transports.streamablehttp_transport import SessionManagerWrapper, streamable_http_auth
143from mcpgateway.utils.db_isready import wait_for_db_ready
144from mcpgateway.utils.error_formatter import ErrorFormatter
145from mcpgateway.utils.metadata_capture import MetadataCapture
146from mcpgateway.utils.orjson_response import ORJSONResponse
147from mcpgateway.utils.passthrough_headers import set_global_passthrough_headers
148from mcpgateway.utils.redis_client import close_redis_client, get_redis_client
149from mcpgateway.utils.redis_isready import wait_for_redis_ready
150from mcpgateway.utils.retry_manager import ResilientHttpClient
151from mcpgateway.utils.verify_credentials import require_docs_auth_override, verify_jwt_token
152from mcpgateway.validation.jsonrpc import JSONRPCError
154# Import the admin routes from the new module
155from mcpgateway.version import router as version_router
# Initialize logging service first so that every later module-level step can log
logging_service = LoggingService()
logger = logging_service.get_logger("mcpgateway")

# Share the logging service with admin module
set_logging_service(logging_service)

# Note: Logging configuration is handled by LoggingService during startup
# Don't use basicConfig here as it conflicts with our dual logging setup

# Wait for database to be ready before creating tables
wait_for_db_ready(max_tries=int(settings.db_max_retries), interval=int(settings.db_retry_interval_ms) / 1000, sync=True)  # Converting ms to s

# Create database tables.
# When this module is imported without a running event loop, the bootstrap
# coroutine is driven to completion synchronously. When a loop is already
# running, it is scheduled on that loop instead; the returned task is kept in a
# module-level name because the event loop only holds weak references to tasks,
# so an unreferenced task may be garbage-collected before it finishes.
_bootstrap_db_task: Optional[asyncio.Task] = None
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    asyncio.run(bootstrap_db())
else:
    _bootstrap_db_task = loop.create_task(bootstrap_db())
# Initialize plugin manager as a singleton (honor env overrides for tests)
# The PLUGINS_ENABLED environment variable, when present, wins over settings.
_env_flag = _os.getenv("PLUGINS_ENABLED")
if _env_flag is None:
    # No override in the environment: fall back to the configured value.
    _PLUGINS_ENABLED = settings.plugins_enabled
else:
    # Only these case-insensitive spellings count as "enabled".
    _env_enabled = _env_flag.strip().lower() in {"1", "true", "yes", "on"}
    _PLUGINS_ENABLED = _env_enabled

# PLUGIN_CONFIG_FILE may likewise override the configured plugin config path.
_config_file = _os.getenv("PLUGIN_CONFIG_FILE", settings.plugin_config_file)

plugin_manager: PluginManager | None = None
if _PLUGINS_ENABLED:
    plugin_manager = PluginManager(_config_file)
189# First-Party
190# First-Party - import module-level service singletons
191from mcpgateway.services.gateway_service import gateway_service # noqa: E402
192from mcpgateway.services.prompt_service import prompt_service # noqa: E402
193from mcpgateway.services.resource_service import resource_service # noqa: E402
194from mcpgateway.services.root_service import root_service, RootServiceNotFoundError # noqa: E402
195from mcpgateway.services.server_service import server_service # noqa: E402
196from mcpgateway.services.tool_service import tool_service # noqa: E402
# Services that do not expose module-level singletons are instantiated here
completion_service = CompletionService()
sampling_handler = SamplingHandler()
tag_service = TagService()
export_service = ExportService()
import_service = ImportService()

# The A2A service is only created when A2A features are enabled; otherwise None
a2a_service = None
if settings.mcpgateway_a2a_enabled:
    a2a_service = A2AAgentService()

# Session manager for the Streamable HTTP transport
streamable_http_session = SessionManagerWrapper()

# Block until Redis answers, but only when Redis is the configured cache backend
if settings.cache_type == "redis" and settings.redis_url is not None:
    wait_for_redis_ready(
        redis_url=settings.redis_url,
        max_retries=int(settings.redis_max_retries),
        retry_interval_ms=int(settings.redis_retry_interval_ms),
        sync=True,
    )

# Session registry: only the URL matching the selected backend is passed through
session_registry = SessionRegistry(
    backend=settings.cache_type,
    redis_url=settings.redis_url if settings.cache_type == "redis" else None,
    database_url=settings.database_url if settings.cache_type == "database" else None,
    session_ttl=settings.session_ttl,
    message_ttl=settings.message_ttl,
)
224# Helper function for authentication compatibility
def get_user_email(user):
    """Resolve the e-mail identity carried by an authenticated user value.

    Dict users (new RBAC format) are searched for ``email`` first and then for
    the JWT ``sub`` claim. Any other value (legacy string format) is converted
    with ``str()``. Falsy inputs, and dicts without a usable claim, yield
    ``'unknown'``.

    Args:
        user: User value; either a dict (new RBAC format) or a string (legacy format).
            Other objects are accepted and stringified.

    Returns:
        The resolved identity, or ``'unknown'`` when none is available.

    Examples:
        Dict with an ``email`` key:
        >>> from mcpgateway import main
        >>> main.get_user_email({'email': 'alice@example.com', 'role': 'admin'})
        'alice@example.com'

        Dict with only the JWT ``sub`` claim:
        >>> main.get_user_email({'sub': 'bob@example.com', 'role': 'user'})
        'bob@example.com'

        ``email`` wins over ``sub`` when both are present:
        >>> main.get_user_email({'email': 'alice@example.com', 'sub': 'bob@example.com'})
        'alice@example.com'

        Dicts without either claim, including the empty dict:
        >>> main.get_user_email({'username': 'charlie', 'role': 'user'})
        'unknown'
        >>> main.get_user_email({})
        'unknown'

        Legacy string users:
        >>> main.get_user_email('charlie@company.com')
        'charlie@company.com'
        >>> main.get_user_email('')
        'unknown'

        ``None`` and other non-dict, non-string values:
        >>> main.get_user_email(None)
        'unknown'
        >>> main.get_user_email(123)
        '123'
        >>> main.get_user_email(True)
        'True'
        >>> main.get_user_email(False)
        'unknown'

        Extra keys of other types do not matter:
        >>> main.get_user_email({'email': 'david@test.org', 'id': 456, 'active': True})
        'david@test.org'
    """
    if not isinstance(user, dict):
        # Legacy path: the value itself is the identity; anything falsy is unknown.
        return str(user) if user else "unknown"

    # RBAC path: take the first truthy claim, 'email' before 'sub' (JWT standard claim).
    for claim in ("email", "sub"):
        identity = user.get(claim)
        if identity:
            return identity
    return "unknown"
def _normalize_token_teams(teams: Optional[List]) -> List[str]:
    """
    Reduce a token's raw ``teams`` claim to a flat list of team IDs.

    SSO tokens may carry team dicts such as {"id": "...", "name": "..."} while
    other tokens carry plain ID strings; both shapes are accepted and only the
    IDs are kept so downstream filtering sees one format.

    Args:
        teams: Raw teams from token payload (may be None, list of IDs, or list of dicts)

    Returns:
        List of team IDs in input order (empty list if ``teams`` is None or empty).
        Dict entries without a truthy ``id`` and entries of any other type are dropped.

    Examples:
        >>> from mcpgateway import main
        >>> main._normalize_token_teams(None)
        []
        >>> main._normalize_token_teams([])
        []
        >>> main._normalize_token_teams(["team_a", "team_b"])
        ['team_a', 'team_b']
        >>> main._normalize_token_teams([{"id": "team_a", "name": "Team A"}])
        ['team_a']
        >>> main._normalize_token_teams([{"id": "t1"}, "t2", {"name": "no_id"}])
        ['t1', 't2']
    """
    if not teams:
        return []

    team_ids = []
    for entry in teams:
        if isinstance(entry, str):
            # Already a bare team ID.
            team_ids.append(entry)
            continue
        if isinstance(entry, dict):
            # SSO-style team object: keep the ID only when it is present and truthy.
            ident = entry.get("id")
            if ident:
                team_ids.append(ident)
    return team_ids
def _get_token_teams_from_request(request: Request) -> Optional[List[str]]:
    """
    Return the team scope attached to the request's verified JWT.

    SECURITY: relies on normalize_token_teams for consistent secure-first semantics:
    - teams key missing → [] (public-only, secure default)
    - teams key null + is_admin=true → None (admin bypass)
    - teams key null + is_admin=false → [] (public-only)
    - teams key [] → [] (explicit public-only)
    - teams key [...] → normalized list of string IDs

    Lookup order: ``request.state.token_teams`` (set by auth.py) is used when it
    is None or a list; otherwise the cached ``request.state._jwt_verified_payload``
    is passed to normalize_token_teams; otherwise the result is ``[]``.

    Args:
        request: FastAPI request object

    Returns:
        None for admin bypass, [] for public-only, or list of normalized team ID strings.

    Examples:
        >>> from mcpgateway import main
        >>> from unittest.mock import MagicMock
        >>> req = MagicMock()
        >>> req.state = MagicMock()
        >>> req.state.token_teams = ["team_a"]  # Already normalized by auth.py
        >>> main._get_token_teams_from_request(req)
        ['team_a']
        >>> req.state.token_teams = []  # Public-only
        >>> main._get_token_teams_from_request(req)
        []
    """
    state = request.state

    # Preferred path: auth.py has already stored a normalized value on the request.
    # A sentinel separates "attribute absent" from "attribute explicitly None",
    # because None is itself a meaningful value (admin bypass).
    absent = object()
    state_teams = getattr(state, "token_teams", absent)
    if state_teams is not absent:
        if state_teams is None or isinstance(state_teams, list):
            return state_teams

    # Fallback path: normalize straight from the cached (token, payload) pair.
    verified = getattr(state, "_jwt_verified_payload", None)
    if verified and isinstance(verified, tuple) and len(verified) == 2:
        payload = verified[1]
        if payload:
            return normalize_token_teams(payload)

    # No usable JWT payload: public-only is the secure default.
    return []
def _get_rpc_filter_context(request: Request, user) -> tuple:
    """
    Extract user_email, token_teams, and is_admin for RPC filtering.

    The admin flag is taken from the verified token payload cached on
    ``request.state._jwt_verified_payload`` and never from ``user``, so a token
    with a restricted scope cannot inherit an admin bypass from the account
    that issued it. A token whose team list is empty (public-only) always
    yields ``is_admin=False``.

    Args:
        request: FastAPI request object
        user: User object from auth dependency. An object with an ``email``
            attribute, a dict (``sub`` is preferred over ``email``), or any
            other value, which is stringified when truthy.

    Returns:
        Tuple of (user_email, token_teams, is_admin). ``user_email`` may be
        None; ``token_teams`` is None (admin bypass), [] (public-only) or a
        list of team IDs; ``is_admin`` is a bool.

    Examples:
        >>> from mcpgateway import main
        >>> from unittest.mock import MagicMock
        >>> req = MagicMock()
        >>> req.state = MagicMock()
        >>> req.state._jwt_verified_payload = ("token", {"teams": ["t1"], "is_admin": True})
        >>> user = {"email": "test@x.com", "is_admin": True}  # User's is_admin is ignored
        >>> email, teams, is_admin = main._get_rpc_filter_context(req, user)
        >>> email
        'test@x.com'
        >>> teams
        ['t1']
        >>> is_admin  # From token payload, not user dict
        True
    """
    # Get user email
    if hasattr(user, "email"):
        user_email = getattr(user, "email", None)
    elif isinstance(user, dict):
        user_email = user.get("sub") or user.get("email")
    else:
        user_email = str(user) if user else None

    # Get normalized teams from verified token
    token_teams = _get_token_teams_from_request(request)

    # Check if user is admin - MUST come from token, not DB user
    # This ensures that tokens with restricted scope (empty teams) don't inherit admin bypass
    is_admin = False
    cached = getattr(request.state, "_jwt_verified_payload", None)
    if cached and isinstance(cached, tuple) and len(cached) == 2:
        _, payload = cached
        # Only a non-empty dict payload can grant admin; anything else keeps the
        # secure default instead of raising AttributeError on ``.get``.
        if payload and isinstance(payload, dict):
            # Check both top-level is_admin and nested user.is_admin in token.
            # The nested "user" claim may be present but null or a non-dict value,
            # in which case it simply does not contribute to the admin flag.
            nested_user = payload.get("user")
            nested_admin = nested_user.get("is_admin", False) if isinstance(nested_user, dict) else False
            is_admin = bool(payload.get("is_admin", False) or nested_admin)

    # If token has empty teams array (public-only token), admin bypass is disabled
    # This allows admins to create properly scoped tokens for restricted access
    if token_teams is not None and not token_teams:
        is_admin = False

    return user_email, token_teams, is_admin
# Initialize the resource cache; capacity and entry TTL both come from settings
resource_cache = ResourceCache(
    max_size=settings.resource_cache_size,
    ttl=settings.resource_cache_ttl,
)
@lru_cache(maxsize=512)
def _parse_jsonpath(jsonpath: str) -> JSONPath:
    """Parse a JSONPath expression, memoising the result per expression string.

    Up to 512 distinct expression strings are kept by the LRU cache, so a
    repeated expression is parsed only once while it stays in the cache.

    Args:
        jsonpath: The JSONPath expression string.

    Returns:
        Parsed JSONPath object.

    Raises:
        Exception: If the JSONPath expression is invalid.
    """
    compiled: JSONPath = parse(jsonpath)
    return compiled
def jsonpath_modifier(data: Any, jsonpath: str = "$[*]", mappings: Optional[Dict[str, str]] = None) -> Union[List, Dict]:
    """
    Applies the given JSONPath expression and mappings to the data.
    Uses cached parsed expressions for performance.

    Args:
        data: The JSON data to query.
        jsonpath: The JSONPath expression to apply. An empty value falls back to "$[*]".
        mappings: Optional dictionary of mappings where keys are new field names
            and values are JSONPath expressions.

    Returns:
        Union[List, Dict]: The list of matched (and optionally re-mapped) values.
        When exactly one result remains and it is a dict, that dict is returned
        directly instead of a one-element list.

    Raises:
        HTTPException: If there's an error parsing or executing the JSONPath expressions.
            The underlying error is attached as ``__cause__``.

    Examples:
        >>> jsonpath_modifier({'a': 1, 'b': 2}, '$.a')
        [1]
        >>> jsonpath_modifier([{'a': 1}, {'a': 2}], '$[*].a')
        [1, 2]
        >>> jsonpath_modifier({'a': {'b': 2}}, '$.a.b')
        [2]
        >>> jsonpath_modifier({'a': 1}, '$.b')
        []
    """
    if not jsonpath:
        jsonpath = "$[*]"

    try:
        main_expr: JSONPath = _parse_jsonpath(jsonpath)
    except Exception as e:
        # Chain explicitly so the original parser error stays visible in tracebacks.
        raise HTTPException(status_code=400, detail=f"Invalid main JSONPath expression: {e}") from e

    try:
        main_matches = main_expr.find(data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error executing main JSONPath: {e}") from e

    results = [match.value for match in main_matches]

    if mappings:
        results = transform_data_with_mappings(results, mappings)

    # A single dict result is unwrapped so callers get the object itself.
    if len(results) == 1 and isinstance(results[0], dict):
        return results[0]

    return results
def transform_data_with_mappings(data: list[Any], mappings: dict[str, str]) -> list[Any]:
    """
    Applies mappings to data using cached JSONPath expressions.
    Parses each mapping expression once per call, not per item.

    Args:
        data: The set of data to apply mappings to.
        mappings: dictionary of mappings where keys are new field names
            and values are JSONPath expressions evaluated against each item.

    Returns:
        list[Any]: One dict per input item, keyed by the mapping names. For each
        key the value is None when the expression matched nothing, the single
        matched value when it matched once, or a list of all matched values.

    Raises:
        HTTPException: If there's an error parsing or executing the JSONPath expressions.
            The underlying error is attached as ``__cause__``.

    Examples:
        >>> transform_data_with_mappings([{'first_name': "Bruce", 'second_name': "Wayne"},{'first_name': "Diana", 'second_name': "Prince"}], {"n": "$.first_name"})
        [{'n': 'Bruce'}, {'n': 'Diana'}]
    """
    # Pre-parse all mapping expressions once (not per item)
    parsed_mappings: Dict[str, JSONPath] = {}
    for new_key, mapping_expr_str in mappings.items():
        try:
            parsed_mappings[new_key] = _parse_jsonpath(mapping_expr_str)
        except Exception as e:
            # Chain explicitly so the original parser error stays visible in tracebacks.
            raise HTTPException(status_code=400, detail=f"Invalid mapping JSONPath for key '{new_key}': {e}") from e

    mapped_results = []
    for item in data:
        mapped_item = {}
        for new_key, mapping_expr in parsed_mappings.items():
            try:
                mapping_matches = mapping_expr.find(item)
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"Error executing mapping JSONPath for key '{new_key}': {e}") from e

            if not mapping_matches:
                mapped_item[new_key] = None
            elif len(mapping_matches) == 1:
                mapped_item[new_key] = mapping_matches[0].value
            else:
                mapped_item[new_key] = [m.value for m in mapping_matches]
        mapped_results.append(mapped_item)

    return mapped_results
async def attempt_to_bootstrap_sso_providers():
    """
    Best-effort bootstrap of the SSO providers.

    The bootstrap helper is imported lazily inside the function. Any exception
    raised while importing or running it is logged as a warning and swallowed,
    so a failure here never propagates to the caller.
    """
    try:
        # First-Party
        from mcpgateway.utils import sso_bootstrap  # pylint: disable=import-outside-toplevel

        await sso_bootstrap.bootstrap_sso_providers()
        logger.info("SSO providers bootstrapped successfully")
    except Exception as e:
        logger.warning(f"Failed to bootstrap SSO providers: {e}")
573####################
574# Startup/Shutdown #
575####################
576@asynccontextmanager
577async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
578 """
579 Manage the application's startup and shutdown lifecycle.
581 The function initialises every core service on entry and then
582 shuts them down in reverse order on exit.
584 Args:
585 _app (FastAPI): FastAPI app
587 Yields:
588 None
590 Raises:
591 SystemExit: When a critical startup error occurs that prevents
592 the application from starting successfully.
593 Exception: Any unhandled error that occurs during service
594 initialisation or shutdown is re-raised to the caller.
595 """
596 aggregation_stop_event: Optional[asyncio.Event] = None
597 aggregation_loop_task: Optional[asyncio.Task] = None
598 aggregation_backfill_task: Optional[asyncio.Task] = None
600 # Initialize logging service FIRST to ensure all logging goes to dual output
601 await logging_service.initialize()
602 logger.info("Starting MCP Gateway services")
604 # Initialize Redis client early (shared pool for all services)
605 await get_redis_client()
607 # Initialize shared HTTP client (connection pool for all outbound requests)
608 # First-Party
609 from mcpgateway.services.http_client_service import SharedHttpClient # pylint: disable=import-outside-toplevel
611 await SharedHttpClient.get_instance()
613 # Update HTTP pool metrics after SharedHttpClient is initialized
614 if hasattr(app.state, "update_http_pool_metrics"): 614 ↛ 619line 614 didn't jump to line 619 because the condition on line 614 was always true
615 app.state.update_http_pool_metrics()
617 # Initialize MCP session pool (for session reuse across tool invocations)
618 # Also initialize if session affinity is enabled (needs the ownership registry)
619 if settings.mcp_session_pool_enabled or settings.mcpgateway_session_affinity_enabled:
620 # First-Party
621 from mcpgateway.services.mcp_session_pool import init_mcp_session_pool # pylint: disable=import-outside-toplevel
623 # Auto-align pool health check interval to min of pool and gateway settings
624 effective_health_check_interval = min(
625 settings.health_check_interval,
626 settings.mcp_session_pool_health_check_interval,
627 )
629 max_sessions_per_key = settings.mcpgateway_session_affinity_max_sessions if settings.mcpgateway_session_affinity_enabled else settings.mcp_session_pool_max_per_key
630 init_mcp_session_pool(
631 max_sessions_per_key=max_sessions_per_key,
632 session_ttl_seconds=settings.mcp_session_pool_ttl,
633 health_check_interval_seconds=effective_health_check_interval,
634 acquire_timeout_seconds=settings.mcp_session_pool_acquire_timeout,
635 session_create_timeout_seconds=settings.mcp_session_pool_create_timeout,
636 circuit_breaker_threshold=settings.mcp_session_pool_circuit_breaker_threshold,
637 circuit_breaker_reset_seconds=settings.mcp_session_pool_circuit_breaker_reset,
638 identity_headers=frozenset(settings.mcp_session_pool_identity_headers),
639 idle_pool_eviction_seconds=settings.mcp_session_pool_idle_eviction,
640 # Use dedicated transport timeout (default 30s to match MCP SDK default).
641 # This is separate from health_check_timeout to allow long-running tool calls.
642 default_transport_timeout_seconds=settings.mcp_session_pool_transport_timeout,
643 # Configurable health check chain - ordered list of methods to try.
644 health_check_methods=settings.mcp_session_pool_health_check_methods,
645 health_check_timeout_seconds=settings.mcp_session_pool_health_check_timeout,
646 )
647 logger.info("MCP session pool initialized")
649 # Initialize LLM chat router Redis client
650 # First-Party
651 from mcpgateway.routers.llmchat_router import init_redis as init_llmchat_redis # pylint: disable=import-outside-toplevel
653 await init_llmchat_redis()
655 # Initialize observability (Phoenix tracing)
656 init_telemetry()
657 logger.info("Observability initialized")
659 try:
660 # Validate security configuration
661 validate_security_configuration()
663 if plugin_manager:
664 await plugin_manager.initialize()
665 logger.info(f"Plugin manager initialized with {plugin_manager.plugin_count} plugins")
667 if settings.enable_header_passthrough:
668 await setup_passthrough_headers()
669 else:
670 logger.info("🔒 Header Passthrough: DISABLED")
672 await tool_service.initialize()
673 await resource_service.initialize()
674 await prompt_service.initialize()
675 await gateway_service.initialize()
677 # Start notification service for event-driven refresh (after gateway_service is ready)
678 if settings.mcp_session_pool_enabled:
679 # First-Party
680 from mcpgateway.services.mcp_session_pool import start_pool_notification_service # pylint: disable=import-outside-toplevel
682 await start_pool_notification_service(gateway_service)
684 # Start RPC listener for multi-worker session affinity
685 if settings.mcpgateway_session_affinity_enabled: 685 ↛ 693line 685 didn't jump to line 693 because the condition on line 685 was always true
686 # First-Party
687 from mcpgateway.services.mcp_session_pool import get_mcp_session_pool # pylint: disable=import-outside-toplevel
689 pool = get_mcp_session_pool()
690 pool._rpc_listener_task = asyncio.create_task(pool.start_rpc_listener()) # pylint: disable=protected-access
691 logger.info("Multi-worker session affinity RPC listener started")
693 await root_service.initialize()
694 await completion_service.initialize()
695 await sampling_handler.initialize()
696 await export_service.initialize()
697 await import_service.initialize()
698 if a2a_service:
699 await a2a_service.initialize()
700 await resource_cache.initialize()
701 await streamable_http_session.initialize()
702 await session_registry.initialize()
704 # Initialize OrchestrationService for tool cancellation if enabled
705 if settings.mcpgateway_tool_cancellation_enabled:
706 await cancellation_service.initialize()
707 logger.info("Tool cancellation feature enabled")
708 else:
709 logger.info("Tool cancellation feature disabled")
711 # Initialize elicitation service
712 if settings.mcpgateway_elicitation_enabled:
713 # First-Party
714 from mcpgateway.services.elicitation_service import get_elicitation_service # pylint: disable=import-outside-toplevel
716 elicitation_service = get_elicitation_service()
717 await elicitation_service.start()
718 logger.info("Elicitation service initialized")
720 # Initialize metrics buffer service for batching metric writes
721 if settings.metrics_buffer_enabled:
722 # First-Party
723 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel
725 metrics_buffer_service = get_metrics_buffer_service()
726 await metrics_buffer_service.start()
727 if settings.db_metrics_recording_enabled:
728 logger.info("Metrics buffer service initialized")
729 else:
730 logger.info("Metrics buffer service initialized (recording disabled)")
732 # Initialize metrics cleanup service for automatic deletion of old metrics
733 if settings.metrics_cleanup_enabled:
734 # First-Party
735 from mcpgateway.services.metrics_cleanup_service import get_metrics_cleanup_service # pylint: disable=import-outside-toplevel
737 metrics_cleanup_service = get_metrics_cleanup_service()
738 await metrics_cleanup_service.start()
739 logger.info("Metrics cleanup service initialized (retention: %d days)", settings.metrics_retention_days)
741 # Initialize metrics rollup service for hourly aggregation
742 if settings.metrics_rollup_enabled:
743 # First-Party
744 from mcpgateway.services.metrics_rollup_service import get_metrics_rollup_service # pylint: disable=import-outside-toplevel
746 metrics_rollup_service = get_metrics_rollup_service()
747 await metrics_rollup_service.start()
748 logger.info("Metrics rollup service initialized (interval: %dh)", settings.metrics_rollup_interval_hours)
750 refresh_slugs_on_startup()
752 # Bootstrap SSO providers from environment configuration
753 if settings.sso_enabled:
754 await attempt_to_bootstrap_sso_providers()
756 logger.info("All services initialized successfully")
758 # Start cache invalidation subscriber for cross-worker cache synchronization
759 # First-Party
760 from mcpgateway.cache.registry_cache import get_cache_invalidation_subscriber # pylint: disable=import-outside-toplevel
762 cache_invalidation_subscriber = get_cache_invalidation_subscriber()
763 await cache_invalidation_subscriber.start()
765 # Reconfigure uvicorn loggers after startup to capture access logs in dual output
766 logging_service.configure_uvicorn_after_startup()
768 if settings.metrics_aggregation_enabled and settings.metrics_aggregation_auto_start:
769 aggregation_stop_event = asyncio.Event()
770 log_aggregator = get_log_aggregator()
772 async def run_log_backfill() -> None:
773 """Backfill log aggregation metrics for configured hours."""
774 hours = getattr(settings, "metrics_aggregation_backfill_hours", 0)
775 if hours <= 0:
776 return
777 try:
778 await asyncio.to_thread(log_aggregator.backfill, hours)
779 logger.info("Log aggregation backfill completed for last %s hour(s)", hours)
780 except Exception as backfill_error: # pragma: no cover - defensive logging
781 logger.warning("Log aggregation backfill failed: %s", backfill_error)
783 async def run_log_aggregation_loop() -> None:
784 """Run continuous log aggregation at configured intervals.
786 Raises:
787 asyncio.CancelledError: When aggregation is stopped
788 """
789 interval_seconds = max(1, int(settings.metrics_aggregation_window_minutes)) * 60
790 logger.info(
791 "Starting log aggregation loop (window=%s min)",
792 log_aggregator.aggregation_window_minutes,
793 )
794 try:
795 while not aggregation_stop_event.is_set():
796 try:
797 await asyncio.to_thread(log_aggregator.aggregate_all_components)
798 except Exception as agg_error: # pragma: no cover - defensive logging
799 logger.warning("Log aggregation loop iteration failed: %s", agg_error)
801 try:
802 await asyncio.wait_for(aggregation_stop_event.wait(), timeout=interval_seconds)
803 except asyncio.TimeoutError:
804 continue
805 except asyncio.CancelledError:
806 logger.debug("Log aggregation loop cancelled")
807 raise
808 finally:
809 logger.info("Log aggregation loop stopped")
811 aggregation_backfill_task = asyncio.create_task(run_log_backfill())
812 aggregation_loop_task = asyncio.create_task(run_log_aggregation_loop())
813 elif settings.metrics_aggregation_enabled:
814 logger.info("Metrics aggregation auto-start disabled; performance metrics will be generated on-demand when requested.")
816 yield
817 except Exception as e:
818 logger.error(f"Error during startup: {str(e)}")
819 # For plugin errors, exit cleanly without stack trace spam
820 if "Plugin initialization failed" in str(e):
821 # Suppress uvicorn error logging for clean exit
822 # Standard
823 import logging # pylint: disable=import-outside-toplevel
825 logging.getLogger("uvicorn.error").setLevel(logging.CRITICAL)
826 raise SystemExit(1)
827 raise
828 finally:
829 if aggregation_stop_event is not None:
830 aggregation_stop_event.set()
831 for task in (aggregation_backfill_task, aggregation_loop_task):
832 if task:
833 task.cancel()
834 with suppress(asyncio.CancelledError):
835 await task
837 # Shutdown plugin manager
838 if plugin_manager:
839 try:
840 await plugin_manager.shutdown()
841 logger.info("Plugin manager shutdown complete")
842 except Exception as e:
843 logger.error(f"Error shutting down plugin manager: {str(e)}")
845 # Stop cache invalidation subscriber
846 try:
847 # First-Party
848 from mcpgateway.cache.registry_cache import get_cache_invalidation_subscriber # pylint: disable=import-outside-toplevel
850 cache_invalidation_subscriber = get_cache_invalidation_subscriber()
851 await cache_invalidation_subscriber.stop()
852 except Exception as e:
853 logger.debug(f"Error stopping cache invalidation subscriber: {e}")
855 logger.info("Shutting down MCP Gateway services")
856 # await stop_streamablehttp()
857 # Build service list conditionally
858 services_to_shutdown: List[Any] = [
859 resource_cache,
860 sampling_handler,
861 import_service,
862 export_service,
863 logging_service,
864 completion_service,
865 root_service,
866 gateway_service,
867 prompt_service,
868 resource_service,
869 tool_service,
870 streamable_http_session,
871 session_registry,
872 ]
874 # Add cancellation service if enabled
875 if settings.mcpgateway_tool_cancellation_enabled:
876 services_to_shutdown.insert(0, cancellation_service) # Shutdown early to stop accepting new cancellations
878 if a2a_service:
879 services_to_shutdown.insert(4, a2a_service) # Insert after export_service
881 # Add elicitation service if enabled
882 if settings.mcpgateway_elicitation_enabled:
883 # First-Party
884 from mcpgateway.services.elicitation_service import get_elicitation_service # pylint: disable=import-outside-toplevel
886 elicitation_service = get_elicitation_service()
887 services_to_shutdown.insert(5, elicitation_service)
889 # Add metrics buffer service if enabled (flush remaining metrics before shutdown)
890 if settings.metrics_buffer_enabled:
891 # First-Party
892 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel
894 metrics_buffer_service = get_metrics_buffer_service()
895 services_to_shutdown.insert(0, metrics_buffer_service) # Shutdown first to flush metrics
897 # Add metrics rollup service if enabled (shutdown before cleanup)
898 if settings.metrics_rollup_enabled:
899 # First-Party
900 from mcpgateway.services.metrics_rollup_service import get_metrics_rollup_service # pylint: disable=import-outside-toplevel
902 metrics_rollup_service = get_metrics_rollup_service()
903 services_to_shutdown.insert(1, metrics_rollup_service)
905 # Add metrics cleanup service if enabled
906 if settings.metrics_cleanup_enabled:
907 # First-Party
908 from mcpgateway.services.metrics_cleanup_service import get_metrics_cleanup_service # pylint: disable=import-outside-toplevel
910 metrics_cleanup_service = get_metrics_cleanup_service()
911 services_to_shutdown.insert(2, metrics_cleanup_service)
913 await shutdown_services(services_to_shutdown)
915 # Shutdown MCP session pool (before shared HTTP client)
916 if settings.mcp_session_pool_enabled:
917 # First-Party
918 from mcpgateway.services.mcp_session_pool import close_mcp_session_pool # pylint: disable=import-outside-toplevel
920 await close_mcp_session_pool()
922 # Shutdown shared HTTP client (after services, before Redis)
923 await SharedHttpClient.shutdown()
925 # Close Redis client last (after all services that use it)
926 await close_redis_client()
928 logger.info("Shutdown complete")
async def shutdown_services(services_to_shutdown: list[Any]):
    """Shut down the given services one after another.

    Each service's ``shutdown()`` is awaited in list order. A failure raised
    while shutting down one service is logged and does not stop the remaining
    services from being shut down.

    Args:
        services_to_shutdown (list[Any]): Services exposing an awaitable ``shutdown()``.
    """
    for svc in services_to_shutdown:
        try:
            await svc.shutdown()
        except Exception as shutdown_error:
            # Best-effort: report the failing service by class name and move on.
            svc_name = svc.__class__.__name__
            logger.error(f"Error shutting down {svc_name}: {str(shutdown_error)}")
async def setup_passthrough_headers():
    """Log the header-passthrough configuration and load the global passthrough headers.

    Logs the default passthrough headers and whether client headers may
    override gateway headers, then obtains a database session from ``get_db()``
    and passes it to ``set_global_passthrough_headers``.

    The session is committed and closed afterwards. The close is guaranteed
    even when the commit itself raises, so a failing commit cannot leak the
    session.
    """
    logger.info(f"🔄 Header Passthrough: ENABLED (default headers: {settings.default_passthrough_headers})")
    if settings.enable_overwrite_base_headers:
        logger.warning("⚠️ Base Header Override: ENABLED - Client headers can override gateway headers")
    else:
        logger.info("🔒 Base Header Override: DISABLED - Gateway headers take precedence")
    db_gen = get_db()
    db = next(db_gen)  # pylint: disable=stop-iteration-return
    try:
        await set_global_passthrough_headers(db)
    finally:
        try:
            db.commit()  # End transaction cleanly
        finally:
            # Always release the session, even if the commit above fails.
            db.close()
# Create the FastAPI application; responses default to ORJSONResponse (orjson-backed JSON serialization)
app = FastAPI(
    title=settings.app_name,
    description="A FastAPI-based MCP Gateway with federation support",
    version=__version__,
    root_path=settings.app_root_path,
    default_response_class=ORJSONResponse,  # orjson for high-performance JSON serialization
    lifespan=lifespan,
)

# Attach metrics instrumentation to the application
setup_metrics(app)
def validate_security_configuration():
    """Validate the security configuration at startup and log the findings.

    The steps, in order:
    - fetch the security status from settings and log its warnings,
    - collect critical issues (default JWT secret outside dev mode, default
      admin password with the UI enabled) and hand them to ``log_critical_issues``,
    - warn about potentially ephemeral storage when ``require_user_in_db`` is off,
    - warn about default JWT issuer/audience outside the development environment,
    - log security recommendations.

    Raises nothing of its own; errors from the helpers it calls pass through.
    """
    logger.info("🔒 Validating security configuration...")

    # Fetch the status once; its warnings are logged first
    security_status: settings.SecurityStatus = settings.get_security_status()
    log_security_warnings(security_status["warnings"])

    # Critical security checks (fail startup only if REQUIRE_STRONG_SECRETS=true)
    default_jwt_secret = settings.jwt_secret_key == "my-test-key" and not settings.dev_mode  # nosec B105 - checking for default value
    default_ui_password = settings.basic_auth_password.get_secret_value() == "changeme" and settings.mcpgateway_ui_enabled  # nosec B105 - checking for default value
    checks = (
        (default_jwt_secret, "Using default JWT secret in non-dev mode. Set JWT_SECRET_KEY environment variable!"),
        (default_ui_password, "Admin UI enabled with default password. Set BASIC_AUTH_PASSWORD environment variable!"),
    )
    critical_issues = [message for failed, message in checks if failed]
    log_critical_issues(critical_issues)

    # Warn about ephemeral storage without strict user-in-DB mode
    strict_user_mode = getattr(settings, "require_user_in_db", False)
    if not strict_user_mode:
        db_url = settings.database_url
        if ":memory:" in db_url or db_url == "sqlite:///./mcp.db":
            logger.warning("Using potentially ephemeral storage with platform admin bootstrap enabled. Consider using persistent storage or setting REQUIRE_USER_IN_DB=true for production.")

    # Warn about default JWT issuer/audience in non-development environments
    if settings.environment != "development":
        if settings.jwt_issuer == "mcpgateway":
            logger.warning("Using default JWT_ISSUER in %s environment. Set a unique JWT_ISSUER per environment to prevent cross-environment token acceptance.", settings.environment)
        if settings.jwt_audience == "mcpgateway-api":
            logger.warning("Using default JWT_AUDIENCE in %s environment. Set a unique JWT_AUDIENCE per environment to prevent cross-environment token acceptance.", settings.environment)

    log_security_recommendations(security_status)
def log_security_warnings(security_warnings: list[str]):
    """Log each security warning inside a banner; do nothing for an empty list.

    Args:
        security_warnings: List of security warning messages.
    """
    if not security_warnings:
        return

    divider = "=" * 60
    logger.warning(divider)
    logger.warning("🚨 SECURITY WARNINGS DETECTED:")
    logger.warning(divider)
    for entry in security_warnings:
        logger.warning(f" {entry}")
    logger.warning(divider)
def log_critical_issues(critical_issues: list[Any]):
    """Report critical security issues, aborting startup when strict mode is on.

    With no issues this is a no-op. When ``settings.require_strong_secrets`` is
    true, the issues are logged as errors and the process exits with status 1.
    Otherwise they are logged as warnings and startup continues.

    Args:
        critical_issues: List of critical issue messages.

    Returns: None
    """
    if not critical_issues:
        return

    divider = "=" * 60

    if not settings.require_strong_secrets:
        # Not enforcing: surface the issues as warnings only
        logger.warning(divider)
        logger.warning("⚠️ Critical security issues detected (REQUIRE_STRONG_SECRETS=false):")
        for problem in critical_issues:
            logger.warning(f" • {problem}")
        logger.warning(divider)
        return

    # Enforcing: log everything as errors, then abort startup
    logger.error(divider)
    logger.error("💀 CRITICAL SECURITY ISSUES DETECTED:")
    logger.error(divider)
    for problem in critical_issues:
        logger.error(f" ❌ {problem}")
    logger.error(divider)
    logger.error("Startup aborted due to REQUIRE_STRONG_SECRETS=true")
    logger.error("To proceed anyway, set REQUIRE_STRONG_SECRETS=false")
    logger.error(divider)
    sys.exit(1)
def log_security_recommendations(security_status: settings.SecurityStatus):
    """Log security recommendations derived from the current settings.

    The recommendation banner is only printed when the status reports insecure
    secrets or disabled authentication. The completion message is always logged.

    Args:
        security_status (settings.SecurityStatus): Status mapping; the ``secure_secrets`` and ``auth_enabled`` entries are read here.

    Returns: None
    """
    fully_secured = security_status["secure_secrets"] and security_status["auth_enabled"]
    if not fully_secured:
        divider = "=" * 60
        logger.info(divider)
        logger.info("📋 SECURITY RECOMMENDATIONS:")
        logger.info(divider)

        if settings.jwt_secret_key == "my-test-key":  # nosec B105 - checking for default value
            logger.info(" • Generate a strong JWT secret:")
            logger.info(" python3 -c 'import secrets; print(secrets.token_urlsafe(32))'")

        if settings.basic_auth_password.get_secret_value() == "changeme":  # nosec B105 - checking for default value
            logger.info(" • Set a strong admin password in BASIC_AUTH_PASSWORD")

        if not settings.auth_required:
            logger.info(" • Enable authentication: AUTH_REQUIRED=true")

        if settings.skip_ssl_verify:
            logger.info(" • Enable SSL verification: SKIP_SSL_VERIFY=false")

        logger.info(divider)

    logger.info("✅ Security validation completed")
# Global exception handlers
@app.exception_handler(ValidationError)
async def validation_exception_handler(_request: Request, exc: ValidationError):
    """Turn a Pydantic ValidationError into a 422 JSON response.

    Registered as the application-wide handler for ``ValidationError``. The
    response body is produced by ``ErrorFormatter.format_validation_error``.

    Args:
        _request: The FastAPI request object that triggered the validation error.
            (Unused but required by FastAPI's exception handler interface)
        exc: The Pydantic ValidationError exception containing validation
            failure details.

    Returns:
        JSONResponse: A 422 Unprocessable Entity response with formatted
            validation error details.

    Examples:
        >>> from pydantic import ValidationError, BaseModel
        >>> from fastapi import Request
        >>> import asyncio
        >>>
        >>> class TestModel(BaseModel):
        ...     name: str
        ...     age: int
        >>>
        >>> # Create a validation error
        >>> try:
        ...     TestModel(name="", age="invalid")
        ... except ValidationError as e:
        ...     # Test our handler
        ...     result = asyncio.run(validation_exception_handler(None, e))
        ...     result.status_code
        422
    """
    formatted_error = ErrorFormatter.format_validation_error(exc)
    return ORJSONResponse(content=formatted_error, status_code=422)
@app.exception_handler(RequestValidationError)
async def request_validation_exception_handler(_request: Request, exc: RequestValidationError):
    """Handle FastAPI request validation errors (automatic request parsing).

    Requests whose URL path starts with ``/tools`` get a custom 422 body in
    which each error's ``ctx`` is made JSON serializable. Every other request
    is delegated to ``fastapi_default_validation_handler``.

    Args:
        _request: The FastAPI request object that triggered validation error.
        exc: The RequestValidationError exception containing failure details.

    Returns:
        JSONResponse: A 422 Unprocessable Entity response with error details.
    """
    if not _request.url.path.startswith("/tools"):
        return await fastapi_default_validation_handler(_request, exc)

    def _json_safe_ctx(ctx):
        """Stringify exception values in a dict ctx; stringify a non-dict ctx whole."""
        if isinstance(ctx, dict):
            return {key: (str(value) if isinstance(value, Exception) else value) for key, value in ctx.items()}
        return str(ctx)

    error_details = [
        {
            "type": error.get("type", "value_error"),
            "loc": error.get("loc", []),
            "msg": error.get("msg", "Unknown error"),
            "ctx": _json_safe_ctx(error.get("ctx", {"error": {}})),
        }
        for error in exc.errors()
    ]
    return ORJSONResponse(status_code=422, content={"detail": error_details})
@app.exception_handler(IntegrityError)
async def database_exception_handler(_request: Request, exc: IntegrityError):
    """Turn a SQLAlchemy IntegrityError into a 409 JSON response.

    Registered as the application-wide handler for ``IntegrityError`` (e.g.,
    unique constraint violations, foreign key constraints). The response body
    is produced by ``ErrorFormatter.format_database_error``.

    Args:
        _request: The FastAPI request object that triggered the database error.
            (Unused but required by FastAPI's exception handler interface)
        exc: The SQLAlchemy IntegrityError exception containing constraint
            violation details.

    Returns:
        JSONResponse: A 409 Conflict response with formatted database error details.

    Examples:
        >>> from sqlalchemy.exc import IntegrityError
        >>> from fastapi import Request
        >>> import asyncio
        >>>
        >>> # Create a mock integrity error
        >>> mock_error = IntegrityError("statement", {}, Exception("duplicate key"))
        >>> result = asyncio.run(database_exception_handler(None, mock_error))
        >>> result.status_code
        409
        >>> # Verify ErrorFormatter.format_database_error is called
        >>> hasattr(result, 'body')
        True
    """
    formatted_error = ErrorFormatter.format_database_error(exc)
    return ORJSONResponse(content=formatted_error, status_code=409)
@app.exception_handler(PluginViolationError)
async def plugin_violation_exception_handler(_request: Request, exc: PluginViolationError):
    """Handle plugin violations globally.

    Intercepts PluginViolationError exceptions (e.g., OPA policy violation) and
    returns a JSON-RPC style error wrapped in an HTTP 200 response. The JSON-RPC
    error code is the violation's ``mcp_error_code`` when set, otherwise -32602.

    Args:
        _request: The FastAPI request object that triggered the plugin violation.
            (Unused but required by FastAPI's exception handler interface)
        exc: The PluginViolationError exception carrying the violation details.

    Returns:
        JSONResponse: A 200 response with error details in JSON-RPC format.

    Examples:
        >>> from mcpgateway.plugins.framework import PluginViolationError
        >>> from mcpgateway.plugins.framework.models import PluginViolation
        >>> from fastapi import Request
        >>> import asyncio
        >>> import json
        >>>
        >>> # Create a plugin violation error
        >>> mock_error = PluginViolationError(message="plugin violation",violation = PluginViolation(
        ...     reason="Invalid input",
        ...     description="The input contains prohibited content",
        ...     code="PROHIBITED_CONTENT",
        ...     details={"field": "message", "value": "test"}
        ... ))
        >>> result = asyncio.run(plugin_violation_exception_handler(None, mock_error))
        >>> result.status_code
        200
        >>> content = orjson.loads(result.body.decode())
        >>> content["error"]["code"]
        -32602
        >>> "Plugin Violation:" in content["error"]["message"]
        True
        >>> content["error"]["data"]["plugin_error_code"]
        'PROHIBITED_CONTENT'
    """
    violation = exc.violation

    # Use the generic message when there is no violation or its description is
    # missing, so the string concatenation below can never hit None.
    description = violation.description if violation else None
    message = description if description is not None else "A plugin violation occurred."

    # JSON-RPC error code (not an HTTP status); -32602 is the fallback.
    error_code = violation.mcp_error_code if violation and violation.mcp_error_code else -32602

    # Only populated fields are included in the error's data payload.
    violation_details: dict[str, Any] = {}
    if violation:
        if violation.description:
            violation_details["description"] = violation.description
        if violation.details:
            violation_details["details"] = violation.details
        if violation.code:
            violation_details["plugin_error_code"] = violation.code
        if violation.plugin_name:
            violation_details["plugin_name"] = violation.plugin_name

    json_rpc_error = PydanticJSONRPCError(code=error_code, message="Plugin Violation: " + message, data=violation_details)
    return ORJSONResponse(status_code=200, content={"error": json_rpc_error.model_dump()})
@app.exception_handler(PluginError)
async def plugin_exception_handler(_request: Request, exc: PluginError):
    """Handle plugin errors globally.

    Intercepts PluginError exceptions and returns a JSON-RPC style error
    wrapped in an HTTP 200 response. The JSON-RPC error code is the error's
    ``mcp_error_code`` when set, otherwise -32603 (the same fallback pattern
    used by the plugin violation handler).

    Args:
        _request: The FastAPI request object that triggered the plugin error.
            (Unused but required by FastAPI's exception handler interface)
        exc: The PluginError exception carrying the plugin error details.

    Returns:
        JSONResponse: A 200 response with error details in JSON-RPC format.

    Examples:
        >>> from mcpgateway.plugins.framework import PluginError
        >>> from mcpgateway.plugins.framework.models import PluginErrorModel
        >>> from fastapi import Request
        >>> import asyncio
        >>> import json
        >>>
        >>> # Create a plugin error
        >>> mock_error = PluginError(error = PluginErrorModel(
        ...     message="plugin error",
        ...     code="timeout",
        ...     plugin_name="abc",
        ...     details={"field": "message", "value": "test"}
        ... ))
        >>> result = asyncio.run(plugin_exception_handler(None, mock_error))
        >>> result.status_code
        200
        >>> content = orjson.loads(result.body.decode())
        >>> content["error"]["code"]
        -32603
        >>> "Plugin Error:" in content["error"]["message"]
        True
        >>> content["error"]["data"]["plugin_error_code"]
        'timeout'
        >>> content["error"]["data"]["plugin_name"]
        'abc'
    """
    plugin_error = exc.error
    message = plugin_error.message if plugin_error else "A plugin error occurred."

    # JSON-RPC error code (not an HTTP status). Fall back to -32603 when the
    # error is absent OR carries no code, rather than passing None through.
    error_code = plugin_error.mcp_error_code if plugin_error and plugin_error.mcp_error_code else -32603

    # Only populated fields are included in the error's data payload.
    error_details: dict[str, Any] = {}
    if plugin_error:
        if plugin_error.details:
            error_details["details"] = plugin_error.details
        if plugin_error.code:
            error_details["plugin_error_code"] = plugin_error.code
        if plugin_error.plugin_name:
            error_details["plugin_name"] = plugin_error.plugin_name

    json_rpc_error = PydanticJSONRPCError(code=error_code, message="Plugin Error: " + message, data=error_details)
    return ORJSONResponse(status_code=200, content={"error": json_rpc_error.model_dump()})
1329def _normalize_scope_path(scope_path: str, root_path: str) -> str:
1330 """Strip ``root_path`` prefix from *scope_path* when a reverse proxy forwards the full path.
1332 Returns the route-only path (e.g. ``"/qa/gateway/docs"`` -> ``"/docs"``).
1333 A ``root_path`` of ``"/"`` is ignored to avoid stripping the leading slash
1334 from every path. Trailing slashes on *root_path* are stripped before
1335 comparison so that ``"/qa/gateway/"`` is handled identically to
1336 ``"/qa/gateway"``.
1338 Args:
1339 scope_path: The full path from the request scope.
1340 root_path: The root path prefix to be stripped.
1342 Returns:
1343 The normalized path with the root_path prefix removed.
1344 """
1345 if root_path and len(root_path) > 1:
1346 root_path = root_path.rstrip("/")
1347 if root_path and len(root_path) > 1 and scope_path.startswith(root_path):
1348 rest = scope_path[len(root_path) :]
1349 # Ensure we matched a full path segment, not a partial prefix
1350 # e.g. root_path="/app" must not strip from "/application/admin"
1351 if not rest or rest[0] == "/":
1352 return rest or "/"
1353 return scope_path
class DocsAuthMiddleware(BaseHTTPMiddleware):
    """
    Middleware guarding FastAPI's auto-generated documentation routes
    (/docs, /redoc, and /openapi.json).

    Requests to those paths are authenticated through
    ``require_docs_auth_override`` using the ``Authorization`` header and the
    ``jwt_token`` cookie; when that raises an HTTPException, its status code
    and detail are returned as a JSON error response.

    Note:
        OPTIONS requests are exempt from authentication to support CORS preflight
        as per RFC 7231 Section 4.3.7 (OPTIONS must not require authentication).

    Note:
        When DOCS_ALLOW_BASIC_AUTH is enabled, Basic Authentication
        is also accepted using BASIC_AUTH_USER and BASIC_AUTH_PASSWORD credentials
        (handled inside ``require_docs_auth_override``, not in this class).
    """

    async def dispatch(self, request: Request, call_next):
        """
        Authenticate requests aimed at the documentation routes; pass everything else through.

        Args:
            request (Request): The incoming HTTP request.
            call_next (Callable): The function to call the next middleware or endpoint.

        Returns:
            Response: Either the standard route response or a JSON error response
            built from the HTTPException raised by the docs authentication.

        Examples:
            >>> import asyncio
            >>> from unittest.mock import Mock, AsyncMock, patch
            >>> from fastapi import HTTPException
            >>> from fastapi.responses import JSONResponse
            >>>
            >>> # Test unprotected path - should pass through
            >>> middleware = DocsAuthMiddleware(None)
            >>> request = Mock()
            >>> request.url.path = "/api/tools"
            >>> request.scope = {"path": "/api/tools", "root_path": ""}
            >>> request.method = "GET"
            >>> request.headers.get.return_value = None
            >>> call_next = AsyncMock(return_value="response")
            >>>
            >>> result = asyncio.run(middleware.dispatch(request, call_next))
            >>> result
            'response'
            >>>
            >>> # Test that middleware checks protected paths
            >>> request.url.path = "/docs"
            >>> isinstance(middleware, DocsAuthMiddleware)
            True
        """
        # CORS preflight must never be challenged (RFC 7231)
        if request.method == "OPTIONS":
            return await call_next(request)

        guarded_prefixes = ("/docs", "/redoc", "/openapi.json")

        # Read the path from the ASGI scope and strip any root_path prefix
        raw_path = request.scope.get("path", request.url.path)
        mount_prefix = request.scope.get("root_path", "")
        route_path = _normalize_scope_path(raw_path, mount_prefix)

        # Anything outside the documentation routes is not this middleware's concern
        if not route_path.startswith(guarded_prefixes):
            return await call_next(request)

        try:
            auth_header = request.headers.get("Authorization")
            jwt_cookie = request.cookies.get("jwt_token")

            # Use dedicated docs authentication that bypasses global auth settings
            await require_docs_auth_override(auth_header, jwt_cookie)
        except HTTPException as auth_error:
            return ORJSONResponse(
                status_code=auth_error.status_code,
                content={"detail": auth_error.detail},
                headers=auth_error.headers or None,
            )

        # Authenticated: continue to the next middleware or route
        return await call_next(request)
1436class AdminAuthMiddleware(BaseHTTPMiddleware):
1437 """
1438 Middleware to protect Admin UI routes (/admin/*) requiring admin privileges.
1440 Exempts login-related paths and static assets:
1441 - /admin/login - login page
1442 - /admin/logout - logout action
1443 - /admin/static/* - static assets
1445 All other /admin/* routes require the user to be authenticated AND be an admin.
1446 Non-admin authenticated users receive a 403 Forbidden response.
1448 Note: This middleware respects the auth_required setting. When auth_required=False
1449 (typically in test environments), the middleware allows requests to pass through
1450 and relies on endpoint-level authentication which can be mocked in tests.
1451 """
1453 # Paths under /admin that don't require admin privileges
1454 EXEMPT_PATHS = ["/admin/login", "/admin/logout", "/admin/static"]
1456 @staticmethod
1457 def _error_response(request: Request, root_path: str, status_code: int, detail: str, error_param: str = None):
1458 """Return appropriate error response based on request Accept header.
1460 Args:
1461 request: The incoming HTTP request.
1462 root_path: The root path prefix for the application.
1463 status_code: HTTP status code for JSON responses.
1464 detail: Error message detail.
1465 error_param: Optional error parameter for login redirect URL.
1467 Returns:
1468 RedirectResponse for HTML/HTMX requests, ORJSONResponse for API requests.
1469 """
1470 accept_header = request.headers.get("accept", "")
1471 is_htmx = request.headers.get("hx-request") == "true"
1472 if "text/html" in accept_header or is_htmx:
1473 login_url = f"{root_path}/admin/login" if root_path else "/admin/login"
1474 if error_param:
1475 login_url = f"{login_url}?error={error_param}"
1476 return RedirectResponse(url=login_url, status_code=302)
1477 return ORJSONResponse(status_code=status_code, content={"detail": detail})
1479 async def dispatch(self, request: Request, call_next): # pylint: disable=too-many-return-statements
1480 """
1481 Check admin privileges for admin routes.
1483 Args:
1484 request (Request): The incoming HTTP request.
1485 call_next (Callable): The function to call the next middleware or endpoint.
1487 Returns:
1488 Response: Either the standard route response or a 401/403 error response.
1489 """
1490 # Skip admin auth check if auth is not required (e.g., test environments)
1491 # This allows tests to mock authentication at the dependency level
1492 if not settings.auth_required:
1493 return await call_next(request)
1495 # Get path from scope to handle root_path correctly
1496 scope_path = request.scope.get("path", request.url.path)
1497 root_path = request.scope.get("root_path", "")
1498 scope_path = _normalize_scope_path(scope_path, root_path)
1500 # Allow OPTIONS requests for CORS preflight (RFC 7231)
1501 if request.method == "OPTIONS":
1502 return await call_next(request)
1504 # Check if this is an admin route
1505 is_admin_route = scope_path.startswith("/admin")
1507 if not is_admin_route:
1508 return await call_next(request)
1510 # Check if path is exempt (login, logout, static)
1511 is_exempt = any(scope_path.startswith(p) for p in self.EXEMPT_PATHS)
1512 if is_exempt:
1513 return await call_next(request)
1515 # For protected admin routes, verify admin status
1516 try:
1517 token = request.headers.get("Authorization")
1518 cookie_token = request.cookies.get("jwt_token") or request.cookies.get("access_token")
1520 # Extract token from header or cookie
1521 jwt_token = None
1522 if cookie_token:
1523 jwt_token = cookie_token
1524 elif token and token.startswith("Bearer "):
1525 jwt_token = token.split(" ", 1)[1]
1527 username = None
1529 if jwt_token:
1530 # Try JWT authentication first
1531 try:
1532 payload = await verify_jwt_token(jwt_token)
1533 username = payload.get("sub") or payload.get("email")
1535 if not username:
1536 return ORJSONResponse(status_code=401, content={"detail": "Invalid token"})
1538 # Check if token is revoked (if JTI exists)
1539 jti = payload.get("jti")
1540 if jti:
1541 try:
1542 is_revoked = await asyncio.to_thread(_check_token_revoked_sync, jti)
1543 if is_revoked: 1543 ↛ 1568line 1543 didn't jump to line 1568 because the condition on line 1543 was always true
1544 logger.warning(f"Admin access denied for revoked token: {username}")
1545 return self._error_response(request, root_path, 401, "Token has been revoked", "token_revoked")
1546 except Exception as revoke_error:
1547 logger.warning(f"Token revocation check failed: {revoke_error}")
1548 # Continue - don't fail auth if revocation check fails
1549 except Exception:
1550 # JWT validation failed, try API token
1551 token_hash = hashlib.sha256(jwt_token.encode()).hexdigest()
1552 api_token_info = await asyncio.to_thread(_lookup_api_token_sync, token_hash)
1554 if api_token_info: 1554 ↛ 1568line 1554 didn't jump to line 1568 because the condition on line 1554 was always true
1555 if api_token_info.get("expired"):
1556 return ORJSONResponse(status_code=401, content={"detail": "API token expired"})
1557 if api_token_info.get("revoked"):
1558 return ORJSONResponse(status_code=401, content={"detail": "API token has been revoked"})
1559 username = api_token_info["user_email"]
1560 logger.debug(f"Admin auth via API token: {username}")
1562 # NOTE: Basic auth is NOT supported for admin UI endpoints.
1563 # While AdminAuthMiddleware could validate Basic credentials, the admin
1564 # endpoints use get_current_user_with_permissions which requires JWT tokens.
1565 # Supporting Basic auth would require passing auth context to routes,
1566 # which increases complexity and attack surface. Use JWT or API tokens instead.
1568 if not username and settings.trust_proxy_auth and not settings.mcp_client_auth_enabled:
1569 # Proxy authentication path (when MCP client auth is disabled and proxy auth is trusted)
1570 proxy_user = request.headers.get(settings.proxy_user_header)
1571 if proxy_user: 1571 ↛ 1575line 1571 didn't jump to line 1575 because the condition on line 1571 was always true
1572 username = proxy_user
1573 logger.debug(f"Admin auth via proxy header: {username}")
1575 if not username:
1576 # No authentication method succeeded - redirect to login or return 401
1577 return self._error_response(request, root_path, 401, "Authentication required")
1579 # Check if user exists, is active, and has admin permissions
1580 db = next(get_db())
1581 try:
1582 auth_service = EmailAuthService(db)
1583 user = await auth_service.get_user_by_email(username)
1585 if not user:
1586 # Platform admin bootstrap (when REQUIRE_USER_IN_DB=false)
1587 platform_admin_email = getattr(settings, "platform_admin_email", "admin@example.com")
1588 if not settings.require_user_in_db and username == platform_admin_email:
1589 logger.info(f"Platform admin bootstrap authentication for {username}")
1590 # Allow platform admin through - they have implicit admin privileges
1591 else:
1592 return ORJSONResponse(status_code=401, content={"detail": "User not found"})
1593 else:
1594 # User exists in DB - check active status
1595 if not user.is_active:
1596 logger.warning(f"Admin access denied for disabled user: {username}")
1597 return self._error_response(request, root_path, 403, "Account is disabled", "account_disabled")
1599 # Check if user has admin permissions (either is_admin flag OR admin.* RBAC permissions)
1600 # This allows granular admin access for users with specific admin permissions
1601 permission_service = PermissionService(db)
1602 has_admin_access = await permission_service.has_admin_permission(username)
1603 if not has_admin_access:
1604 logger.warning(f"Admin access denied for user without admin permissions: {username}")
1605 return self._error_response(request, root_path, 403, "Admin privileges required", "admin_required")
1606 finally:
1607 db.close()
1609 except HTTPException as e:
1610 return self._error_response(request, root_path, e.status_code, e.detail)
1611 except Exception as e:
1612 logger.error(f"Admin auth middleware error: {e}")
1613 return ORJSONResponse(status_code=500, content={"detail": "Authentication error"})
1615 # Proceed to next middleware or route
1616 return await call_next(request)
class MCPPathRewriteMiddleware:
    """
    ASGI middleware that authenticates streamable-HTTP requests and normalises MCP paths.

    - Paths such as '/servers/<server_id>/mcp' are rewritten to '/mcp/'.
    - Only paths ending with '/mcp' or '/mcp/' that are not exactly '/mcp' or '/mcp/' are rewritten.
    - Authentication always runs before the rewrite.
    - When authentication fails, the request is not forwarded.
    - Every other request is forwarded with its path untouched.
    - Forwarding goes through the wrapped application, i.e. the rest of the middleware
      stack (including CORSMiddleware), so CORS preflight is handled there.

    Attributes:
        application (Callable): The next ASGI application to process the request.
        dispatch (Callable, optional): Request-style middleware to run around the auth/rewrite step.
    """

    def __init__(self, application, dispatch=None):
        """
        Store the wrapped ASGI application and the optional dispatch hook.

        Args:
            application (Callable): The next ASGI application to handle the request.
            dispatch (Callable, optional): An optional dispatch function for additional middleware processing.

        Example:
            >>> import asyncio
            >>> from unittest.mock import AsyncMock, patch
            >>> app_mock = AsyncMock()
            >>> middleware = MCPPathRewriteMiddleware(app_mock)
            >>> isinstance(middleware.application, AsyncMock)
            True
        """
        self.dispatch = dispatch  # this can be TokenScopingMiddleware
        self.application = application

    async def __call__(self, scope, receive, send):
        """
        ASGI entry point: pass non-HTTP scopes through, otherwise authenticate and rewrite.

        Args:
            scope (dict): The ASGI connection scope.
            receive (Callable): Awaitable that yields events from the client.
            send (Callable): Awaitable used to send events to the client.

        Examples:
            >>> import asyncio
            >>> from unittest.mock import AsyncMock, patch
            >>> app_mock = AsyncMock()
            >>> middleware = MCPPathRewriteMiddleware(app_mock)

            >>> # Test path rewriting for /servers/123/mcp
            >>> scope = { "type": "http", "path": "/servers/123/mcp", "headers": [(b"host", b"example.com")] }
            >>> receive = AsyncMock()
            >>> send = AsyncMock()
            >>> with patch('mcpgateway.main.streamable_http_auth', return_value=True):
            ...     asyncio.run(middleware(scope, receive, send))
            >>> scope["path"]
            '/mcp/'
            >>> app_mock.assert_called()

            >>> # Test regular path (no rewrite)
            >>> scope = { "type": "http","path": "/tools","headers": [(b"host", b"example.com")] }
            >>> with patch('mcpgateway.main.streamable_http_auth', return_value=True):
            ...     asyncio.run(middleware(scope, receive, send))
            ...     scope["path"]
            '/tools'
        """
        # Anything that is not plain HTTP bypasses auth and rewriting entirely.
        if scope["type"] != "http":
            await self.application(scope, receive, send)
            return

        # No dispatch hook configured: go straight to auth + rewrite.
        if self.dispatch is None:
            await self._call_streamable_http(scope, receive, send)
            return

        async def call_next(_req: starletteRequest) -> starletteResponse:
            """
            Continue the chain for the dispatch hook by running the auth + rewrite step.

            Args:
                _req (starletteRequest): The incoming request to be processed.

            Returns:
                starletteResponse: Whatever the streamable HTTP call returns.
            """
            return await self._call_streamable_http(scope, receive, send)

        # Adapt the raw ASGI triple to the request-style interface the dispatch hook expects.
        wrapped_request = starletteRequest(scope, receive=receive)
        outcome = await self.dispatch(wrapped_request, call_next)

        # A None outcome means the hook either already produced the response
        # or blocked the request; there is nothing left to send.
        if outcome is not None:
            await outcome(scope, receive, send)

    async def _call_streamable_http(self, scope, receive, send):
        """
        Authenticate the request, rewrite MCP sub-paths to '/mcp/', and forward it.

        The original path is always recorded in ``scope["modified_path"]``. The request is
        then handed to ``self.application`` (continuing through the middleware stack,
        including CORSMiddleware), with ``scope["path"]`` replaced only for rewritable paths.

        Args:
            scope (dict): The ASGI connection scope containing request metadata.
            receive (Callable): The function to receive events from the client.
            send (Callable): The function to send events to the client.

        Example:
            >>> import asyncio
            >>> from unittest.mock import AsyncMock, patch
            >>> app_mock = AsyncMock()
            >>> middleware = MCPPathRewriteMiddleware(app_mock)
            >>> scope = {"type": "http", "path": "/servers/123/mcp"}
            >>> receive = AsyncMock()
            >>> send = AsyncMock()
            >>> with patch('mcpgateway.main.streamable_http_auth', return_value=True):
            ...     asyncio.run(middleware._call_streamable_http(scope, receive, send))
            >>> app_mock.assert_called_once_with(scope, receive, send)
        """
        # Auth comes first; a falsy result stops the request here.
        if not await streamable_http_auth(scope, receive, send):
            return

        incoming_path = scope.get("path", "")
        scope["modified_path"] = incoming_path

        # Rewrite '<prefix>/mcp' and '<prefix>/mcp/' but leave the bare '/mcp' and '/mcp/' alone.
        is_bare_mcp = incoming_path in ("/mcp", "/mcp/")
        if not is_bare_mcp and incoming_path.endswith(("/mcp", "/mcp/")):
            # Continue through the middleware stack (lets CORSMiddleware handle preflight)
            scope["path"] = "/mcp/"

        await self.application(scope, receive, send)
# Configure CORS with environment-aware origins.
# The configured origins are copied into a list; an unset or empty setting yields
# an empty list, i.e. no origin is allowed.
cors_origins = list(settings.allowed_origins) if settings.allowed_origins else []

# Ensure we never use wildcard in production: with nothing configured the list
# stays empty and a warning is logged instead of falling back to a permissive default.
if settings.environment == "production" and not cors_origins:
    logger.warning("No CORS origins configured for production environment. CORS will be disabled.")
    cors_origins = []

# Register the CORS middleware with the computed origin list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_origins,
    allow_credentials=settings.cors_allow_credentials,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["Content-Length", "X-Request-ID", "X-Password-Change-Required"],
    max_age=600,  # Cache preflight requests for 10 minutes
)
# Add response compression middleware (Brotli, Zstd, GZip)
# Automatically negotiates compression algorithm based on client Accept-Encoding header
# Priority: Brotli (best compression) > Zstd (fast) > GZip (universal fallback)
# Only compress responses larger than minimum_size to avoid overhead
# NOTE: When json_response_enabled=False (SSE mode), /mcp paths are excluded from
# compression to prevent buffering/breaking of streaming responses. See middleware/compression.py.
if settings.compression_enabled:
    app.add_middleware(
        SSEAwareCompressMiddleware,
        minimum_size=settings.compression_minimum_size,
        gzip_level=settings.compression_gzip_level,
        brotli_quality=settings.compression_brotli_quality,
        zstd_level=settings.compression_zstd_level,
    )
    # Log the effective tuning values so the active configuration is visible at startup.
    logger.info(
        f"🗜️ Response compression enabled (SSE-aware): minimum_size={settings.compression_minimum_size}B, "
        f"gzip_level={settings.compression_gzip_level}, "
        f"brotli_quality={settings.compression_brotli_quality}, "
        f"zstd_level={settings.compression_zstd_level}"
    )
else:
    logger.info("🚫 Response compression disabled")
# Add security headers middleware (always registered)
app.add_middleware(SecurityHeadersMiddleware)

# Add validation middleware if explicitly enabled; the chosen state is logged either way
if settings.validation_middleware_enabled:
    app.add_middleware(ValidationMiddleware)
    logger.info("🔒 Input validation and output sanitization middleware enabled")
else:
    logger.info("🔒 Input validation and output sanitization middleware disabled")

# Add MCP Protocol Version validation middleware (validates MCP-Protocol-Version header)
app.add_middleware(MCPProtocolVersionMiddleware)

# Add token scoping middleware (only when email auth is enabled)
if settings.email_auth_enabled:
    app.add_middleware(BaseHTTPMiddleware, dispatch=token_scoping_middleware)
    # Add streamable HTTP middleware for /mcp routes with token scoping:
    # the same token_scoping_middleware is handed to MCPPathRewriteMiddleware as its dispatch hook.
    app.add_middleware(MCPPathRewriteMiddleware, dispatch=token_scoping_middleware)
else:
    # Add streamable HTTP middleware for /mcp routes (no dispatch hook)
    app.add_middleware(MCPPathRewriteMiddleware)

# Add HTTP authentication hook middleware for plugins (before auth dependencies).
# Skipped entirely when no plugin manager is configured.
if plugin_manager:
    app.add_middleware(HttpAuthMiddleware, plugin_manager=plugin_manager)
    logger.info("🔌 HTTP authentication hooks enabled for plugins")
# Add request logging middleware FIRST (always enabled for gateway boundary logging)
# IMPORTANT: Must be registered BEFORE CorrelationIDMiddleware so it executes AFTER correlation ID is set
# Gateway boundary logging (request_started/completed) runs regardless of log_requests setting
# Detailed payload logging only runs if log_detailed_requests=True
app.add_middleware(
    RequestLoggingMiddleware,
    enable_gateway_logging=True,
    log_detailed_requests=settings.log_requests,
    log_level=settings.log_level,
    max_body_size=settings.log_detailed_max_body_size,
    log_resolve_user_identity=settings.log_resolve_user_identity,
    log_detailed_skip_endpoints=settings.log_detailed_skip_endpoints,
    log_detailed_sample_rate=settings.log_detailed_sample_rate,
)

# Add custom DocsAuthMiddleware
app.add_middleware(DocsAuthMiddleware)

# Add AdminAuthMiddleware to protect admin routes (requires admin privileges)
# This ensures all /admin/* routes (except login/logout) require admin status
app.add_middleware(AdminAuthMiddleware)

# Trust all proxies (or lock down with a list of host patterns)
# NOTE(review): "*" accepts forwarded headers from any peer - confirm the gateway
# is only reachable through a trusted proxy in deployments that rely on this.
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")

# Add correlation ID middleware if enabled
# Note: Registered AFTER RequestLoggingMiddleware so correlation ID is available when RequestLoggingMiddleware executes
if settings.correlation_id_enabled:
    app.add_middleware(CorrelationIDMiddleware)
    logger.info(f"✅ Correlation ID tracking enabled (header: {settings.correlation_id_header})")
# Add authentication context middleware if security logging is enabled
# This middleware extracts user context and logs security events (authentication attempts)
# Note: This is independent of observability - security logging is always important
if settings.security_logging_enabled:
    # First-Party
    from mcpgateway.middleware.auth_middleware import AuthContextMiddleware

    app.add_middleware(AuthContextMiddleware)
    logger.info("🔐 Authentication context middleware enabled - logging security events")
else:
    logger.info("🔐 Security event logging disabled")

# Add observability middleware if enabled
# Note: Middleware runs in REVERSE order of registration (last added runs first).
# ObservabilityMiddleware is registered after AuthContextMiddleware, so when both
# are enabled it is the outer layer and wraps AuthContextMiddleware.
# Execution order will be: Observability -> AuthContext -> Request Handler
if settings.observability_enabled:
    # First-Party
    from mcpgateway.middleware.observability_middleware import ObservabilityMiddleware

    app.add_middleware(ObservabilityMiddleware, enabled=True)
    logger.info("🔍 Observability middleware enabled - tracing include-listed requests")
else:
    logger.info("🔍 Observability middleware disabled")

# Database query logging middleware (for N+1 detection)
# The engine and setup helper are imported lazily, only when the feature is switched on.
if settings.db_query_log_enabled:
    # First-Party
    from mcpgateway.db import engine
    from mcpgateway.middleware.db_query_logging import setup_query_logging

    setup_query_logging(app, engine)
    logger.info(f"📊 Database query logging enabled - logs: {settings.db_query_log_file}")
else:
    logger.debug("📊 Database query logging disabled (enable with DB_QUERY_LOG_ENABLED=true)")
# Set up Jinja2 templates and store in app state for later use
# auto_reload=False in production prevents re-parsing templates on each request (performance)
# autoescape=True turns on Jinja2 autoescaping for every rendered template.
jinja_env = Environment(
    loader=FileSystemLoader(str(settings.templates_dir)),
    autoescape=True,
    auto_reload=settings.templates_auto_reload,
)
templates = Jinja2Templates(env=jinja_env)
if not settings.templates_auto_reload:
    logger.info("🎨 Template auto-reload disabled (production mode)")
app.state.templates = templates

# Store plugin manager in app state for access in routes
# (stored unconditionally, so the attribute exists even when no plugin manager is configured)
app.state.plugin_manager = plugin_manager

# Initialize plugin service with plugin manager
if plugin_manager:
    # First-Party
    from mcpgateway.services.plugin_service import get_plugin_service

    plugin_service = get_plugin_service()
    plugin_service.set_plugin_manager(plugin_manager)
# Create API routers
# One router per API area; each carries its URL prefix (where it has one) and the
# tag it is grouped under. utility_router and export_import_router have no prefix.
protocol_router = APIRouter(prefix="/protocol", tags=["Protocol"])
tool_router = APIRouter(prefix="/tools", tags=["Tools"])
resource_router = APIRouter(prefix="/resources", tags=["Resources"])
prompt_router = APIRouter(prefix="/prompts", tags=["Prompts"])
gateway_router = APIRouter(prefix="/gateways", tags=["Gateways"])
root_router = APIRouter(prefix="/roots", tags=["Roots"])
utility_router = APIRouter(tags=["Utilities"])
server_router = APIRouter(prefix="/servers", tags=["Servers"])
metrics_router = APIRouter(prefix="/metrics", tags=["Metrics"])
tag_router = APIRouter(prefix="/tags", tags=["Tags"])
export_import_router = APIRouter(tags=["Export/Import"])
a2a_router = APIRouter(prefix="/a2a", tags=["A2A Agents"])
1929# Basic Auth setup
1932# Database dependency
def get_db():
    """
    Dependency function to provide a database session.

    Creates a session from ``SessionLocal``, yields it to the caller, and then:

    - on normal completion, commits if the session is still active (this avoids an
      implicit rollback for read-only operations);
    - on any exception, rolls back and re-raises;
    - if the rollback itself fails (e.g. the connection is broken due to a PgBouncer
      timeout or network issue), invalidates the session so the broken connection is
      discarded from the pool rather than being returned in a bad state;
    - in every case, closes the session.

    Yields:
        Session: A SQLAlchemy session object for interacting with the database.

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.

    Ensures:
        The database session is closed after the request completes, even in the case of an exception.

    Examples:
        >>> # Test that get_db returns a generator
        >>> db_gen = get_db()
        >>> hasattr(db_gen, '__next__')
        True
        >>> # Test cleanup happens
        >>> try:
        ...     db = next(db_gen)
        ...     type(db).__name__
        ... finally:
        ...     try:
        ...         next(db_gen)
        ...     except StopIteration:
        ...         pass  # Expected - generator cleanup
        'ResilientSession'
    """
    db = SessionLocal()
    try:
        # Hand the session to the caller; execution resumes here once the caller is done.
        yield db
        # Only commit if the transaction is still active.
        # The transaction can become inactive if an exception occurred during
        # async context manager cleanup (e.g., CancelledError during MCP session teardown).
        if db.is_active:
            db.commit()
    except Exception:
        try:
            # Always call rollback() in exception handler.
            # rollback() is safe to call even when is_active=False - it succeeds and
            # restores the session to a usable state. When is_active=False (e.g., after
            # IntegrityError), rollback() is actually REQUIRED to clear the failed state.
            # Skipping rollback when is_active=False would leave the session unusable.
            db.rollback()
        except Exception:
            # Connection is broken - invalidate to remove from pool
            # This handles cases like PgBouncer query_wait_timeout where
            # the connection is dead and rollback itself fails
            try:
                db.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure
        # Propagate the original error to the caller after cleanup.
        raise
    finally:
        # Runs on success and failure alike.
        db.close()
async def _read_request_json(request: Request) -> Any:
    """Parse the request body as JSON with orjson.

    An empty body and a body that orjson cannot decode are both reported to the
    client the same way.

    Args:
        request: Incoming FastAPI request to read JSON from.

    Returns:
        Parsed JSON payload.

    Raises:
        HTTPException: 400 for invalid JSON bodies.
    """
    raw_bytes = await request.body()
    if raw_bytes:
        try:
            return orjson.loads(raw_bytes)
        except orjson.JSONDecodeError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid JSON in request body") from exc
    # Nothing was sent at all: treated exactly like malformed JSON.
    raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid JSON in request body")
def require_api_key(api_key: str) -> None:
    """Validates the provided API key.

    When ``settings.auth_required`` is enabled, the key must equal
    ``"<basic_auth_user>:<basic_auth_password>"`` built from the settings.
    The comparison is done in constant time so that response timing does not
    reveal how much of the key matched. When auth is not required, any key is
    accepted.

    Args:
        api_key (str): The API key provided by the user or client.

    Raises:
        HTTPException: If the API key is invalid, a 401 Unauthorized error is raised.

    Examples:
        >>> from mcpgateway.config import settings
        >>> from pydantic import SecretStr
        >>> settings.auth_required = True
        >>> settings.basic_auth_user = "admin"
        >>> settings.basic_auth_password = SecretStr("secret")
        >>>
        >>> # Valid API key
        >>> require_api_key("admin:secret")  # Should not raise
        >>>
        >>> # Invalid API key
        >>> try:
        ...     require_api_key("wrong:key")
        ... except HTTPException as e:
        ...     e.status_code
        401
    """
    # Standard
    import hmac

    if not settings.auth_required:
        return

    expected = f"{settings.basic_auth_user}:{settings.basic_auth_password.get_secret_value()}"
    # Compare as UTF-8 bytes: hmac.compare_digest rejects non-ASCII str arguments.
    # A non-string key (e.g. None) can never match and is rejected like any wrong key.
    key_matches = isinstance(api_key, str) and hmac.compare_digest(api_key.encode("utf-8"), expected.encode("utf-8"))
    if not key_matches:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")
async def invalidate_resource_cache(uri: Optional[str] = None) -> None:
    """
    Drop one entry, or everything, from the resource cache.

    A non-empty URI removes just that resource from the cache. When the URI is
    omitted (or otherwise falsy, such as an empty string) the whole cache is cleared.

    Args:
        uri (Optional[str]): The URI of the resource to invalidate from the cache. If None, the entire cache is cleared.

    Examples:
        >>> import asyncio
        >>> # Test clearing specific URI from cache
        >>> resource_cache.set("/test/resource", {"content": "test data"})
        >>> resource_cache.get("/test/resource") is not None
        True
        >>> asyncio.run(invalidate_resource_cache("/test/resource"))
        >>> resource_cache.get("/test/resource") is None
        True
        >>>
        >>> # Test clearing entire cache
        >>> resource_cache.set("/resource1", {"content": "data1"})
        >>> resource_cache.set("/resource2", {"content": "data2"})
        >>> asyncio.run(invalidate_resource_cache())
        >>> resource_cache.get("/resource1") is None and resource_cache.get("/resource2") is None
        True
    """
    if not uri:
        # No specific target given: wipe the whole cache.
        resource_cache.clear()
        return
    resource_cache.delete(uri)
def get_protocol_from_request(request: Request) -> str:
    """
    Return the protocol of the request based on:
    1) X-Forwarded-Proto (if set by a proxy to "http" or "https")
    2) request.url.scheme (e.g. when Gunicorn/Uvicorn is terminating TLS)

    The forwarded value is normalised (first list entry, surrounding whitespace
    removed, lower-cased) and only honoured when it is exactly "http" or "https".
    An empty or unrecognised value is ignored in favour of the request's own
    scheme, so arbitrary header content never ends up in generated URLs.

    Args:
        request (Request): The FastAPI request object.

    Returns:
        str: "http" or "https" when taken from the forwarded header, otherwise
        the scheme of the request URL.

    Examples:
        Test with X-Forwarded-Proto header (proxy scenario):
        >>> from mcpgateway import main
        >>> from fastapi import Request
        >>> from urllib.parse import urlparse
        >>>
        >>> # Mock request with X-Forwarded-Proto
        >>> scope = {
        ...     'type': 'http',
        ...     'scheme': 'http',
        ...     'headers': [(b'x-forwarded-proto', b'https')],
        ...     'server': ('testserver', 80),
        ...     'path': '/',
        ... }
        >>> req = Request(scope)
        >>> main.get_protocol_from_request(req)
        'https'

        Test with comma-separated X-Forwarded-Proto:
        >>> scope_multi = {
        ...     'type': 'http',
        ...     'scheme': 'http',
        ...     'headers': [(b'x-forwarded-proto', b'https,http')],
        ...     'server': ('testserver', 80),
        ...     'path': '/',
        ... }
        >>> req_multi = Request(scope_multi)
        >>> main.get_protocol_from_request(req_multi)
        'https'

        Test with mixed-case X-Forwarded-Proto (normalised to lower case):
        >>> scope_upper = {
        ...     'type': 'http',
        ...     'scheme': 'http',
        ...     'headers': [(b'x-forwarded-proto', b'HTTPS')],
        ...     'server': ('testserver', 80),
        ...     'path': '/',
        ... }
        >>> req_upper = Request(scope_upper)
        >>> main.get_protocol_from_request(req_upper)
        'https'

        Test without X-Forwarded-Proto (direct connection):
        >>> scope_direct = {
        ...     'type': 'http',
        ...     'scheme': 'https',
        ...     'headers': [],
        ...     'server': ('testserver', 443),
        ...     'path': '/',
        ... }
        >>> req_direct = Request(scope_direct)
        >>> main.get_protocol_from_request(req_direct)
        'https'

        Test with HTTP direct connection:
        >>> scope_http = {
        ...     'type': 'http',
        ...     'scheme': 'http',
        ...     'headers': [],
        ...     'server': ('testserver', 80),
        ...     'path': '/',
        ... }
        >>> req_http = Request(scope_http)
        >>> main.get_protocol_from_request(req_http)
        'http'
    """
    forwarded = request.headers.get("x-forwarded-proto")
    if forwarded:
        # may be a comma-separated list; take the first
        candidate = forwarded.split(",", 1)[0].strip().lower()
        # Only accept the two schemes this function is documented to return;
        # anything else (empty entry, junk, other schemes) falls through.
        if candidate in ("http", "https"):
            return candidate
    return request.url.scheme
def update_url_protocol(request: Request) -> str:
    """
    Build the request's base URL with the scheme reported by get_protocol_from_request.

    Host, port and path of the base URL are kept; only the scheme is swapped, and
    any trailing slashes are stripped from the result.

    Args:
        request (Request): The FastAPI request object.

    Returns:
        str: The base URL with the correct protocol.

    Examples:
        Test URL protocol update with HTTPS proxy:
        >>> from mcpgateway import main
        >>> from fastapi import Request
        >>>
        >>> # Mock request with HTTPS forwarded proto
        >>> scope_https = {
        ...     'type': 'http',
        ...     'scheme': 'http',
        ...     'server': ('example.com', 80),
        ...     'path': '/',
        ...     'headers': [(b'x-forwarded-proto', b'https')],
        ... }
        >>> req_https = Request(scope_https)
        >>> url = main.update_url_protocol(req_https)
        >>> url.startswith('https://example.com')
        True

        Test URL protocol update with HTTP direct:
        >>> scope_http = {
        ...     'type': 'http',
        ...     'scheme': 'http',
        ...     'server': ('localhost', 8000),
        ...     'path': '/',
        ...     'headers': [],
        ... }
        >>> req_http = Request(scope_http)
        >>> url = main.update_url_protocol(req_http)
        >>> url.startswith('http://localhost:8000')
        True

        Test URL protocol update preserves host and port:
        >>> scope_port = {
        ...     'type': 'http',
        ...     'scheme': 'https',
        ...     'server': ('api.test.com', 443),
        ...     'path': '/',
        ...     'headers': [],
        ... }
        >>> req_port = Request(scope_port)
        >>> url = main.update_url_protocol(req_port)
        >>> 'api.test.com' in url and url.startswith('https://')
        True

        Test trailing slash removal:
        >>> # URL should not end with trailing slash
        >>> url = main.update_url_protocol(req_http)
        >>> url.endswith('/')
        False
    """
    base = urlparse(str(request.base_url))
    scheme = get_protocol_from_request(request)
    # Reassemble from the individual components, substituting only the scheme.
    rebuilt = urlunparse((scheme, base.netloc, base.path, base.params, base.query, base.fragment))
    return str(rebuilt).rstrip("/")
2230# Protocol APIs #
@protocol_router.post("/initialize")
async def initialize(request: Request, user=Depends(get_current_user)) -> InitializeResult:
    """
    Initialize a protocol.

    This endpoint reads the JSON request body and passes it to the session
    registry's initialize logic. The `get_current_user` dependency ensures that
    the user is authenticated before proceeding.

    Args:
        request (Request): The incoming request object containing the JSON body.
        user (str): The authenticated user (from the `get_current_user` dependency).

    Returns:
        InitializeResult: The result of the initialization process.

    Raises:
        HTTPException: If the request body contains invalid JSON, a 400 Bad Request error is raised.
    """
    try:
        body = await _read_request_json(request)

        logger.debug(f"Authenticated user {user} is initializing the protocol.")
        return await session_registry.handle_initialize_logic(body)

    except orjson.JSONDecodeError as exc:
        # Chain the original decode error so the root cause stays in the traceback.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid JSON in request body",
        ) from exc
@protocol_router.post("/ping")
async def ping(request: Request, user=Depends(get_current_user)) -> JSONResponse:
    """
    Handle a ping request according to the MCP specification.

    The body must be a JSON-RPC request whose method is "ping"; the reply is a
    JSON-RPC response carrying an empty result, as required by the protocol.

    Any failure while handling the request - including an unreadable body or a
    method other than "ping" - is caught here and reported as a JSON-RPC
    "Internal error" (-32603) with HTTP status 500. The request id is echoed in
    the error only if it had already been read when the failure occurred.

    Args:
        request (Request): The incoming FastAPI request.
        user (str): The authenticated user (dependency injection).

    Returns:
        JSONResponse: A JSON-RPC response with an empty result or an error response.
    """
    req_id: Optional[str] = None
    try:
        payload: dict = await _read_request_json(request)
        if payload.get("method") != "ping":
            # Caught by the handler below, so the client sees the JSON-RPC error envelope.
            raise HTTPException(status_code=400, detail="Invalid method")
        req_id = payload.get("id")
        logger.debug(f"Authenticated user {user} sent ping request.")
        # Return an empty result per the MCP ping specification.
        return ORJSONResponse(content={"jsonrpc": "2.0", "id": req_id, "result": {}})
    except Exception as e:
        failure = {"code": -32603, "message": "Internal error", "data": str(e)}
        # req_id is still None when the failure happened before the id was read.
        return ORJSONResponse(status_code=500, content={"jsonrpc": "2.0", "id": req_id, "error": failure})
@protocol_router.post("/notifications")
async def handle_notification(request: Request, user=Depends(get_current_user)) -> None:
    """
    Process a client notification according to its method.

    Supported methods are ``notifications/initialized`` (logged),
    ``notifications/cancelled`` (local cancellation attempt, then logged) and
    ``notifications/message`` (forwarded to the logging service). Any other
    method is ignored.

    Args:
        request (Request): The incoming request containing the notification data.
        user (str): The authenticated user making the request.
    """
    payload = await _read_request_json(request)
    logger.debug(f"User {user} sent a notification")
    method = payload.get("method")

    if method == "notifications/initialized":
        logger.info("Client initialized")
        await logging_service.notify("Client initialized", LogLevel.INFO)
        return

    if method == "notifications/cancelled":
        cancel_params = payload.get("params", {})
        # Note: requestId can be 0 (valid per JSON-RPC), so test against None and normalize to string
        raw_request_id = cancel_params.get("requestId")
        request_id = None if raw_request_id is None else str(raw_request_id)
        reason = cancel_params.get("reason")
        logger.info(f"Request cancelled: {request_id}, reason: {reason}")
        # Attempt local cancellation per MCP spec
        if request_id is not None:
            await cancellation_service.cancel_run(request_id, reason=reason)
        await logging_service.notify(f"Request cancelled: {request_id}", LogLevel.INFO)
        return

    if method == "notifications/message":
        message_params = payload.get("params", {})
        await logging_service.notify(
            message_params.get("data"),
            LogLevel(message_params.get("level", "info")),
            message_params.get("logger"),
        )
@protocol_router.post("/completion/complete")
async def handle_completion(request: Request, db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)):
    """
    Process a completion request and return the completion service's result.

    Args:
        request (Request): The incoming request with completion data.
        db (Session): The database session used to interact with the data store.
        user (str): The authenticated user making the request.

    Returns:
        The result of the completion process.
    """
    completion_request = await _read_request_json(request)
    logger.debug(f"User {user['email']} sent a completion request")
    result = await completion_service.handle_completion(db, completion_request)
    return result
@protocol_router.post("/sampling/createMessage")
async def handle_sampling(request: Request, db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)):
    """
    Create a new message for sampling from the request payload.

    Args:
        request (Request): The incoming request with sampling data.
        db (Session): The database session used to interact with the data store.
        user (str): The authenticated user making the request.

    Returns:
        The result of the message creation process.
    """
    # The caller is logged before the body is read.
    logger.debug(f"User {user['email']} sent a sampling request")
    sampling_request = await _read_request_json(request)
    created = await sampling_handler.create_message(db, sampling_request)
    return created
2370###############
2371# Server APIs #
2372###############
@server_router.get("", response_model=Union[List[ServerRead], CursorPaginatedServersResponse])
@server_router.get("/", response_model=Union[List[ServerRead], CursorPaginatedServersResponse])
@require_permission("servers.read")
async def list_servers(
    request: Request,
    cursor: Optional[str] = Query(None, description="Cursor for pagination"),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    limit: Optional[int] = Query(None, ge=0, description="Maximum number of servers to return"),
    include_inactive: bool = False,
    tags: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Union[List[ServerRead], Dict[str, Any]]:
    """
    Return the servers visible to the caller, honouring token team scope and cursor pagination.

    Args:
        request (Request): The incoming request; ``request.state`` supplies the token's team_id and token_teams.
        cursor (Optional[str]): Cursor for pagination.
        include_pagination (bool): When true, wrap the result in a dict that may carry ``nextCursor``.
        limit (Optional[int]): Maximum number of servers to return.
        include_inactive (bool): Whether to include inactive servers in the response.
        tags (Optional[str]): Comma-separated list of tags to filter by.
        team_id (Optional[str]): Filter by specific team ID.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        db (Session): The database session used to interact with the data store.
        user: The authenticated user context supplied by the permission dependency.

    Returns:
        Union[List[ServerRead], Dict[str, Any]]: The servers as a list, or a ``{"servers": [...]}`` dict
        (plus ``nextCursor`` when there is one) if ``include_pagination`` is set. A 403 ORJSONResponse is
        returned instead when the requested team differs from the token's team.
    """
    # Split the comma-separated tag filter, discarding blank entries.
    tags_list = [part.strip() for part in tags.split(",") if part.strip()] if tags else None

    # Email used for per-user filtering in the service layer.
    user_email = get_user_email(user)

    # Team scope attached to the request during authentication.
    token_team_id = getattr(request.state, "team_id", None)
    token_teams = getattr(request.state, "token_teams", None)

    # An explicit team filter must agree with the team the token was issued for.
    if team_id is not None and token_team_id is not None:
        if team_id != token_team_id:
            return ORJSONResponse(
                content={"message": "Access issue: This API token does not have the required permissions for this team."},
                status_code=status.HTTP_403_FORBIDDEN,
            )

    # Fall back to the token's team when the caller supplied none.
    if not team_id:
        team_id = token_team_id

    # SECURITY: token_teams is normalized in auth.py:
    #   - None:  admin bypass (is_admin=true with explicit null teams) - sees ALL resources
    #   - []:    public-only (missing teams or explicit empty) - sees only public
    #   - [...]: team-scoped - sees public + teams + user's private
    # NOTE(review): a request.state without token_teams also yields None here and is treated as admin
    # bypass with no is_admin check (list_a2a_agents checks is_admin first) - confirm auth always sets it.
    is_admin_bypass = token_teams is None
    is_public_only_token = (not is_admin_bypass) and len(token_teams) == 0

    logger.debug(f"User: {user_email} requested server list with include_inactive={include_inactive}, tags={tags_list}, team_id={team_id}, visibility={visibility}")

    # Public-only tokens default to the "public" visibility filter unless one was requested explicitly.
    effective_visibility = visibility
    if is_public_only_token and not visibility:
        effective_visibility = "public"

    # Admin bypass passes user_email=None and token_teams=None so the service skips all filtering.
    data, next_cursor = await server_service.list_servers(
        db=db,
        cursor=cursor,
        limit=limit,
        include_inactive=include_inactive,
        tags=tags_list,
        user_email=None if is_admin_bypass else user_email,
        team_id=team_id,
        visibility=effective_visibility,
        token_teams=token_teams,  # None = admin bypass, [] = public-only, [...] = team-scoped
    )

    if not include_pagination:
        return data

    payload = {"servers": [server.model_dump(by_alias=True) for server in data]}
    if next_cursor:
        payload["nextCursor"] = next_cursor
    return payload
@server_router.get("/{server_id}", response_model=ServerRead)
@require_permission("servers.read")
async def get_server(server_id: str, db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)) -> ServerRead:
    """
    Fetch a single server by ID.

    Args:
        server_id (str): The ID of the server to retrieve.
        db (Session): The database session used to interact with the data store.
        user: The authenticated user context supplied by the permission dependency.

    Returns:
        ServerRead: The server object with the specified ID.

    Raises:
        HTTPException: 404 if the server service reports the server as not found.
    """
    try:
        logger.debug(f"User {user} requested server with ID {server_id}")
        found_server = await server_service.get_server(db, server_id)
        return found_server
    except ServerNotFoundError as not_found:
        # Translate the service-level error into an HTTP 404.
        raise HTTPException(status_code=404, detail=str(not_found))
@server_router.post("", response_model=ServerRead, status_code=201)
@server_router.post("/", response_model=ServerRead, status_code=201)
@require_permission("servers.create")
async def create_server(
    server: ServerCreate,
    request: Request,
    team_id: Optional[str] = Body(None, description="Team ID to assign server to"),
    visibility: Optional[str] = Body(None, description="Server visibility: private, team, public"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ServerRead:
    """
    Register a new server, enforcing the team scope of the caller's token.

    Args:
        server (ServerCreate): The data for the new server.
        request (Request): The incoming request; used for creation metadata and the token's team scope.
        team_id (Optional[str]): Team ID to assign the server to.
        visibility (str): Server visibility level (private, team, public).
        db (Session): The database session used to interact with the data store.
        user: The authenticated user context supplied by the permission dependency.

    Returns:
        ServerRead: The created server object. A 403 ORJSONResponse is returned instead when a public-only
        token asks for team/private visibility or when the requested team differs from the token's team.

    Raises:
        HTTPException: 409 on a name conflict or integrity error, 400 on other server errors,
            422 on validation errors.
    """
    try:
        # Who/where/how the server was created, recorded alongside it.
        metadata = MetadataCapture.extract_creation_metadata(request, user)

        user_email = get_user_email(user)

        token_team_id = getattr(request.state, "team_id", None)
        token_teams = getattr(request.state, "token_teams", None)

        # SECURITY: a token whose teams list is present but empty is public-only.
        is_public_only_token = token_teams is not None and len(token_teams) == 0

        if is_public_only_token:
            # Public-only tokens cannot create team/private resources and never get a team assigned.
            if visibility in ("team", "private"):
                return ORJSONResponse(
                    content={"message": "Public-only tokens cannot create team or private resources. Use visibility='public' or obtain a team-scoped token."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            team_id = None
        else:
            # An explicit team must agree with the team the token was issued for.
            if team_id is not None and token_team_id is not None and team_id != token_team_id:
                return ORJSONResponse(
                    content={"message": "Access issue: This API token does not have the required permissions for this team."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            team_id = team_id or token_team_id

        logger.debug(f"User {user_email} is creating a new server for team {team_id}")
        created = await server_service.register_server(
            db,
            server,
            created_by=metadata["created_by"],
            created_from_ip=metadata["created_from_ip"],
            created_via=metadata["created_via"],
            created_user_agent=metadata["created_user_agent"],
            team_id=team_id,
            owner_email=user_email,
            visibility=visibility,
        )
        # Commit and close the session in the handler before returning the result.
        db.commit()
        db.close()
        return created
    except ServerNameConflictError as conflict:
        raise HTTPException(status_code=409, detail=str(conflict))
    except ServerError as server_error:
        raise HTTPException(status_code=400, detail=str(server_error))
    except ValidationError as validation_error:
        logger.error(f"Validation error while creating server: {validation_error}")
        raise HTTPException(status_code=422, detail=ErrorFormatter.format_validation_error(validation_error))
    except IntegrityError as integrity_error:
        logger.error(f"Integrity error while creating server: {integrity_error}")
        raise HTTPException(status_code=409, detail=ErrorFormatter.format_database_error(integrity_error))
@server_router.put("/{server_id}", response_model=ServerRead)
@require_permission("servers.update")
async def update_server(
    server_id: str,
    server: ServerUpdate,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ServerRead:
    """
    Apply an update to an existing server.

    Args:
        server_id (str): The ID of the server to update.
        server (ServerUpdate): The updated server data.
        request (Request): The incoming request; used to capture modification metadata.
        db (Session): The database session used to interact with the data store.
        user: The authenticated user context supplied by the permission dependency.

    Returns:
        ServerRead: The updated server object.

    Raises:
        HTTPException: 403 on permission errors, 404 if the server is not found, 409 on a name conflict
            or integrity error, 400 on other server errors, 422 on validation errors.
    """
    try:
        logger.debug(f"User {user} is updating server with ID {server_id}")

        # Version 0 is a placeholder; the service increments the version itself.
        mod_metadata = MetadataCapture.extract_modification_metadata(request, user, 0)

        user_email: str = get_user_email(user)

        updated = await server_service.update_server(
            db,
            server_id,
            server,
            user_email,
            modified_by=mod_metadata["modified_by"],
            modified_from_ip=mod_metadata["modified_from_ip"],
            modified_via=mod_metadata["modified_via"],
            modified_user_agent=mod_metadata["modified_user_agent"],
        )
        # Commit and close the session in the handler before returning the result.
        db.commit()
        db.close()
        return updated
    except PermissionError as denied:
        raise HTTPException(status_code=403, detail=str(denied))
    except ServerNotFoundError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except ServerNameConflictError as conflict:
        raise HTTPException(status_code=409, detail=str(conflict))
    except ServerError as server_error:
        raise HTTPException(status_code=400, detail=str(server_error))
    except ValidationError as validation_error:
        logger.error(f"Validation error while updating server {server_id}: {validation_error}")
        raise HTTPException(status_code=422, detail=ErrorFormatter.format_validation_error(validation_error))
    except IntegrityError as integrity_error:
        logger.error(f"Integrity error while updating server {server_id}: {integrity_error}")
        raise HTTPException(status_code=409, detail=ErrorFormatter.format_database_error(integrity_error))
@server_router.post("/{server_id}/state", response_model=ServerRead)
@require_permission("servers.update")
async def set_server_state(
    server_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ServerRead:
    """
    Activate or deactivate a server.

    Args:
        server_id (str): The ID of the server to set state for.
        activate (bool): True to activate the server, False to deactivate it.
        db (Session): The database session used to interact with the data store.
        user: The authenticated user context; a dict contributes its "email" entry, anything else
            is stringified.

    Returns:
        ServerRead: The server object after the status change.

    Raises:
        HTTPException: 403 on permission errors, 404 if the server is not found, 409 on a lock conflict,
            400 on other server errors.
    """
    try:
        if isinstance(user, dict):
            user_email = user.get("email")
        else:
            user_email = str(user)
        logger.debug(f"User {user} is setting server with ID {server_id} to {'active' if activate else 'inactive'}")
        updated = await server_service.set_server_state(db, server_id, activate, user_email=user_email)
        return updated
    except PermissionError as denied:
        raise HTTPException(status_code=403, detail=str(denied))
    except ServerNotFoundError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except ServerLockConflictError as lock_conflict:
        raise HTTPException(status_code=409, detail=str(lock_conflict))
    except ServerError as server_error:
        raise HTTPException(status_code=400, detail=str(server_error))
@server_router.post("/{server_id}/toggle", response_model=ServerRead, deprecated=True)
@require_permission("servers.update")
async def toggle_server_status(
    server_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ServerRead:
    """DEPRECATED: Use /state endpoint instead. This endpoint will be removed in a future release.

    Sets the status of a server (activate or deactivate) by delegating to ``set_server_state``.

    Args:
        server_id: The server ID.
        activate: Whether to activate (True) or deactivate (False) the server.
        db: Database session.
        user: Authenticated user context.

    Returns:
        The updated server.
    """

    warnings.warn("The /toggle endpoint is deprecated. Use /state instead.", DeprecationWarning, stacklevel=2)
    # set_server_state is wrapped by require_permission and is otherwise only ever invoked by FastAPI,
    # which passes endpoint arguments by keyword. Delegate with keywords as well so the wrapper sees the
    # same call shape as for a direct /state request.
    # NOTE(review): whether the wrapper reads the user from kwargs is not visible here - confirm.
    return await set_server_state(server_id=server_id, activate=activate, db=db, user=user)
@server_router.delete("/{server_id}", response_model=Dict[str, str])
@require_permission("servers.delete")
async def delete_server(
    server_id: str,
    purge_metrics: bool = Query(False, description="Purge raw + rollup metrics for this server"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, str]:
    """
    Delete a server by ID.

    Args:
        server_id (str): The ID of the server to delete.
        purge_metrics (bool): Whether to delete raw + hourly rollup metrics for this server.
        db (Session): The database session used to interact with the data store.
        user: The authenticated user context; a dict contributes its "email" entry, anything else
            is stringified.

    Returns:
        Dict[str, str]: A success status and a message naming the deleted server.

    Raises:
        HTTPException: 403 on permission errors, 404 if the server is not found, 400 on other server errors.
    """
    try:
        logger.debug(f"User {user} is deleting server with ID {server_id}")
        if isinstance(user, dict):
            user_email = user.get("email")
        else:
            user_email = str(user)

        # Look the server up first so a missing ID surfaces as ServerNotFoundError before deletion.
        await server_service.get_server(db, server_id)
        await server_service.delete_server(db, server_id, user_email=user_email, purge_metrics=purge_metrics)

        # Commit and close the session in the handler before returning the result.
        db.commit()
        db.close()

        outcome = {
            "status": "success",
            "message": f"Server {server_id} deleted successfully",
        }
        return outcome
    except PermissionError as denied:
        raise HTTPException(status_code=403, detail=str(denied))
    except ServerNotFoundError as not_found:
        raise HTTPException(status_code=404, detail=str(not_found))
    except ServerError as server_error:
        raise HTTPException(status_code=400, detail=str(server_error))
@server_router.get("/{server_id}/sse")
@require_permission("servers.use")
async def sse_endpoint(request: Request, server_id: str, user=Depends(get_current_user_with_permissions)):
    """
    Establishes a Server-Sent Events (SSE) connection for real-time updates about a server.

    The handler creates an SSE transport, registers it in the session registry, starts the
    respond task for the session, and only then builds the streaming response. Cleanup
    (``session_registry.remove_session``) is wired in three places: the disconnect callback,
    the failure paths of ``create_sse_response``, and a background task attached to the response.

    Args:
        request (Request): The incoming request.
        server_id (str): The ID of the server for which updates are received.
        user (str): The authenticated user making the request.

    Returns:
        The SSE response object for the established connection.

    Raises:
        HTTPException: If there is an error in establishing the SSE connection.
        asyncio.CancelledError: If the request is cancelled during SSE setup.
    """
    try:
        logger.debug(f"User {user} is establishing SSE connection for server {server_id}")
        base_url = update_url_protocol(request)
        server_sse_url = f"{base_url}/servers/{server_id}"

        # SSE transport generates its own session_id - server-initiated, not client-provided
        transport = SSETransport(base_url=server_sse_url)
        await transport.connect()
        await session_registry.add_session(transport.session_id, transport)
        # NOTE(review): an exception raised between this point and the inner try below (e.g. while
        # deriving token_teams) reaches the outer handler without remove_session being called -
        # confirm the registry reclaims such sessions on its own.

        # Extract auth token from request (header OR cookie, like get_current_user_with_permissions)
        # MUST be computed BEFORE create_sse_response to avoid race condition (Finding 1)
        auth_token = None
        auth_header = request.headers.get("authorization", "")
        if auth_header.lower().startswith("bearer "):
            # Strip the 7-character "bearer " prefix
            auth_token = auth_header[7:]
        elif hasattr(request, "cookies") and request.cookies:
            # Cookie auth (admin UI sessions)
            auth_token = request.cookies.get("jwt_token") or request.cookies.get("access_token")

        # Extract and normalize token teams
        # Returns None if no JWT payload (non-JWT auth), or list if JWT exists
        # SECURITY: Preserve None vs [] distinction for admin bypass:
        # - None: unrestricted (admin keeps bypass, non-admin gets their accessible resources)
        # - []: public-only (admin bypass disabled)
        # - [...]: team-scoped access
        token_teams = _get_token_teams_from_request(request)

        # Preserve is_admin from user object (for cookie-authenticated admins)
        is_admin = False
        if hasattr(user, "is_admin"):
            is_admin = getattr(user, "is_admin", False)
        elif isinstance(user, dict):
            # The flag may sit at the top level or under a nested "user" mapping
            is_admin = user.get("is_admin", False) or user.get("user", {}).get("is_admin", False)

        # Create enriched user dict
        user_with_token = dict(user) if isinstance(user, dict) else {"email": getattr(user, "email", str(user))}
        user_with_token["auth_token"] = auth_token
        user_with_token["token_teams"] = token_teams  # None for unrestricted, [] for public-only, [...] for team-scoped
        user_with_token["is_admin"] = is_admin  # Preserve admin status for fallback token

        # Defensive cleanup callback - runs immediately on client disconnect
        async def on_disconnect_cleanup() -> None:
            """Clean up session when SSE client disconnects."""
            try:
                await session_registry.remove_session(transport.session_id)
                logger.debug("Defensive session cleanup completed: %s", transport.session_id)
            except Exception as e:
                # Best-effort: a failed cleanup is logged, never propagated
                logger.warning("Defensive session cleanup failed for %s: %s", transport.session_id, e)

        # CRITICAL: Create and register respond task BEFORE create_sse_response (Finding 1 fix)
        # This ensures the task exists when disconnect callback runs, preventing orphaned tasks
        respond_task = asyncio.create_task(session_registry.respond(server_id, user_with_token, session_id=transport.session_id, base_url=base_url))
        session_registry.register_respond_task(transport.session_id, respond_task)

        try:
            response = await transport.create_sse_response(request, on_disconnect_callback=on_disconnect_cleanup)
        except asyncio.CancelledError:
            # Request cancelled - still need to clean up to prevent orphaned tasks
            logger.debug(f"SSE request cancelled for {transport.session_id}, cleaning up")
            try:
                await session_registry.remove_session(transport.session_id)
            except Exception as cleanup_error:
                logger.warning(f"Cleanup after SSE cancellation failed: {cleanup_error}")
            raise  # Re-raise CancelledError
        except Exception as sse_error:
            # CRITICAL: Cleanup on failure - respond task and session would be orphaned otherwise
            logger.error(f"create_sse_response failed for {transport.session_id}: {sse_error}")
            try:
                await session_registry.remove_session(transport.session_id)
            except Exception as cleanup_error:
                logger.warning(f"Cleanup after SSE failure also failed: {cleanup_error}")
            # Re-raised into the outer handler, which converts it to an HTTP 500
            raise

        # Attach session removal as a background task on the response
        tasks = BackgroundTasks()
        tasks.add_task(session_registry.remove_session, transport.session_id)
        response.background = tasks
        logger.info(f"SSE connection established: {transport.session_id}")
        return response
    except Exception as e:
        # Any setup failure is logged and reported as a generic 500; the detail is not exposed to the client
        logger.error(f"SSE connection error: {e}")
        raise HTTPException(status_code=500, detail="SSE connection failed")
@server_router.post("/{server_id}/message")
@require_permission("servers.use")
async def message_endpoint(request: Request, server_id: str, user=Depends(get_current_user_with_permissions)):
    """
    Accept a message for a server session and route it.

    A JSON-RPC response whose ``result`` carries an elicitation ``action`` is offered to the
    elicitation service first; every message that is not consumed that way is broadcast to the session.

    Args:
        request (Request): The incoming message request; ``session_id`` is read from its query string.
        server_id (str): The ID of the server receiving the message.
        user: The authenticated user context supplied by the permission dependency.

    Returns:
        JSONResponse: A 202 response with a success status after processing the message.

    Raises:
        HTTPException: 400 when session_id is missing or the message is invalid, 500 on other failures.
    """
    try:
        logger.debug(f"User {user} sent a message to server {server_id}")
        session_id = request.query_params.get("session_id")
        if not session_id:
            logger.error("Missing session_id in message request")
            raise HTTPException(status_code=400, detail="Missing session_id")

        message = await _read_request_json(request)

        # Detect an elicitation response: a dict "result" whose "action" is one of the known outcomes.
        handled_as_elicitation = False
        result_data = message.get("result") if "result" in message else None
        if isinstance(result_data, dict) and "action" in result_data and result_data.get("action") in ["accept", "decline", "cancel"]:
            request_id = message.get("id")
            if request_id:
                # First-Party
                from mcpgateway.common.models import ElicitResult  # pylint: disable=import-outside-toplevel
                from mcpgateway.services.elicitation_service import get_elicitation_service  # pylint: disable=import-outside-toplevel

                elicitation_service = get_elicitation_service()
                try:
                    elicit_result = ElicitResult(**result_data)
                    completed = elicitation_service.complete_elicitation(request_id, elicit_result)
                    if completed:
                        logger.info(f"Completed elicitation {request_id} from session {session_id}")
                        handled_as_elicitation = True
                except Exception as e:
                    # A failed attempt falls through to the normal broadcast below.
                    logger.warning(f"Failed to process elicitation response: {e}")

        # Anything not consumed as an elicitation response is broadcast normally.
        if not handled_as_elicitation:
            await session_registry.broadcast(
                session_id=session_id,
                message=message,
            )

        return ORJSONResponse(content={"status": "success"}, status_code=202)
    except HTTPException:
        # Already an HTTP error (e.g. the missing session_id case) - pass it through untouched.
        raise
    except ValueError as e:
        logger.error(f"Invalid message format: {e}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Message handling error: {e}")
        raise HTTPException(status_code=500, detail="Failed to process message")
@server_router.get("/{server_id}/tools", response_model=List[ToolRead])
@require_permission("servers.read")
async def server_get_tools(
    request: Request,
    server_id: str,
    include_inactive: bool = False,
    include_metrics: bool = False,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> List[Dict[str, Any]]:
    """
    List the tools attached to a server, filtered by the caller's token scope.

    Inactive tools are omitted unless ``include_inactive`` is set, which lets administrators
    review tools that were deactivated but not deleted.

    Args:
        request (Request): FastAPI request object.
        server_id (str): ID of the server
        include_inactive (bool): Whether to include inactive tools in the results.
        include_metrics (bool): Whether to include metrics in the tools results.
        db (Session): Database session dependency.
        user: Authenticated user dependency.

    Returns:
        List[ToolRead]: A list of tool records formatted with by_alias=True.
    """
    logger.debug(f"User: {user} has listed tools for the server_id: {server_id}")
    requester_email, token_teams, requester_is_admin = _get_rpc_filter_context(request, user)

    # Team roles are only looked up for identified, non-admin requesters.
    requester_team_roles = None
    if requester_email and not requester_is_admin:
        requester_team_roles = get_user_team_roles(db, requester_email)

    # A token with an explicit team scope (even the empty, public-only []) is always respected.
    # Only a token with no team restriction (None) is rewritten here:
    #   admin     -> unrestricted: no email filter, token_teams stays None
    #   non-admin -> public-only (secure default)
    filter_email = requester_email
    if token_teams is None:
        if requester_is_admin:
            filter_email = None
        else:
            token_teams = []

    tools = await tool_service.list_server_tools(
        db,
        server_id=server_id,
        include_inactive=include_inactive,
        include_metrics=include_metrics,
        user_email=filter_email,
        token_teams=token_teams,
        requesting_user_email=requester_email,
        requesting_user_is_admin=requester_is_admin,
        requesting_user_team_roles=requester_team_roles,
    )
    return [tool.model_dump(by_alias=True) for tool in tools]
@server_router.get("/{server_id}/resources", response_model=List[ResourceRead])
@require_permission("servers.read")
async def server_get_resources(
    request: Request,
    server_id: str,
    include_inactive: bool = False,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> List[Dict[str, Any]]:
    """
    List the resources attached to a server, filtered by the caller's token scope.

    Inactive resources are omitted unless ``include_inactive`` is set, which lets administrators
    review resources that were deactivated but not deleted.

    Args:
        request (Request): FastAPI request object.
        server_id (str): ID of the server
        include_inactive (bool): Whether to include inactive resources in the results.
        db (Session): Database session dependency.
        user: Authenticated user dependency.

    Returns:
        List[ResourceRead]: A list of resource records formatted with by_alias=True.
    """
    logger.debug(f"User: {user} has listed resources for the server_id: {server_id}")
    filter_email, token_teams, is_admin = _get_rpc_filter_context(request, user)

    # A token with an explicit team scope (even the empty, public-only []) is always respected.
    # Only a token with no team restriction (None) is rewritten here:
    #   admin     -> unrestricted: no email filter, token_teams stays None
    #   non-admin -> public-only (secure default)
    if token_teams is None:
        if is_admin:
            filter_email = None
        else:
            token_teams = []

    resources = await resource_service.list_server_resources(
        db,
        server_id=server_id,
        include_inactive=include_inactive,
        user_email=filter_email,
        token_teams=token_teams,
    )
    return [resource.model_dump(by_alias=True) for resource in resources]
@server_router.get("/{server_id}/prompts", response_model=List[PromptRead])
@require_permission("servers.read")
async def server_get_prompts(
    request: Request,
    server_id: str,
    include_inactive: bool = False,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> List[Dict[str, Any]]:
    """
    List the prompts attached to a server, filtered by the caller's token scope.

    Inactive prompts are omitted unless ``include_inactive`` is set, which lets administrators
    review prompts that were deactivated but not deleted.

    Args:
        request (Request): FastAPI request object.
        server_id (str): ID of the server
        include_inactive (bool): Whether to include inactive prompts in the results.
        db (Session): Database session dependency.
        user: Authenticated user dependency.

    Returns:
        List[PromptRead]: A list of prompt records formatted with by_alias=True.
    """
    logger.debug(f"User: {user} has listed prompts for the server_id: {server_id}")
    filter_email, token_teams, is_admin = _get_rpc_filter_context(request, user)

    # A token with an explicit team scope (even the empty, public-only []) is always respected.
    # Only a token with no team restriction (None) is rewritten here:
    #   admin     -> unrestricted: no email filter, token_teams stays None
    #   non-admin -> public-only (secure default)
    if token_teams is None:
        if is_admin:
            filter_email = None
        else:
            token_teams = []

    prompts = await prompt_service.list_server_prompts(
        db,
        server_id=server_id,
        include_inactive=include_inactive,
        user_email=filter_email,
        token_teams=token_teams,
    )
    return [prompt.model_dump(by_alias=True) for prompt in prompts]
3032##################
3033# A2A Agent APIs #
3034##################
@a2a_router.get("", response_model=Union[List[A2AAgentRead], CursorPaginatedA2AAgentsResponse])
@a2a_router.get("/", response_model=Union[List[A2AAgentRead], CursorPaginatedA2AAgentsResponse])
@require_permission("a2a.read")
async def list_a2a_agents(
    request: Request,
    include_inactive: bool = False,
    tags: Optional[str] = None,
    team_id: Optional[str] = Query(None, description="Filter by team ID"),
    visibility: Optional[str] = Query(None, description="Filter by visibility (private, team, public)"),
    cursor: Optional[str] = Query(None, description="Cursor for pagination"),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    limit: Optional[int] = Query(None, ge=0, description="Maximum number of agents to return"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Union[List[A2AAgentRead], Dict[str, Any]]:
    """
    Lists A2A agents user has access to with cursor pagination and team filtering.

    Args:
        request (Request): The FastAPI request object for team_id retrieval.
        include_inactive (bool): Whether to include inactive agents in the response.
        tags (Optional[str]): Comma-separated list of tags to filter by.
        team_id (Optional[str]): Team ID to filter by.
        visibility (Optional[str]): Visibility level to filter by.
        cursor (Optional[str]): Cursor for pagination.
        include_pagination (bool): Include cursor pagination metadata in response.
        limit (Optional[int]): Maximum number of agents to return; must be >= 0 (same bound as the
            server listing endpoint), so negative values are rejected at request validation.
        db (Session): The database session used to interact with the data store.
        user: The authenticated user context supplied by the permission dependency.

    Returns:
        Union[List[A2AAgentRead], Dict[str, Any]]: A list of A2A agent objects, or a ``{"agents": [...]}``
        dict (plus ``nextCursor`` when there is one) if ``include_pagination`` is set. A 403 ORJSONResponse
        is returned instead when the requested team differs from the token's team.

    Raises:
        HTTPException: If A2A service is not available.
    """
    # Split the comma-separated tag filter, discarding blank entries.
    tags_list = None
    if tags:
        tags_list = [tag.strip() for tag in tags.split(",") if tag.strip()]

    if a2a_service is None:
        raise HTTPException(status_code=503, detail="A2A service not available")

    # Get filtering context from token (respects token scope)
    user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)

    # Admin bypass - only when token has NO team restrictions (token_teams is None)
    # If token has explicit team scope (even for admins), respect it for least-privilege
    if is_admin and token_teams is None:
        user_email = None
        token_teams = None  # Admin unrestricted
    elif token_teams is None:
        token_teams = []  # Non-admin without teams = public-only (secure default)

    # Check team_id from request.state (set during auth)
    # Only use for non-empty-team tokens; empty-team tokens should rely on visibility filtering
    token_team_id = getattr(request.state, "team_id", None)
    is_empty_team_token = token_teams is not None and len(token_teams) == 0

    # Check for team ID mismatch (only applies when both are specified and token has teams)
    if team_id is not None and token_team_id is not None and team_id != token_team_id and not is_empty_team_token:
        return ORJSONResponse(
            content={"message": "Access issue: This API token does not have the required permissions for this team."},
            status_code=status.HTTP_403_FORBIDDEN,
        )

    # Determine final team ID - don't use token_team_id for empty-team tokens
    # Empty-team tokens should filter by public + owned, not by personal team
    if not is_empty_team_token:
        team_id = team_id or token_team_id

    logger.debug(f"User: {user_email} requested A2A agent list with team_id={team_id}, visibility={visibility}, tags={tags_list}, cursor={cursor}")

    # Use consolidated agent listing with token-based team filtering
    data, next_cursor = await a2a_service.list_agents(
        db=db,
        cursor=cursor,
        include_inactive=include_inactive,
        tags=tags_list,
        limit=limit,
        user_email=user_email,
        token_teams=token_teams,
        team_id=team_id,
        visibility=visibility,
    )

    if include_pagination:
        payload = {"agents": [agent.model_dump(by_alias=True) for agent in data]}
        if next_cursor:
            payload["nextCursor"] = next_cursor
        return payload
    return data
@a2a_router.get("/{agent_id}", response_model=A2AAgentRead)
@require_permission("a2a.read")
async def get_a2a_agent(
    agent_id: str,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> A2AAgentRead:
    """
    Fetch a single A2A agent, scoped by the caller's token.

    Args:
        agent_id (str): The ID of the agent to retrieve.
        request (Request): The FastAPI request object used to derive the token filtering context.
        db (Session): The database session used to interact with the data store.
        user (str): The authenticated user making the request.

    Returns:
        A2AAgentRead: The agent returned by the A2A service for the given ID.

    Raises:
        HTTPException: 503 if the A2A service is not available, 404 if the service reports the agent as not found.
    """
    logger.debug(f"User {user} requested A2A agent with ID {agent_id}")

    # Availability guard: this error is never translated by the handler below.
    if a2a_service is None:
        raise HTTPException(status_code=503, detail="A2A service not available")

    try:
        # Filtering context comes from the token so that token scope is respected.
        caller_email, scoped_teams, caller_is_admin = _get_rpc_filter_context(request, user)

        # Only an admin keeps "no team restriction" (None); everyone else without
        # teams is narrowed to the empty list, i.e. public-only access.
        if scoped_teams is None and not caller_is_admin:
            scoped_teams = []

        return await a2a_service.get_agent(
            db,
            agent_id,
            user_email=caller_email,
            token_teams=scoped_teams,
        )
    except A2AAgentNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
@a2a_router.post("", response_model=A2AAgentRead, status_code=201)
@a2a_router.post("/", response_model=A2AAgentRead, status_code=201)
@require_permission("a2a.create")
async def create_a2a_agent(
    agent: A2AAgentCreate,
    request: Request,
    team_id: Optional[str] = Body(None, description="Team ID to assign agent to"),
    visibility: Optional[str] = Body("public", description="Agent visibility: private, team, public"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> A2AAgentRead:
    """
    Register a new A2A agent, enforcing the token's team scope.

    Args:
        agent (A2AAgentCreate): The data for the new agent.
        request (Request): The FastAPI request object; supplies creation metadata and the token's team state.
        team_id (Optional[str]): Team ID to assign the agent to.
        visibility (str): Agent visibility level (private, team, public).
        db (Session): The database session used to interact with the data store.
        user (str): The authenticated user making the request.

    Returns:
        A2AAgentRead: The created agent object. A 403 ORJSONResponse is returned instead when a public-only
        token asks for team/private visibility, or when the requested team differs from the token's team.

    Raises:
        HTTPException: 503 if the A2A service is not available, 409 on a name conflict or integrity error,
            400 on other A2A agent errors, 422 on validation errors.
    """
    try:
        # Audit metadata and caller identity.
        metadata = MetadataCapture.extract_creation_metadata(request, user)
        user_email = get_user_email(user)

        # Team scope recorded on the request state during authentication.
        token_team_id = getattr(request.state, "team_id", None)
        token_teams = getattr(request.state, "token_teams", None)
        is_public_only_token = token_teams is not None and len(token_teams) == 0

        if is_public_only_token:
            # SECURITY: Public-only tokens (teams == []) cannot create team/private resources
            if visibility in ("team", "private"):
                return ORJSONResponse(
                    content={"message": "Public-only tokens cannot create team or private resources. Use visibility='public' or obtain a team-scoped token."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            # Public-only tokens never get a team assigned.
            team_id = None
        else:
            # An explicitly requested team must match the token's team when both are set.
            if team_id is not None and token_team_id is not None and team_id != token_team_id:
                return ORJSONResponse(
                    content={"message": "Access issue: This API token does not have the required permissions for this team."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            team_id = team_id or token_team_id

        logger.debug(f"User {user_email} is creating a new A2A agent for team {team_id}")
        if a2a_service is None:
            raise HTTPException(status_code=503, detail="A2A service not available")

        # Forward the captured creation metadata under the same keyword names.
        audit_fields = (
            "created_by",
            "created_from_ip",
            "created_via",
            "created_user_agent",
            "import_batch_id",
            "federation_source",
        )
        audit_kwargs = {field: metadata[field] for field in audit_fields}

        return await a2a_service.register_agent(
            db,
            agent,
            **audit_kwargs,
            team_id=team_id,
            owner_email=user_email,
            visibility=visibility,
        )
    except A2AAgentNameConflictError as e:
        raise HTTPException(status_code=409, detail=str(e))
    except A2AAgentError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except ValidationError as e:
        logger.error(f"Validation error while creating A2A agent: {e}")
        raise HTTPException(status_code=422, detail=ErrorFormatter.format_validation_error(e))
    except IntegrityError as e:
        logger.error(f"Integrity error while creating A2A agent: {e}")
        raise HTTPException(status_code=409, detail=ErrorFormatter.format_database_error(e))
@a2a_router.put("/{agent_id}", response_model=A2AAgentRead)
@require_permission("a2a.update")
async def update_a2a_agent(
    agent_id: str,
    agent: A2AAgentUpdate,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> A2AAgentRead:
    """
    Apply an update to an existing A2A agent.

    Args:
        agent_id (str): The ID of the agent to update.
        agent (A2AAgentUpdate): The updated agent data.
        request (Request): The FastAPI request object for metadata extraction.
        db (Session): The database session used to interact with the data store.
        user (str): The authenticated user making the request.

    Returns:
        A2AAgentRead: The updated agent object.

    Raises:
        HTTPException: 503 if the A2A service is not available, 403 on permission errors, 404 if the agent
            is not found, 409 on a name conflict or integrity error, 400 on other A2A agent errors,
            422 on validation errors.
    """
    try:
        logger.debug(f"User {user} is updating A2A agent with ID {agent_id}")

        # Version is passed as 0 here; it will be incremented in the service.
        audit = MetadataCapture.extract_modification_metadata(request, user, 0)

        if a2a_service is None:
            raise HTTPException(status_code=503, detail="A2A service not available")

        # `user` may be a dict carrying an "email" entry or any other object (stringified).
        if isinstance(user, dict):
            actor_email = user.get("email")
        else:
            actor_email = str(user)

        return await a2a_service.update_agent(
            db,
            agent_id,
            agent,
            modified_by=audit["modified_by"],
            modified_from_ip=audit["modified_from_ip"],
            modified_via=audit["modified_via"],
            modified_user_agent=audit["modified_user_agent"],
            user_email=actor_email,
        )
    except PermissionError as e:
        raise HTTPException(status_code=403, detail=str(e))
    except A2AAgentNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except A2AAgentNameConflictError as e:
        raise HTTPException(status_code=409, detail=str(e))
    except A2AAgentError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except ValidationError as e:
        logger.error(f"Validation error while updating A2A agent {agent_id}: {e}")
        raise HTTPException(status_code=422, detail=ErrorFormatter.format_validation_error(e))
    except IntegrityError as e:
        logger.error(f"Integrity error while updating A2A agent {agent_id}: {e}")
        raise HTTPException(status_code=409, detail=ErrorFormatter.format_database_error(e))
@a2a_router.post("/{agent_id}/state", response_model=A2AAgentRead)
@require_permission("a2a.update")
async def set_a2a_agent_state(
    agent_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> A2AAgentRead:
    """
    Activate or deactivate an A2A agent.

    Args:
        agent_id (str): The ID of the agent to update.
        activate (bool): Whether to activate or deactivate the agent.
        db (Session): The database session used to interact with the data store.
        user (str): The authenticated user making the request.

    Returns:
        A2AAgentRead: The agent object after the status change.

    Raises:
        HTTPException: 503 if the A2A service is not available, 403 on permission errors, 404 if the agent
            is not found, 400 on other A2A agent errors.
    """
    try:
        # `user` may be a dict carrying an "email" entry or any other object (stringified).
        if isinstance(user, dict):
            actor_email = user.get("email")
        else:
            actor_email = str(user)

        logger.debug(f"User {user} is toggling A2A agent with ID {agent_id} to {'active' if activate else 'inactive'}")

        if a2a_service is None:
            raise HTTPException(status_code=503, detail="A2A service not available")

        updated_agent = await a2a_service.set_agent_state(db, agent_id, activate, user_email=actor_email)
        return updated_agent
    except PermissionError as e:
        raise HTTPException(status_code=403, detail=str(e))
    except A2AAgentNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except A2AAgentError as e:
        raise HTTPException(status_code=400, detail=str(e))
@a2a_router.post("/{agent_id}/toggle", response_model=A2AAgentRead, deprecated=True)
@require_permission("a2a.update")
async def toggle_a2a_agent_status(
    agent_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> A2AAgentRead:
    """DEPRECATED: Use /state endpoint instead. This endpoint will be removed in a future release.

    Sets the status of an A2A agent (activate or deactivate) by delegating to
    ``set_a2a_agent_state`` after emitting a ``DeprecationWarning``.

    Args:
        agent_id: The A2A agent ID.
        activate: Whether to activate (True) or deactivate (False) the agent.
        db: Database session.
        user: Authenticated user context.

    Returns:
        The updated A2A agent.
    """

    warnings.warn("The /toggle endpoint is deprecated. Use /state instead.", DeprecationWarning, stacklevel=2)

    # Delegate by keyword rather than by position. The target is itself wrapped by
    # @require_permission, and a positional call hides the parameter names from that wrapper.
    # NOTE(review): presumably the wrapper resolves `user`/`db` by name, in which case the
    # previous positional call could not be authorized - confirm against the decorator.
    return await set_a2a_agent_state(agent_id=agent_id, activate=activate, db=db, user=user)
@a2a_router.delete("/{agent_id}", response_model=Dict[str, str])
@require_permission("a2a.delete")
async def delete_a2a_agent(
    agent_id: str,
    purge_metrics: bool = Query(False, description="Purge raw + rollup metrics for this agent"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, str]:
    """
    Remove an A2A agent identified by its ID.

    Args:
        agent_id (str): The ID of the agent to delete.
        purge_metrics (bool): Whether to delete raw + hourly rollup metrics for this agent.
        db (Session): The database session used to interact with the data store.
        user (str): The authenticated user making the request.

    Returns:
        Dict[str, str]: A success message indicating the agent was deleted.

    Raises:
        HTTPException: 503 if the A2A service is not available, 403 on permission errors, 404 if the agent
            is not found, 400 on other A2A agent errors.
    """
    try:
        logger.debug(f"User {user} is deleting A2A agent with ID {agent_id}")

        if a2a_service is None:
            raise HTTPException(status_code=503, detail="A2A service not available")

        # `user` may be a dict carrying an "email" entry or any other object (stringified).
        if isinstance(user, dict):
            actor_email = user.get("email")
        else:
            actor_email = str(user)

        await a2a_service.delete_agent(db, agent_id, user_email=actor_email, purge_metrics=purge_metrics)

        confirmation = {
            "status": "success",
            "message": f"A2A Agent {agent_id} deleted successfully",
        }
        return confirmation
    except PermissionError as e:
        raise HTTPException(status_code=403, detail=str(e))
    except A2AAgentNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except A2AAgentError as e:
        raise HTTPException(status_code=400, detail=str(e))
@a2a_router.post("/{agent_name}/invoke", response_model=Dict[str, Any])
@require_permission("a2a.invoke")
async def invoke_a2a_agent(
    agent_name: str,
    request: Request,
    parameters: Dict[str, Any] = Body(default_factory=dict),
    interaction_type: str = Body(default="query"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """
    Invoke an A2A agent by name, scoped by the caller's token.

    Args:
        agent_name (str): The name of the agent to invoke.
        request (Request): The FastAPI request object used to derive the token filtering context.
        parameters (Dict[str, Any]): Parameters for the agent interaction.
        interaction_type (str): Type of interaction (query, execute, etc.).
        db (Session): The database session used to interact with the data store.
        user (str): The authenticated user making the request.

    Returns:
        Dict[str, Any]: The response from the A2A agent.

    Raises:
        HTTPException: 503 if the A2A service is not available, 404 if the agent is not found,
            400 on other A2A agent errors.
    """
    try:
        logger.debug(f"User {user} is invoking A2A agent '{agent_name}' with type '{interaction_type}'")

        if a2a_service is None:
            raise HTTPException(status_code=503, detail="A2A service not available")

        # Filtering context comes from the token so that token scope is respected.
        caller_email, scoped_teams, caller_is_admin = _get_rpc_filter_context(request, user)

        # Only an admin keeps "no team restriction" (None); everyone else without
        # teams is narrowed to the empty list, i.e. public-only access.
        if scoped_teams is None and not caller_is_admin:
            scoped_teams = []

        # Caller identifier: first truthy of "id", "sub", then the email for dict users;
        # otherwise the stringified user object.
        caller_id = str(user.get("id") or user.get("sub") or caller_email) if isinstance(user, dict) else str(user)

        return await a2a_service.invoke_agent(
            db,
            agent_name,
            parameters,
            interaction_type,
            user_id=caller_id,
            user_email=caller_email,
            token_teams=scoped_teams,
        )
    except A2AAgentNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except A2AAgentError as e:
        raise HTTPException(status_code=400, detail=str(e))
3489#############
3490# Tool APIs #
3491#############
@tool_router.get("", response_model=Union[List[ToolRead], CursorPaginatedToolsResponse])
@tool_router.get("/", response_model=Union[List[ToolRead], CursorPaginatedToolsResponse])
@require_permission("tools.read")
async def list_tools(
    request: Request,
    cursor: Optional[str] = None,
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    limit: Optional[int] = Query(None, ge=0, description="Maximum number of tools to return. 0 means all (no limit). Default uses pagination_default_page_size."),
    include_inactive: bool = False,
    tags: Optional[str] = None,
    team_id: Optional[str] = Query(None, description="Filter by team ID"),
    visibility: Optional[str] = Query(None, description="Filter by visibility: private, team, public"),
    gateway_id: Optional[str] = Query(None, description="Filter by gateway ID"),
    db: Session = Depends(get_db),
    apijsonpath: JsonPathModifier = Body(None),
    user=Depends(get_current_user_with_permissions),
) -> Union[List[ToolRead], List[Dict], Dict]:
    """Return registered tools, filtered by token scope, with optional cursor pagination.

    Args:
        request (Request): The FastAPI request object; supplies the token filtering context and team state
        cursor: Pagination cursor for fetching the next set of results
        include_pagination: Whether to wrap the result in a dict with "tools" and, when present, "nextCursor"
        limit: Maximum number of tools to return. Use 0 for all tools (no limit).
            If not specified, uses pagination_default_page_size.
        include_inactive: Whether to include inactive tools in the results
        tags: Comma-separated list of tags to filter by (e.g., "api,data")
        team_id: Optional team ID to filter tools by specific team
        visibility: Optional visibility filter (private, team, public)
        gateway_id: Optional gateway ID to filter tools by specific gateway
        db: Database session
        apijsonpath: JSON path modifier to filter or transform the response
        user: Authenticated user with permissions

    Returns:
        List of tools, a paginated dict, or the jsonpath-modified result. A 403 ORJSONResponse is returned
        when the requested team differs from the team recorded for a team-scoped token.
    """

    # Split the comma-separated tags, dropping blanks; stays None when no tags were given.
    tags_list = [name for name in (raw.strip() for raw in tags.split(",")) if name] if tags else None

    # Filtering context comes from the token so that token scope is respected.
    user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)

    # Remember the original identity before the admin bypass below clears user_email.
    _req_email = user_email
    _req_is_admin = is_admin

    if token_teams is None:
        if is_admin:
            # Admin with no team restriction on the token: unrestricted listing.
            user_email = None
        else:
            # Non-admin without teams: public-only (secure default).
            token_teams = []

    # Team recorded on the request state during authentication.
    token_team_id = getattr(request.state, "team_id", None)
    is_empty_team_token = token_teams is not None and len(token_teams) == 0

    # Empty-team tokens skip both the mismatch check and the token-team fallback.
    if not is_empty_team_token:
        if team_id is not None and token_team_id is not None and team_id != token_team_id:
            return ORJSONResponse(
                content={"message": "Access issue: This API token does not have the required permissions for this team."},
                status_code=status.HTTP_403_FORBIDDEN,
            )
        team_id = team_id or token_team_id

    # Team roles are only looked up for identified, non-admin requesters.
    if _req_email and not _req_is_admin:
        _req_team_roles = get_user_team_roles(db, _req_email)
    else:
        _req_team_roles = None

    data, next_cursor = await tool_service.list_tools(
        db=db,
        cursor=cursor,
        include_inactive=include_inactive,
        tags=tags_list,
        gateway_id=gateway_id,
        limit=limit,
        user_email=user_email,
        team_id=team_id,
        visibility=visibility,
        token_teams=token_teams,
        requesting_user_email=_req_email,
        requesting_user_is_admin=_req_is_admin,
        requesting_user_team_roles=_req_team_roles,
    )

    # Release transaction before response serialization
    db.commit()
    db.close()

    # A JSONPath modifier takes precedence over the pagination wrapper.
    if apijsonpath is not None:
        tools_dict_list = [tool.to_dict(use_alias=True) for tool in data]
        return jsonpath_modifier(tools_dict_list, apijsonpath.jsonpath, apijsonpath.mapping)

    if not include_pagination:
        return data

    payload = {"tools": [tool.model_dump(by_alias=True) for tool in data]}
    if next_cursor:
        payload["nextCursor"] = next_cursor
    return payload
@tool_router.post("", response_model=ToolRead)
@tool_router.post("/", response_model=ToolRead)
@require_permission("tools.create")
async def create_tool(
    tool: ToolCreate,
    request: Request,
    team_id: Optional[str] = Body(None, description="Team ID to assign tool to"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ToolRead:
    """
    Register a new tool, enforcing the token's team scope.

    Args:
        tool (ToolCreate): The data needed to create the tool.
        request (Request): The FastAPI request object; supplies creation metadata and the token's team state.
        team_id (Optional[str]): Team ID to assign the tool to.
        db (Session): The database session dependency.
        user: The authenticated user making the request.

    Returns:
        ToolRead: The created tool data. A 403 ORJSONResponse is returned instead when a public-only token
        asks for team/private visibility, or when the requested team differs from the token's team.

    Raises:
        HTTPException: 409 on a name conflict or integrity error, 422 on validation/value errors,
            400 on other tool errors, 500 on anything unexpected.
    """
    try:
        # Audit metadata and caller identity.
        metadata = MetadataCapture.extract_creation_metadata(request, user)
        user_email = get_user_email(user)

        # Team scope recorded on the request state during authentication.
        token_team_id = getattr(request.state, "team_id", None)
        token_teams = getattr(request.state, "token_teams", None)
        is_public_only_token = token_teams is not None and len(token_teams) == 0

        if is_public_only_token:
            # SECURITY: Public-only tokens (teams == []) cannot create team/private resources
            if tool.visibility in ("team", "private"):
                return ORJSONResponse(
                    content={"message": "Public-only tokens cannot create team or private resources. Use visibility='public' or obtain a team-scoped token."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            # Public-only tokens never get a team assigned.
            team_id = None
        else:
            # An explicitly requested team must match the token's team when both are set.
            if team_id is not None and token_team_id is not None and team_id != token_team_id:
                return ORJSONResponse(
                    content={"message": "Access issue: This API token does not have the required permissions for this team."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            team_id = team_id or token_team_id

        logger.debug(f"User {user_email} is creating a new tool for team {team_id}")

        # Forward the captured creation metadata under the same keyword names.
        audit_fields = (
            "created_by",
            "created_from_ip",
            "created_via",
            "created_user_agent",
            "import_batch_id",
            "federation_source",
        )
        audit_kwargs = {field: metadata[field] for field in audit_fields}

        result = await tool_service.register_tool(
            db,
            tool,
            **audit_kwargs,
            team_id=team_id,
            owner_email=user_email,
            visibility=tool.visibility,
        )
        # Commit and close the session before handing the result back.
        db.commit()
        db.close()
        return result
    except Exception as ex:
        logger.error(f"Error while creating tool: {ex}")

        # The order of these checks decides which status wins for related exception types.
        if isinstance(ex, ToolNameConflictError):
            conflict_detail = str(ex)
            if not ex.enabled and ex.tool_id:
                # Name is taken by an inactive tool: point the caller at its ID.
                conflict_detail = f"Tool name already exists but is inactive. Consider activating it with ID: {ex.tool_id}"
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=conflict_detail)

        if isinstance(ex, (ValidationError, ValueError)):
            logger.error(f"Validation error while creating tool: {ex}")
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=ErrorFormatter.format_validation_error(ex))

        if isinstance(ex, IntegrityError):
            logger.error(f"Integrity error while creating tool: {ex}")
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=ErrorFormatter.format_database_error(ex))

        if isinstance(ex, ToolError):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(ex))

        logger.error(f"Unexpected error while creating tool: {ex}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred while creating the tool")
@tool_router.get("/{tool_id}", response_model=Union[ToolRead, Dict])
@require_permission("tools.read")
async def get_tool(
    tool_id: str,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
    apijsonpath: JsonPathModifier = Body(None),
) -> Union[ToolRead, Dict]:
    """
    Retrieve a tool by ID, optionally applying a JSONPath post-filter.

    Args:
        tool_id: The ID of the tool.
        request: The incoming HTTP request; used to derive the requester's filtering context.
        db: Active SQLAlchemy session (dependency).
        user: Authenticated user (dependency).
        apijsonpath: Optional JSON-Path modifier supplied in the body.

    Returns:
        The raw ``ToolRead`` model **or** a JSON-transformed ``dict`` if
        a JSONPath filter/mapping was supplied.

    Raises:
        HTTPException: An ``HTTPException`` raised while handling the request is propagated with its
            own status code; any other failure is reported as 404 with the error text as detail.
    """
    try:
        logger.debug(f"User {user} is retrieving tool with ID {tool_id}")

        # Requester identity; the token-team element of the context is not needed here.
        _req_email, _, _req_is_admin = _get_rpc_filter_context(request, user)

        # Team roles are only looked up for identified, non-admin requesters.
        _req_team_roles = get_user_team_roles(db, _req_email) if _req_email and not _req_is_admin else None

        data = await tool_service.get_tool(
            db,
            tool_id,
            requesting_user_email=_req_email,
            requesting_user_is_admin=_req_is_admin,
            requesting_user_team_roles=_req_team_roles,
        )
        if apijsonpath is None:
            return data

        data_dict = data.to_dict(use_alias=True)
        return jsonpath_modifier(data_dict, apijsonpath.jsonpath, apijsonpath.mapping)
    except HTTPException:
        # An HTTPException already carries a deliberate status and detail. Without this clause the
        # blanket handler below would rewrap it as a 404 whose detail is the stringified exception.
        raise
    except Exception as e:
        # Remaining failures keep the original contract (404), with the cause chained for tracebacks.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
@tool_router.put("/{tool_id}", response_model=ToolRead)
@require_permission("tools.update")
async def update_tool(
    tool_id: str,
    tool: ToolUpdate,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ToolRead:
    """
    Apply an update to an existing tool.

    Args:
        tool_id (str): The ID of the tool to update.
        tool (ToolUpdate): The updated tool information.
        request (Request): The FastAPI request object for metadata extraction.
        db (Session): The database session dependency.
        user (str): The authenticated user making the request.

    Returns:
        ToolRead: The updated tool data.

    Raises:
        HTTPException: 403 on permission errors, 404 if the tool is not found, 422 on validation errors,
            409 on integrity errors, 400 on other tool errors, 500 on anything unexpected.
    """
    try:
        # Read the stored version (0 when the tool is missing or has no version attribute).
        existing = db.get(DbTool, tool_id)
        current_version = 0
        if existing:
            current_version = getattr(existing, "version", 0)

        audit = MetadataCapture.extract_modification_metadata(request, user, current_version)

        logger.debug(f"User {user} is updating tool with ID {tool_id}")

        # `user` may be a dict carrying an "email" entry or any other object (stringified).
        if isinstance(user, dict):
            actor_email = user.get("email")
        else:
            actor_email = str(user)

        result = await tool_service.update_tool(
            db,
            tool_id,
            tool,
            modified_by=audit["modified_by"],
            modified_from_ip=audit["modified_from_ip"],
            modified_via=audit["modified_via"],
            modified_user_agent=audit["modified_user_agent"],
            user_email=actor_email,
        )
        # Commit and close the session before handing the result back.
        db.commit()
        db.close()
        return result
    # Clause order matters: the first matching type decides the status code.
    except PermissionError as ex:
        raise HTTPException(status_code=403, detail=str(ex))
    except ToolNotFoundError as ex:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(ex))
    except ValidationError as ex:
        logger.error(f"Validation error while updating tool: {ex}")
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=ErrorFormatter.format_validation_error(ex))
    except IntegrityError as ex:
        logger.error(f"Integrity error while updating tool: {ex}")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=ErrorFormatter.format_database_error(ex))
    except ToolError as ex:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(ex))
    except Exception as ex:
        logger.error(f"Unexpected error while updating tool: {ex}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred while updating the tool")
@tool_router.delete("/{tool_id}")
@require_permission("tools.delete")
async def delete_tool(
    tool_id: str,
    purge_metrics: bool = Query(False, description="Purge raw + rollup metrics for this tool"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, str]:
    """
    Delete a tool identified by its ID.

    Args:
        tool_id (str): The ID of the tool to delete.
        purge_metrics (bool): Whether to delete raw + hourly rollup metrics for this tool.
        db (Session): The database session dependency.
        user (str): The authenticated user making the request.

    Returns:
        Dict[str, str]: A confirmation message upon successful deletion.

    Raises:
        HTTPException: 403 on permission errors, 404 if the tool is not found, 400 on any other error.
    """
    try:
        logger.debug(f"User {user} is deleting tool with ID {tool_id}")

        # `user` may be a dict carrying an "email" entry or any other object (stringified).
        if isinstance(user, dict):
            actor_email = user.get("email")
        else:
            actor_email = str(user)

        await tool_service.delete_tool(db, tool_id, user_email=actor_email, purge_metrics=purge_metrics)

        # Commit and close the session before building the confirmation.
        db.commit()
        db.close()

        confirmation = {"status": "success", "message": f"Tool {tool_id} permanently deleted"}
        return confirmation
    except PermissionError as e:
        raise HTTPException(status_code=403, detail=str(e))
    except ToolNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
@tool_router.post("/{tool_id}/state")
@require_permission("tools.update")
async def set_tool_state(
    tool_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """
    Switch a tool between active and inactive.

    Args:
        tool_id (str): The ID of the tool to update.
        activate (bool): Whether to activate (`True`) or deactivate (`False`) the tool.
        db (Session): The database session dependency.
        user (str): The authenticated user making the request.

    Returns:
        Dict[str, Any]: The status, message, and updated tool data.

    Raises:
        HTTPException: 403 on permission errors, 404 if the tool is not found, 409 on a lock conflict,
            400 on any other error.
    """
    try:
        logger.debug(f"User {user} is setting tool state for ID {tool_id} to {'active' if activate else 'inactive'}")

        # `user` may be a dict carrying an "email" entry or any other object (stringified).
        if isinstance(user, dict):
            actor_email = user.get("email")
        else:
            actor_email = str(user)

        # The reachable flag is set to the same value as the requested activation state.
        updated_tool = await tool_service.set_tool_state(db, tool_id, activate, reachable=activate, user_email=actor_email)

        outcome = {
            "status": "success",
            "message": f"Tool {tool_id} {'activated' if activate else 'deactivated'}",
            "tool": updated_tool.model_dump(),
        }
        return outcome
    except PermissionError as e:
        raise HTTPException(status_code=403, detail=str(e))
    except ToolNotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except ToolLockConflictError as e:
        raise HTTPException(status_code=409, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
@tool_router.post("/{tool_id}/toggle", deprecated=True)
@require_permission("tools.update")
async def toggle_tool_status(
    tool_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """DEPRECATED: Use /state endpoint instead. This endpoint will be removed in a future release.

    Activates or deactivates a tool by delegating to ``set_tool_state`` after
    emitting a ``DeprecationWarning``.

    Args:
        tool_id: The tool ID.
        activate: Whether to activate (True) or deactivate (False) the tool.
        db: Database session.
        user: Authenticated user context.

    Returns:
        Status message with tool state.
    """

    warnings.warn("The /toggle endpoint is deprecated. Use /state instead.", DeprecationWarning, stacklevel=2)

    # Delegate by keyword rather than by position. The target is itself wrapped by
    # @require_permission, and a positional call hides the parameter names from that wrapper.
    # NOTE(review): presumably the wrapper resolves `user`/`db` by name, in which case the
    # previous positional call could not be authorized - confirm against the decorator.
    return await set_tool_state(tool_id=tool_id, activate=activate, db=db, user=user)
3907#################
3908# Resource APIs #
3909#################
3910# --- Resource templates endpoint - MUST come before variable paths ---
@resource_router.get("/templates/list", response_model=ListResourceTemplatesResult)
@require_permission("resources.read")
async def list_resource_templates(
    request: Request,
    db: Session = Depends(get_db),
    include_inactive: bool = False,
    tags: Optional[str] = None,
    visibility: Optional[str] = None,
    user=Depends(get_current_user_with_permissions),
) -> ListResourceTemplatesResult:
    """
    Return the resource templates visible to the caller.

    Args:
        request (Request): The FastAPI request object used to derive the token filtering context.
        db (Session): Database session.
        include_inactive (bool): Whether to include inactive resources.
        tags (Optional[str]): Comma-separated list of tags to filter by.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        user: Authenticated user context.

    Returns:
        ListResourceTemplatesResult: All matching templates in a single result;
        ``next_cursor`` is always ``None`` because pagination is not implemented here.
    """
    logger.info(f"User {user} requested resource templates")

    # Split the comma-separated tag string, dropping blank entries; no tags -> no filter.
    tag_filter = [name for name in map(str.strip, tags.split(",")) if name] if tags else None

    # Filtering context comes from the token so that token scope is respected.
    user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)

    # Only an admin whose token carries no team restriction stays unrestricted (None);
    # any other caller without teams is narrowed to public-only (empty list).
    if token_teams is None and not is_admin:
        token_teams = []

    templates = await resource_service.list_resource_templates(
        db,
        user_email=user_email,
        token_teams=token_teams,
        include_inactive=include_inactive,
        tags=tag_filter,
        visibility=visibility,
    )
    # No real pagination: everything is returned at once with no cursor.
    return ListResourceTemplatesResult(_meta={}, resource_templates=templates, next_cursor=None)
@resource_router.post("/{resource_id}/state")
@require_permission("resources.update")
async def set_resource_state(
    resource_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """
    Switch a resource on or off by its ID.

    Args:
        resource_id (str): The ID of the resource.
        activate (bool): True to activate, False to deactivate.
        db (Session): Database session.
        user: Authenticated user context (dict with ``email`` or a stringifiable value).

    Returns:
        Dict[str, Any]: ``status``, a ``message`` and the updated resource under ``resource``.

    Raises:
        HTTPException: 403 on ``PermissionError``, 404 when the resource is
            missing, 409 on a lock conflict, and 400 for any other failure.
    """
    target_state = "active" if activate else "inactive"
    logger.debug(f"User {user} is toggling resource with ID {resource_id} to {target_state}")
    try:
        if isinstance(user, dict):
            requester_email = user.get("email")
        else:
            requester_email = str(user)

        updated = await resource_service.set_resource_state(db, resource_id, activate, user_email=requester_email)
        outcome = "activated" if activate else "deactivated"
        return {
            "status": "success",
            "message": f"Resource {resource_id} {outcome}",
            "resource": updated.model_dump(),
        }
    except PermissionError as exc:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc))
    except ResourceNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    except ResourceLockConflictError as exc:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc))
    except Exception as exc:
        # Catch-all boundary: remaining failures surface as a bad request.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
@resource_router.post("/{resource_id}/toggle", deprecated=True)
@require_permission("resources.update")
async def toggle_resource_status(
    resource_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """DEPRECATED: Use /state endpoint instead. This endpoint will be removed in a future release.

    Activate or deactivate a resource by its ID by delegating to
    :func:`set_resource_state` after emitting a ``DeprecationWarning``.

    Args:
        resource_id: The resource ID.
        activate: Whether to activate (True) or deactivate (False) the resource.
        db: Database session.
        user: Authenticated user context.

    Returns:
        Status message with resource state, exactly as returned by ``set_resource_state``.
    """
    warnings.warn("The /toggle endpoint is deprecated. Use /state instead.", DeprecationWarning, stacklevel=2)
    # Forward by keyword, mirroring how FastAPI itself invokes the endpoint, so that
    # decorator wrappers which look up ``user``/``db`` among keyword arguments see the
    # same call shape. NOTE(review): confirm require_permission inspects kwargs.
    return await set_resource_state(resource_id=resource_id, activate=activate, db=db, user=user)
@resource_router.get("", response_model=Union[List[ResourceRead], CursorPaginatedResourcesResponse])
@resource_router.get("/", response_model=Union[List[ResourceRead], CursorPaginatedResourcesResponse])
@require_permission("resources.read")
async def list_resources(
    request: Request,
    cursor: Optional[str] = Query(None, description="Cursor for pagination"),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    limit: Optional[int] = Query(None, ge=0, description="Maximum number of resources to return"),
    include_inactive: bool = False,
    tags: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """
    List the resources the caller may see, honouring token team scope and cursor pagination.

    Args:
        request (Request): The FastAPI request object; ``request.state.team_id`` is consulted.
        cursor (Optional[str]): Cursor for pagination.
        include_pagination (bool): Include cursor pagination metadata in response.
        limit (Optional[int]): Maximum number of resources to return.
        include_inactive (bool): Whether to include inactive resources.
        tags (Optional[str]): Comma-separated list of tags to filter by.
        team_id (Optional[str]): Filter by specific team ID.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        db (Session): Database session.
        user: Authenticated user context.

    Returns:
        Union[List[ResourceRead], Dict[str, Any]]: The plain list of resources, or, when
        ``include_pagination`` is set, a dict with ``resources`` and (if present) ``nextCursor``.
        A 403 ``ORJSONResponse`` is returned when the requested team differs from the token's team.
    """
    # Split the comma-separated tag string, dropping blank entries; no tags -> no filter.
    tag_filter = [name for name in map(str.strip, tags.split(",")) if name] if tags else None

    # Filtering context comes from the token so that token scope is respected.
    user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)

    # A token without team restrictions is unrestricted only for admins; an admin token
    # that does carry an explicit team scope keeps it (least privilege).
    if token_teams is None:
        if is_admin:
            user_email = None  # Admin unrestricted
        else:
            token_teams = []  # Non-admin without teams = public-only (secure default)

    # Team recorded on request.state during auth.
    token_team_id = getattr(request.state, "team_id", None)
    public_only_token = token_teams is not None and len(token_teams) == 0

    # Empty-team tokens rely purely on visibility filtering (public + owned), so the
    # token's team is neither checked against nor substituted for the requested one.
    if not public_only_token:
        if team_id is not None and token_team_id is not None and team_id != token_team_id:
            return ORJSONResponse(
                content={"message": "Access issue: This API token does not have the required permissions for this team."},
                status_code=status.HTTP_403_FORBIDDEN,
            )
        team_id = team_id or token_team_id

    logger.debug(f"User {user_email} requested resource list with cursor {cursor}, include_inactive={include_inactive}, tags={tag_filter}, team_id={team_id}, visibility={visibility}")
    data, next_cursor = await resource_service.list_resources(
        db=db,
        cursor=cursor,
        limit=limit,
        include_inactive=include_inactive,
        tags=tag_filter,
        user_email=user_email,
        team_id=team_id,
        visibility=visibility,
        token_teams=token_teams,
    )
    # Release transaction before response serialization
    db.commit()
    db.close()

    if not include_pagination:
        return data

    # Items exposing model_dump are serialized by alias; anything else passes through as-is.
    serialized = [item.model_dump(by_alias=True) if hasattr(item, "model_dump") else item for item in data]
    payload = {"resources": serialized}
    if next_cursor:
        payload["nextCursor"] = next_cursor
    return payload
@resource_router.post("", response_model=ResourceRead)
@resource_router.post("/", response_model=ResourceRead)
@require_permission("resources.create")
async def create_resource(
    resource: ResourceCreate,
    request: Request,
    team_id: Optional[str] = Body(None, description="Team ID to assign resource to"),
    visibility: Optional[str] = Body("public", description="Resource visibility: private, team, public"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ResourceRead:
    """
    Register a new resource.

    Args:
        resource (ResourceCreate): Data for the new resource.
        request (Request): FastAPI request object for metadata extraction and token state.
        team_id (Optional[str]): Team ID to assign the resource to.
        visibility (str): Resource visibility level (private, team, public).
        db (Session): Database session.
        user: Authenticated user context.

    Returns:
        ResourceRead: The created resource. A 403 ``ORJSONResponse`` is returned instead when a
        public-only token asks for team/private visibility or when the requested team differs
        from the token's team.

    Raises:
        HTTPException: 409 on URI conflict or integrity error, 400 on ``ResourceError``,
            422 on validation error.
    """
    try:
        # Creation audit metadata captured from the request.
        metadata = MetadataCapture.extract_creation_metadata(request, user)
        user_email = get_user_email(user)

        token_team_id = getattr(request.state, "team_id", None)
        token_teams = getattr(request.state, "token_teams", None)

        # SECURITY: a token whose team list is explicitly empty is public-only.
        public_only_token = token_teams is not None and len(token_teams) == 0

        if public_only_token:
            if visibility in ("team", "private"):
                return ORJSONResponse(
                    content={"message": "Public-only tokens cannot create team or private resources. Use visibility='public' or obtain a team-scoped token."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            # Public-only tokens never get a team assigned.
            team_id = None
        else:
            if team_id is not None and token_team_id is not None and team_id != token_team_id:
                return ORJSONResponse(
                    content={"message": "Access issue: This API token does not have the required permissions for this team."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            team_id = team_id or token_team_id

        logger.debug(f"User {user_email} is creating a new resource for team {team_id}")
        audit_keys = ("created_by", "created_from_ip", "created_via", "created_user_agent", "import_batch_id", "federation_source")
        audit_fields = {key: metadata[key] for key in audit_keys}
        created = await resource_service.register_resource(
            db,
            resource,
            **audit_fields,
            team_id=team_id,
            owner_email=user_email,
            visibility=visibility,
        )
        db.commit()
        db.close()
        return created
    except ResourceURIConflictError as exc:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc))
    except ResourceError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
    except ValidationError as exc:
        # Validation errors raised by Pydantic
        logger.error(f"Validation error while creating resource: {exc}")
        raise HTTPException(status_code=422, detail=ErrorFormatter.format_validation_error(exc))
    except IntegrityError as exc:
        logger.error(f"Integrity error while creating resource: {exc}")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=ErrorFormatter.format_database_error(exc))
@resource_router.get("/{resource_id}")
@require_permission("resources.read")
async def read_resource(resource_id: str, request: Request, db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)) -> Any:
    """
    Read a resource by its ID with plugin support.

    Args:
        resource_id (str): ID of the resource.
        request (Request): FastAPI request object; supplies the ``X-Request-ID`` /
            ``X-Server-ID`` headers and the plugin contexts stored on ``request.state``.
        db (Session): Database session.
        user: Authenticated user context.

    Returns:
        Any: A JSON-serializable representation of the resource content. ``ResourceContent``
        is dumped directly; every other shape is wrapped in a
        ``{"type": "resource", "id", "uri", "text" | "blob"}`` envelope, where ``uri`` is
        ``None`` if the content object does not expose one.

    Raises:
        HTTPException: 404 if the service raises ``ResourceNotFoundError`` or ``ResourceError``.
    """
    # Get request ID from headers or generate one
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    server_id = request.headers.get("X-Server-ID")

    logger.debug(f"User {user} requested resource with ID {resource_id} (request_id: {request_id})")

    # NOTE: Removed endpoint-level cache to prevent authorization bypass
    # The cache was checked before access control, allowing unauthorized users
    # to access cached private resources. Service layer handles caching safely.

    # Get plugin contexts from request.state for cross-hook sharing
    plugin_context_table = getattr(request.state, "plugin_context_table", None)
    plugin_global_context = getattr(request.state, "plugin_global_context", None)

    try:
        # Extract user email and admin status for authorization
        user_email = get_user_email(user)
        is_admin = user.get("is_admin", False) if isinstance(user, dict) else False

        # Admin bypass: pass user=None to trigger unrestricted access
        # Non-admin: pass user_email and let service look up teams
        # NOTE(review): unlike list_resources, this path does not consult
        # _get_rpc_filter_context, so a team-scoped admin token appears to be treated
        # as unrestricted here - confirm this is intended.
        auth_user_email = None if is_admin else user_email

        # Call service with context for plugin support
        content = await resource_service.read_resource(
            db,
            resource_id=resource_id,
            request_id=request_id,
            user=auth_user_email,
            server_id=server_id,
            token_teams=None,  # Admin: bypass; Non-admin: lookup teams
            plugin_context_table=plugin_context_table,
            plugin_global_context=plugin_global_context,
        )
        # Release transaction before response serialization
        db.commit()
        db.close()
    except (ResourceNotFoundError, ResourceError) as exc:
        # Translate to FastAPI HTTP error
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc

    # NOTE: Removed cache.set() - see cache removal comment above
    # Ensure a plain JSON-serializable structure
    try:
        # First-Party
        from mcpgateway.common.models import ResourceContent, TextContent  # pylint: disable=import-outside-toplevel

        # If already a ResourceContent, serialize directly
        if isinstance(content, ResourceContent):
            return content.model_dump()

        # If TextContent, wrap into resource envelope with text
        if isinstance(content, TextContent):
            return {"type": "resource", "id": resource_id, "uri": content.uri, "text": content.text}
    except Exception:
        pass  # nosec B110 - Intentionally continue with fallback resource content handling

    # Plain bytes/str (and arbitrary fallback objects) carry no ``uri`` attribute, so read
    # it defensively instead of raising AttributeError while building the envelope.
    content_uri = getattr(content, "uri", None)

    if isinstance(content, bytes):
        return {"type": "resource", "id": resource_id, "uri": content_uri, "blob": content.decode("utf-8", errors="ignore")}
    if isinstance(content, str):
        return {"type": "resource", "id": resource_id, "uri": content_uri, "text": content}

    # Objects with a 'text' attribute (e.g., mocks) - best-effort mapping
    if hasattr(content, "text"):
        return {"type": "resource", "id": resource_id, "uri": content_uri, "text": getattr(content, "text")}

    return {"type": "resource", "id": resource_id, "uri": content_uri, "text": str(content)}
@resource_router.get("/{resource_id}/info", response_model=ResourceRead)
@require_permission("resources.read")
async def get_resource_info(
    resource_id: str,
    include_inactive: bool = Query(False, description="Include inactive resources"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ResourceRead:
    """
    Fetch the metadata record of a resource.

    Unlike GET /resources/{resource_id}, which returns the resource content, this
    endpoint returns the resource's metadata.

    Args:
        resource_id (str): ID of the resource.
        include_inactive (bool): Whether to include inactive resources.
        db (Session): Database session.
        user: Authenticated user context.

    Returns:
        ResourceRead: The resource metadata.

    Raises:
        HTTPException: 404 if the resource is not found.
    """
    try:
        logger.debug(f"User {user} requested resource info for ID {resource_id}")
        info = await resource_service.get_resource_by_id(db, resource_id, include_inactive=include_inactive)
    except ResourceNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    else:
        return info
@resource_router.put("/{resource_id}", response_model=ResourceRead)
@require_permission("resources.update")
async def update_resource(
    resource_id: str,
    resource: ResourceUpdate,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> ResourceRead:
    """
    Apply an update to the resource with the given ID.

    On success the transaction is committed, the session closed and the resource's
    cache entry invalidated before the updated record is returned.

    Args:
        resource_id (str): ID of the resource.
        resource (ResourceUpdate): New resource data.
        request (Request): The FastAPI request object for metadata extraction.
        db (Session): Database session.
        user: Authenticated user context (dict with ``email`` or a stringifiable value).

    Returns:
        ResourceRead: The updated resource.

    Raises:
        HTTPException: 403 on ``PermissionError``, 404 when the resource is missing,
            422 on validation error, 409 on integrity error or URI conflict.
    """
    try:
        logger.debug(f"User {user} is updating resource with ID {resource_id}")
        # Modification audit metadata; the version (0 here) is incremented in the service.
        audit = MetadataCapture.extract_modification_metadata(request, user, 0)

        if isinstance(user, dict):
            requester_email = user.get("email")
        else:
            requester_email = str(user)

        updated = await resource_service.update_resource(
            db,
            resource_id,
            resource,
            modified_by=audit["modified_by"],
            modified_from_ip=audit["modified_from_ip"],
            modified_via=audit["modified_via"],
            modified_user_agent=audit["modified_user_agent"],
            user_email=requester_email,
        )
    except PermissionError as exc:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc))
    except ResourceNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    except ValidationError as exc:
        logger.error(f"Validation error while updating resource {resource_id}: {exc}")
        raise HTTPException(status_code=422, detail=ErrorFormatter.format_validation_error(exc))
    except IntegrityError as exc:
        logger.error(f"Integrity error while updating resource {resource_id}: {exc}")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=ErrorFormatter.format_database_error(exc))
    except ResourceURIConflictError as exc:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc))

    # Success path only: persist, release the session, then drop the stale cache entry.
    db.commit()
    db.close()
    await invalidate_resource_cache(resource_id)
    return updated
@resource_router.delete("/{resource_id}")
@require_permission("resources.delete")
async def delete_resource(
    resource_id: str,
    purge_metrics: bool = Query(False, description="Purge raw + rollup metrics for this resource"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, str]:
    """
    Remove a resource by its ID.

    After the service call the transaction is committed, the session closed and the
    resource's cache entry invalidated.

    Args:
        resource_id (str): ID of the resource to delete.
        purge_metrics (bool): Whether to delete raw + hourly rollup metrics for this resource.
        db (Session): Database session.
        user: Authenticated user context (dict with ``email`` or a stringifiable value).

    Returns:
        Dict[str, str]: Status message indicating deletion success.

    Raises:
        HTTPException: 403 on ``PermissionError``, 404 when the resource is missing,
            400 on ``ResourceError``.
    """
    try:
        logger.debug(f"User {user} is deleting resource with id {resource_id}")
        if isinstance(user, dict):
            requester_email = user.get("email")
        else:
            requester_email = str(user)

        await resource_service.delete_resource(db, resource_id, user_email=requester_email, purge_metrics=purge_metrics)
        db.commit()
        db.close()
        await invalidate_resource_cache(resource_id)

        confirmation = {"status": "success", "message": f"Resource {resource_id} deleted"}
        return confirmation
    except PermissionError as exc:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc))
    except ResourceNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    except ResourceError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
@resource_router.post("/subscribe")
@require_permission("resources.read")
async def subscribe_resource(user=Depends(get_current_user_with_permissions)) -> StreamingResponse:
    """
    Open a server-sent events (SSE) stream of resource service events.

    The endpoint takes no resource identifier; it streams whatever
    ``resource_service.subscribe_events()`` yields.

    Args:
        user: Authenticated user context.

    Returns:
        StreamingResponse: A ``text/event-stream`` response with event updates.
    """
    logger.debug(f"User {user} is subscribing to resource")
    event_stream = resource_service.subscribe_events()
    return StreamingResponse(event_stream, media_type="text/event-stream")
4446###############
4447# Prompt APIs #
4448###############
@prompt_router.post("/{prompt_id}/state")
@require_permission("prompts.update")
async def set_prompt_state(
    prompt_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """
    Switch a prompt on or off.

    Args:
        prompt_id: ID of the prompt to update.
        activate: True to activate, False to deactivate.
        db: Database session.
        user: Authenticated user context (dict with ``email`` or a stringifiable value).

    Returns:
        ``status``, a ``message`` and the updated prompt serialized under ``prompt``.

    Raises:
        HTTPException: 403 on ``PermissionError``, 404 when the prompt is not found,
            409 on a lock conflict, and 400 for any other failure.
    """
    logger.debug(f"User: {user} requested state change for prompt {prompt_id}, activate={activate}")
    try:
        if isinstance(user, dict):
            requester_email = user.get("email")
        else:
            requester_email = str(user)

        updated = await prompt_service.set_prompt_state(db, prompt_id, activate, user_email=requester_email)
        outcome = "activated" if activate else "deactivated"
        return {
            "status": "success",
            "message": f"Prompt {prompt_id} {outcome}",
            "prompt": updated.model_dump(),
        }
    except PermissionError as exc:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc))
    except PromptNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    except PromptLockConflictError as exc:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc))
    except Exception as exc:
        # Catch-all boundary: remaining failures surface as a bad request.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
@prompt_router.post("/{prompt_id}/toggle", deprecated=True)
@require_permission("prompts.update")
async def toggle_prompt_status(
    prompt_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """DEPRECATED: Use /state endpoint instead. This endpoint will be removed in a future release.

    Set the activation status of a prompt by delegating to
    :func:`set_prompt_state` after emitting a ``DeprecationWarning``.

    Args:
        prompt_id: The prompt ID.
        activate: Whether to activate (True) or deactivate (False) the prompt.
        db: Database session.
        user: Authenticated user context.

    Returns:
        Status message with prompt state, exactly as returned by ``set_prompt_state``.
    """
    warnings.warn("The /toggle endpoint is deprecated. Use /state instead.", DeprecationWarning, stacklevel=2)
    # Forward by keyword, mirroring how FastAPI itself invokes the endpoint, so that
    # decorator wrappers which look up ``user``/``db`` among keyword arguments see the
    # same call shape. NOTE(review): confirm require_permission inspects kwargs.
    return await set_prompt_state(prompt_id=prompt_id, activate=activate, db=db, user=user)
@prompt_router.get("", response_model=Union[List[PromptRead], CursorPaginatedPromptsResponse])
@prompt_router.get("/", response_model=Union[List[PromptRead], CursorPaginatedPromptsResponse])
@require_permission("prompts.read")
async def list_prompts(
    request: Request,
    cursor: Optional[str] = Query(None, description="Cursor for pagination"),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    limit: Optional[int] = Query(None, ge=0, description="Maximum number of prompts to return"),
    include_inactive: bool = False,
    tags: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """
    List the prompts the caller may see, honouring token team scope and cursor pagination.

    Args:
        request (Request): The FastAPI request object; ``request.state.team_id`` is consulted.
        cursor (Optional[str]): Cursor for pagination.
        include_pagination (bool): Include cursor pagination metadata in response.
        limit (Optional[int]): Maximum number of prompts to return.
        include_inactive: Include inactive prompts.
        tags: Comma-separated list of tags to filter by.
        team_id: Filter by specific team ID.
        visibility: Filter by visibility (private, team, public).
        db: Database session.
        user: Authenticated user context.

    Returns:
        Union[List[Dict[str, Any]], Dict[str, Any]]: The plain list of prompt records, or, when
        ``include_pagination`` is set, a dict with ``prompts`` and (if present) ``nextCursor``.
        A 403 ``ORJSONResponse`` is returned when the requested team differs from the token's team.
    """
    # Split the comma-separated tag string, dropping blank entries; no tags -> no filter.
    tag_filter = [name for name in map(str.strip, tags.split(",")) if name] if tags else None

    # Filtering context comes from the token so that token scope is respected.
    user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)

    # A token without team restrictions is unrestricted only for admins; an admin token
    # that does carry an explicit team scope keeps it (least privilege).
    if token_teams is None:
        if is_admin:
            user_email = None  # Admin unrestricted
        else:
            token_teams = []  # Non-admin without teams = public-only (secure default)

    # Team recorded on request.state during auth.
    token_team_id = getattr(request.state, "team_id", None)
    public_only_token = token_teams is not None and len(token_teams) == 0

    # Empty-team tokens rely purely on visibility filtering (public + owned), so the
    # token's team is neither checked against nor substituted for the requested one.
    if not public_only_token:
        if team_id is not None and token_team_id is not None and team_id != token_team_id:
            return ORJSONResponse(
                content={"message": "Access issue: This API token does not have the required permissions for this team."},
                status_code=status.HTTP_403_FORBIDDEN,
            )
        team_id = team_id or token_team_id

    logger.debug(f"User: {user_email} requested prompt list with include_inactive={include_inactive}, cursor={cursor}, tags={tag_filter}, team_id={team_id}, visibility={visibility}")
    data, next_cursor = await prompt_service.list_prompts(
        db=db,
        cursor=cursor,
        limit=limit,
        include_inactive=include_inactive,
        tags=tag_filter,
        user_email=user_email,
        team_id=team_id,
        visibility=visibility,
        token_teams=token_teams,
    )
    # Release transaction before response serialization
    db.commit()
    db.close()

    if not include_pagination:
        return data

    # Items exposing model_dump are serialized by alias; anything else passes through as-is.
    serialized = [item.model_dump(by_alias=True) if hasattr(item, "model_dump") else item for item in data]
    payload = {"prompts": serialized}
    if next_cursor:
        payload["nextCursor"] = next_cursor
    return payload
@prompt_router.post("", response_model=PromptRead)
@prompt_router.post("/", response_model=PromptRead)
@require_permission("prompts.create")
async def create_prompt(
    prompt: PromptCreate,
    request: Request,
    team_id: Optional[str] = Body(None, description="Team ID to assign prompt to"),
    visibility: Optional[str] = Body("public", description="Prompt visibility: private, team, public"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> PromptRead:
    """
    Register a new prompt.

    Args:
        prompt (PromptCreate): Payload describing the prompt to create.
        request (Request): The FastAPI request object for metadata extraction and token state.
        team_id (Optional[str]): Team ID to assign the prompt to.
        visibility (str): Prompt visibility level (private, team, public).
        db (Session): Active SQLAlchemy session.
        user: Authenticated user context.

    Returns:
        PromptRead: The newly-created prompt. A 403 ``ORJSONResponse`` is returned instead when a
        public-only token asks for team/private visibility or when the requested team differs
        from the token's team.

    Raises:
        HTTPException: 409 on a prompt name conflict or integrity error, 400 on
            ``PromptError``, 422 on validation error, 500 for any other failure.
    """
    try:
        # Creation audit metadata captured from the request.
        metadata = MetadataCapture.extract_creation_metadata(request, user)
        user_email = get_user_email(user)

        token_team_id = getattr(request.state, "team_id", None)
        token_teams = getattr(request.state, "token_teams", None)

        # SECURITY: a token whose team list is explicitly empty is public-only.
        public_only_token = token_teams is not None and len(token_teams) == 0

        if public_only_token:
            if visibility in ("team", "private"):
                return ORJSONResponse(
                    content={"message": "Public-only tokens cannot create team or private resources. Use visibility='public' or obtain a team-scoped token."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            # Public-only tokens never get a team assigned.
            team_id = None
        else:
            if team_id is not None and token_team_id is not None and team_id != token_team_id:
                return ORJSONResponse(
                    content={"message": "Access issue: This API token does not have the required permissions for this team."},
                    status_code=status.HTTP_403_FORBIDDEN,
                )
            team_id = team_id or token_team_id

        logger.debug(f"User {user_email} is creating a new prompt for team {team_id}")
        created = await prompt_service.register_prompt(
            db,
            prompt,
            created_by=metadata["created_by"],
            created_from_ip=metadata["created_from_ip"],
            created_via=metadata["created_via"],
            created_user_agent=metadata["created_user_agent"],
            import_batch_id=metadata["import_batch_id"],
            federation_source=metadata["federation_source"],
            team_id=team_id,
            owner_email=user_email,
            visibility=visibility,
        )
        db.commit()
        db.close()
        return created
    except PromptNameConflictError as exc:
        # A prompt with this name already exists -> 409 Conflict
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc))
    except PromptError as exc:
        # General prompt error -> 400 Bad Request
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
    except ValidationError as exc:
        # Validation error -> 422 Unprocessable Entity
        logger.error(f"Validation error while creating prompt: {exc}")
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=ErrorFormatter.format_validation_error(exc))
    except IntegrityError as exc:
        # Integrity error -> 409 Conflict
        logger.error(f"Integrity error while creating prompt: {exc}")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=ErrorFormatter.format_database_error(exc))
    except Exception as exc:
        # Anything else is unexpected -> 500 with a generic message
        logger.error(f"Unexpected error while creating prompt: {exc}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred while creating the prompt")
@prompt_router.post("/{prompt_id}")
@require_permission("prompts.read")
async def get_prompt(
    request: Request,
    prompt_id: str,
    args: Dict[str, str] = Body({}),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Any:
    """Render the prompt identified by ``prompt_id`` with the supplied arguments.

    Mirrors the MCP ``prompts/get`` operation, which is issued as a POST
    carrying the template arguments in the request body.

    Args:
        request: FastAPI request object; plugin contexts and the ``X-Server-ID`` header are read from it.
        prompt_id: ID of the prompt.
        args: Template arguments.
        db: Database session.
        user: Authenticated user.

    Returns:
        The prompt service result, or a 422 ``ORJSONResponse`` describing a
        plugin violation, ``ValueError`` or ``PromptError``.

    Raises:
        Exception: Re-raised if not a handled exception type.
    """
    logger.debug(f"User: {user} requested prompt: {prompt_id} with args={args}")

    # Plugin contexts live on request.state so hooks can share data
    state = request.state
    context_table = getattr(state, "plugin_context_table", None)
    global_context = getattr(state, "plugin_global_context", None)

    # Identity, admin flag and target server used for authorization
    caller_email = get_user_email(user)
    caller_is_admin = False
    if isinstance(user, dict):
        caller_is_admin = user.get("is_admin", False)
    server_id = request.headers.get("X-Server-ID")

    # Admins are passed as user=None (unrestricted); everyone else is passed
    # by email so the service can look up their teams
    if caller_is_admin:
        scoped_email = None
    else:
        scoped_email = caller_email

    try:
        # Instantiated only for its validation side effect
        PromptExecuteArgs(args=args)
        rendered = await prompt_service.get_prompt(
            db,
            prompt_id,
            args,
            user=scoped_email,
            server_id=server_id,
            token_teams=None,  # Admin: bypass; Non-admin: lookup teams
            plugin_context_table=context_table,
            plugin_global_context=global_context,
        )
        logger.debug(f"Prompt execution successful for '{prompt_id}'")
    except Exception as err:
        logger.error(f"Could not retrieve prompt {prompt_id}: {err}")
        if isinstance(err, PluginViolationError):
            # Surface the plugin's own violation message
            body = {"message": err.message}
            body["details"] = str(err.violation) if hasattr(err, "violation") else None
            return ORJSONResponse(content=body, status_code=422)
        if not isinstance(err, (ValueError, PromptError)):
            raise
        # Surface the original error text
        return ORJSONResponse(content={"message": str(err)}, status_code=422)

    return rendered
@prompt_router.get("/{prompt_id}")
@require_permission("prompts.read")
async def get_prompt_no_args(
    request: Request,
    prompt_id: str,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Any:
    """Fetch a prompt by ID when no template arguments are required.

    Convenience variant of the POST endpoint: the prompt service is called
    with an empty argument mapping.

    Args:
        request: FastAPI request object.
        prompt_id: The ID of the prompt to retrieve
        db: Database session
        user: Authenticated user

    Returns:
        The prompt template information

    Raises:
        HTTPException: 404 if prompt not found, 403 if permission denied.
    """
    logger.debug(f"User: {user} requested prompt: {prompt_id} with no arguments")

    # Plugin contexts live on request.state so hooks can share data
    state = request.state
    context_table = getattr(state, "plugin_context_table", None)
    global_context = getattr(state, "plugin_global_context", None)

    # Identity, admin flag and target server used for authorization
    caller_email = get_user_email(user)
    caller_is_admin = False
    if isinstance(user, dict):
        caller_is_admin = user.get("is_admin", False)
    server_id = request.headers.get("X-Server-ID")

    # Admins are passed as user=None (unrestricted); everyone else is passed
    # by email so the service can look up their teams
    if caller_is_admin:
        scoped_email = None
    else:
        scoped_email = caller_email

    try:
        prompt_result = await prompt_service.get_prompt(
            db,
            prompt_id,
            {},
            user=scoped_email,
            server_id=server_id,
            token_teams=None,  # Admin: bypass; Non-admin: lookup teams
            plugin_context_table=context_table,
            plugin_global_context=global_context,
        )
        return prompt_result
    except PromptNotFoundError as err:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(err))
    except PermissionError as err:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(err))
@prompt_router.put("/{prompt_id}", response_model=PromptRead)
@require_permission("prompts.update")
async def update_prompt(
    prompt_id: str,
    prompt: PromptUpdate,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> PromptRead:
    """
    Apply an update to an existing prompt and commit it.

    Args:
        prompt_id (str): Identifier of the prompt to update.
        prompt (PromptUpdate): New prompt content and metadata.
        request (Request): The FastAPI request object for metadata extraction.
        db (Session): Active SQLAlchemy session.
        user (str): Authenticated username.

    Returns:
        PromptRead: The updated prompt object.

    Raises:
        HTTPException: * **403 Forbidden** - ``PermissionError`` raised during the update.
            * **404 Not Found** - ``PromptNotFoundError``.
            * **422 Unprocessable Content** - ``ValidationError``.
            * **409 Conflict** - ``IntegrityError`` or ``PromptNameConflictError``.
            * **400 Bad Request** - any other ``PromptError``.
            * **500 Internal Server Error** - any other exception.
    """
    logger.debug(f"User: {user} requested to update prompt: {prompt_id} with data={prompt}")
    try:
        # Audit metadata for the modification; version 0 is a placeholder
        # because the service increments the version itself
        audit = MetadataCapture.extract_modification_metadata(request, user, 0)

        if isinstance(user, dict):
            editor_email = user.get("email")
        else:
            editor_email = str(user)

        updated = await prompt_service.update_prompt(
            db,
            prompt_id,
            prompt,
            modified_by=audit["modified_by"],
            modified_from_ip=audit["modified_from_ip"],
            modified_via=audit["modified_via"],
            modified_user_agent=audit["modified_user_agent"],
            user_email=editor_email,
        )
        db.commit()
        db.close()
        return updated
    except PermissionError as err:
        raise HTTPException(status_code=403, detail=str(err))
    except PromptNotFoundError as err:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(err))
    except ValidationError as err:
        logger.error(f"Validation error while updating prompt: {err}")
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=ErrorFormatter.format_validation_error(err))
    except IntegrityError as err:
        logger.error(f"Integrity error while updating prompt: {err}")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=ErrorFormatter.format_database_error(err))
    except PromptNameConflictError as err:
        # Another prompt already uses this name
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(err))
    except PromptError as err:
        # Generic prompt failure
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(err))
    except Exception as err:
        # Anything else is reported as an opaque 500
        logger.error(f"Unexpected error while updating prompt: {err}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred while updating the prompt")
@prompt_router.delete("/{prompt_id}")
@require_permission("prompts.delete")
async def delete_prompt(
    prompt_id: str,
    purge_metrics: bool = Query(False, description="Purge raw + rollup metrics for this prompt"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, str]:
    """
    Remove a prompt by ID and commit the deletion.

    Args:
        prompt_id: ID of the prompt.
        purge_metrics: Whether to delete raw + hourly rollup metrics for this prompt.
        db: Database session.
        user: Authenticated user.

    Returns:
        Status message.

    Raises:
        HTTPException: 403 on ``PermissionError``, 404 when the prompt is not found, 400 on any other
            ``PromptError``, and 500 for any unexpected error during deletion.
    """
    logger.debug(f"User: {user} requested deletion of prompt {prompt_id}")
    try:
        if isinstance(user, dict):
            requester_email = user.get("email")
        else:
            requester_email = str(user)
        await prompt_service.delete_prompt(db, prompt_id, user_email=requester_email, purge_metrics=purge_metrics)
        db.commit()
        db.close()
        outcome = {"status": "success", "message": f"Prompt {prompt_id} deleted"}
        return outcome
    except PermissionError as err:
        raise HTTPException(status_code=403, detail=str(err))
    except PromptNotFoundError as err:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(err))
    except PromptError as err:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(err))
    except Exception as err:
        logger.error(f"Unexpected error while deleting prompt {prompt_id}: {err}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred while deleting the prompt")
4945################
4946# Gateway APIs #
4947################
@gateway_router.post("/{gateway_id}/state")
@require_permission("gateways.update")
async def set_gateway_state(
    gateway_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """
    Activate or deactivate a gateway.

    Args:
        gateway_id (str): String ID of the gateway to update.
        activate (bool): ``True`` to activate, ``False`` to deactivate.
        db (Session): Active SQLAlchemy session.
        user (str): Authenticated username.

    Returns:
        Dict[str, Any]: A dict containing the operation status, a message, and the updated gateway object.

    Raises:
        HTTPException: **403 Forbidden** on ``PermissionError``, **404 Not Found** on ``GatewayNotFoundError``,
            and **400 Bad Request** for any other failure of the state change.
    """
    logger.debug(f"User '{user}' requested state change for gateway {gateway_id}, activate={activate}")
    try:
        if isinstance(user, dict):
            actor_email = user.get("email")
        else:
            actor_email = str(user)
        updated_gateway = await gateway_service.set_gateway_state(
            db,
            gateway_id,
            activate,
            user_email=actor_email,
        )
        outcome = "activated" if activate else "deactivated"
        response: Dict[str, Any] = {"status": "success"}
        response["message"] = f"Gateway {gateway_id} {outcome}"
        response["gateway"] = updated_gateway.model_dump()
        return response
    except Exception as err:
        if isinstance(err, PermissionError):
            raise HTTPException(status_code=403, detail=str(err))
        if isinstance(err, GatewayNotFoundError):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(err))
        # Every remaining failure is reported as a bad request
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(err))
@gateway_router.post("/{gateway_id}/toggle", deprecated=True)
@require_permission("gateways.update")
async def toggle_gateway_status(
    gateway_id: str,
    activate: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """DEPRECATED: Use /state endpoint instead. This endpoint will be removed in a future release.

    Emits a ``DeprecationWarning`` and delegates to ``set_gateway_state``.

    Args:
        gateway_id: The gateway ID.
        activate: Whether to activate (True) or deactivate (False) the gateway.
        db: Database session.
        user: Authenticated user context.

    Returns:
        Status message with gateway state.
    """
    deprecation_notice = "The /toggle endpoint is deprecated. Use /state instead."
    warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
    state_response = await set_gateway_state(gateway_id, activate, db, user)
    return state_response
@gateway_router.get("", response_model=Union[List[GatewayRead], CursorPaginatedGatewaysResponse])
@gateway_router.get("/", response_model=Union[List[GatewayRead], CursorPaginatedGatewaysResponse])
@require_permission("gateways.read")
async def list_gateways(
    request: Request,
    cursor: Optional[str] = Query(None, description="Cursor for pagination"),
    include_pagination: bool = Query(False, description="Include cursor pagination metadata in response"),
    limit: Optional[int] = Query(None, ge=0, description="Maximum number of gateways to return"),
    include_inactive: bool = False,
    team_id: Optional[str] = Query(None, description="Filter by team ID"),
    visibility: Optional[str] = Query(None, description="Filter by visibility: private, team, public"),
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Union[List[GatewayRead], Dict[str, Any]]:
    """
    Return the gateways visible to the caller, optionally cursor-paginated.

    Args:
        request (Request): The FastAPI request object for team_id retrieval
        cursor (Optional[str]): Cursor for pagination.
        include_pagination (bool): Include cursor pagination metadata in response.
        limit (Optional[int]): Maximum number of gateways to return.
        include_inactive: Include inactive gateways.
        team_id (Optional): Filter by specific team ID.
        visibility (Optional): Filter by visibility (private, team, public).
        db: Database session.
        user: Authenticated user.

    Returns:
        Union[List[GatewayRead], Dict[str, Any]]: List of gateway records or paginated response with nextCursor.
        A 403 ``ORJSONResponse`` is returned when ``team_id`` conflicts with the team carried by the token.
    """
    logger.debug(f"User '{user}' requested list of gateways with include_inactive={include_inactive}")

    user_email = get_user_email(user)

    # Team scoping carried by the API token
    state = request.state
    token_team_id = getattr(state, "team_id", None)
    token_teams = getattr(state, "token_teams", None)

    # An explicit team filter must agree with the token's team
    team_conflict = team_id is not None and token_team_id is not None and team_id != token_team_id
    if team_conflict:
        return ORJSONResponse(
            content={"message": "Access issue: This API token does not have the required permissions for this team."},
            status_code=status.HTTP_403_FORBIDDEN,
        )

    # Fall back to the token's team when no explicit filter was given
    if not team_id:
        team_id = token_team_id

    # SECURITY: token_teams is normalized in auth.py:
    # - None: admin bypass (is_admin=true with explicit null teams) - sees ALL resources
    # - []: public-only (missing teams or explicit empty) - sees only public
    # - [...]: team-scoped - sees public + teams + user's private
    is_admin_bypass = token_teams is None
    is_public_only_token = (not is_admin_bypass) and len(token_teams) == 0

    # Admin bypass skips user filtering; public-only tokens default to public visibility
    requester = None if is_admin_bypass else user_email
    effective_visibility = visibility
    if is_public_only_token and not visibility:
        effective_visibility = "public"

    logger.debug(f"User: {user_email} requested gateway list with include_inactive={include_inactive}, team_id={team_id}, visibility={visibility}")
    data, next_cursor = await gateway_service.list_gateways(
        db=db,
        cursor=cursor,
        limit=limit,
        include_inactive=include_inactive,
        user_email=requester,
        team_id=team_id,
        visibility=effective_visibility,
        token_teams=token_teams,  # None = admin bypass, [] = public-only, [...] = team-scoped
    )
    # Release transaction before response serialization
    db.commit()
    db.close()

    if not include_pagination:
        return data

    payload = {"gateways": [entry.model_dump(by_alias=True) for entry in data]}
    if next_cursor:
        payload["nextCursor"] = next_cursor
    return payload
@gateway_router.post("", response_model=GatewayRead)
@gateway_router.post("/", response_model=GatewayRead)
@require_permission("gateways.create")
async def register_gateway(
    gateway: GatewayCreate,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Union[GatewayRead, JSONResponse]:
    """
    Register a new gateway.

    Failures are reported as ``ORJSONResponse`` objects rather than raised:
    403 for token/team scoping violations, 502 for ``GatewayConnectionError``,
    409 for name/duplicate conflicts and ``IntegrityError``, 422 for
    ``ValidationError``, 400 for other ``ValueError``, and 500 for
    ``RuntimeError`` or any unexpected exception.

    Args:
        gateway: Gateway creation data.
        request: The FastAPI request object for metadata extraction.
        db: Database session.
        user: Authenticated user.

    Returns:
        Created gateway, or an ``ORJSONResponse`` describing the failure.
    """
    logger.debug(f"User '{user}' requested to register gateway: {gateway}")
    try:
        # Extract metadata from request
        metadata = MetadataCapture.extract_creation_metadata(request, user)

        # Get user email and handle team assignment
        user_email = get_user_email(user)

        token_team_id = getattr(request.state, "team_id", None)
        token_teams = getattr(request.state, "token_teams", None)
        gateway_team_id = gateway.team_id
        visibility = gateway.visibility

        # SECURITY: Public-only tokens (teams == []) cannot create team/private resources
        is_public_only_token = token_teams is not None and len(token_teams) == 0
        if is_public_only_token and visibility in ("team", "private"):
            return ORJSONResponse(
                content={"message": "Public-only tokens cannot create team or private resources. Use visibility='public' or obtain a team-scoped token."},
                status_code=status.HTTP_403_FORBIDDEN,
            )

        # Check for team ID mismatch (only for non-public-only tokens)
        if not is_public_only_token and gateway_team_id is not None and token_team_id is not None and gateway_team_id != token_team_id:
            return ORJSONResponse(
                content={"message": "Access issue: This API token does not have the required permissions for this team."},
                status_code=status.HTTP_403_FORBIDDEN,
            )

        # Determine final team ID (public-only tokens get no team)
        if is_public_only_token:
            team_id = None
        else:
            team_id = gateway_team_id or token_team_id

        logger.debug(f"User {user_email} is creating a new gateway for team {team_id}")

        return await gateway_service.register_gateway(
            db,
            gateway,
            created_by=metadata["created_by"],
            created_from_ip=metadata["created_from_ip"],
            created_via=metadata["created_via"],
            created_user_agent=metadata["created_user_agent"],
            team_id=team_id,
            owner_email=user_email,
            visibility=visibility,
        )
    except GatewayConnectionError as ex:
        return ORJSONResponse(content={"message": str(ex)}, status_code=status.HTTP_502_BAD_GATEWAY)
    except GatewayNameConflictError:
        return ORJSONResponse(content={"message": "Gateway name already exists"}, status_code=status.HTTP_409_CONFLICT)
    except GatewayDuplicateConflictError:
        return ORJSONResponse(content={"message": "Gateway already exists"}, status_code=status.HTTP_409_CONFLICT)
    except ValidationError as ex:
        # Must be tested before ValueError: pydantic's ValidationError derives
        # from ValueError, so the generic branch would otherwise shadow this one
        # and callers would receive an uninformative 400 instead of a 422.
        return ORJSONResponse(content=ErrorFormatter.format_validation_error(ex), status_code=status.HTTP_422_UNPROCESSABLE_CONTENT)
    except IntegrityError as ex:
        return ORJSONResponse(status_code=status.HTTP_409_CONFLICT, content=ErrorFormatter.format_database_error(ex))
    # Built-in base classes are handled last so they cannot mask the more
    # specific error types above.
    except ValueError:
        return ORJSONResponse(content={"message": "Unable to process input"}, status_code=status.HTTP_400_BAD_REQUEST)
    except RuntimeError:
        return ORJSONResponse(content={"message": "Error during execution"}, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Exception as ex:
        # Record the cause; the client only sees a generic message
        logger.error(f"Unexpected error while registering gateway: {ex}")
        return ORJSONResponse(content={"message": "Unexpected error"}, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
@gateway_router.get("/{gateway_id}", response_model=GatewayRead)
@require_permission("gateways.read")
async def get_gateway(
    gateway_id: str,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Union[GatewayRead, JSONResponse]:
    """
    Look up a single gateway by its ID.

    Args:
        gateway_id: ID of the gateway.
        db: Database session.
        user: Authenticated user.

    Returns:
        Gateway data.

    Raises:
        HTTPException: 404 if gateway not found.
    """
    logger.debug(f"User '{user}' requested gateway {gateway_id}")
    try:
        found = await gateway_service.get_gateway(db, gateway_id)
    except GatewayNotFoundError as err:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(err))
    return found
@gateway_router.put("/{gateway_id}", response_model=GatewayRead)
@require_permission("gateways.update")
async def update_gateway(
    gateway_id: str,
    gateway: GatewayUpdate,
    request: Request,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Union[GatewayRead, JSONResponse]:
    """
    Update a gateway.

    Failures are reported as ``ORJSONResponse`` objects rather than raised:
    403 for ``PermissionError``, 404 for ``GatewayNotFoundError``, 502 for
    ``GatewayConnectionError``, 409 for name/duplicate conflicts and
    ``IntegrityError``, 422 for ``ValidationError``, 400 for other
    ``ValueError``, and 500 for ``RuntimeError`` or any unexpected exception.

    Args:
        gateway_id: Gateway ID.
        gateway: Gateway update data.
        request (Request): The FastAPI request object for metadata extraction.
        db: Database session.
        user: Authenticated user.

    Returns:
        Updated gateway, or an ``ORJSONResponse`` describing the failure.
    """
    logger.debug(f"User '{user}' requested update on gateway {gateway_id} with data={gateway}")
    try:
        # Extract modification metadata
        mod_metadata = MetadataCapture.extract_modification_metadata(request, user, 0)  # Version will be incremented in service

        user_email = user.get("email") if isinstance(user, dict) else str(user)
        result = await gateway_service.update_gateway(
            db,
            gateway_id,
            gateway,
            modified_by=mod_metadata["modified_by"],
            modified_from_ip=mod_metadata["modified_from_ip"],
            modified_via=mod_metadata["modified_via"],
            modified_user_agent=mod_metadata["modified_user_agent"],
            user_email=user_email,
        )
        db.commit()
        db.close()
        return result
    except PermissionError as ex:
        return ORJSONResponse(content={"message": str(ex)}, status_code=403)
    except GatewayNotFoundError:
        return ORJSONResponse(content={"message": "Gateway not found"}, status_code=status.HTTP_404_NOT_FOUND)
    except GatewayConnectionError as ex:
        return ORJSONResponse(content={"message": str(ex)}, status_code=status.HTTP_502_BAD_GATEWAY)
    except GatewayNameConflictError:
        return ORJSONResponse(content={"message": "Gateway name already exists"}, status_code=status.HTTP_409_CONFLICT)
    except GatewayDuplicateConflictError:
        return ORJSONResponse(content={"message": "Gateway already exists"}, status_code=status.HTTP_409_CONFLICT)
    except ValidationError as ex:
        # Must be tested before ValueError: pydantic's ValidationError derives
        # from ValueError, so the generic branch would otherwise shadow this one
        # and callers would receive an uninformative 400 instead of a 422.
        return ORJSONResponse(content=ErrorFormatter.format_validation_error(ex), status_code=status.HTTP_422_UNPROCESSABLE_CONTENT)
    except IntegrityError as ex:
        return ORJSONResponse(status_code=status.HTTP_409_CONFLICT, content=ErrorFormatter.format_database_error(ex))
    # Built-in base classes are handled last so they cannot mask the more
    # specific error types above.
    except ValueError:
        return ORJSONResponse(content={"message": "Unable to process input"}, status_code=status.HTTP_400_BAD_REQUEST)
    except RuntimeError:
        return ORJSONResponse(content={"message": "Error during execution"}, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Exception as ex:
        # Record the cause; the client only sees a generic message
        logger.error(f"Unexpected error while updating gateway {gateway_id}: {ex}")
        return ORJSONResponse(content={"message": "Unexpected error"}, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
@gateway_router.delete("/{gateway_id}")
@require_permission("gateways.delete")
async def delete_gateway(
    gateway_id: str,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, str]:
    """
    Remove a gateway by ID, invalidating the resource cache when needed.

    Args:
        gateway_id: ID of the gateway.
        db: Database session.
        user: Authenticated user.

    Returns:
        Status message.

    Raises:
        HTTPException: If permission denied (403), gateway not found (404), or other gateway error (400).
    """
    logger.debug(f"User '{user}' requested deletion of gateway {gateway_id}")
    try:
        if isinstance(user, dict):
            requester_email = user.get("email")
        else:
            requester_email = str(user)

        # Inspect the gateway before deleting it so we know whether it
        # advertised any resources
        existing = await gateway_service.get_gateway(db, gateway_id)
        advertised_resources = existing.capabilities.get("resources")
        had_resources = bool(advertised_resources)

        await gateway_service.delete_gateway(db, gateway_id, user_email=requester_email)

        # The cache holds both individual resources and the full listing, so
        # the whole resource cache is dropped once a resource-bearing gateway
        # has been deleted.
        if had_resources:
            await invalidate_resource_cache()

        db.commit()
        db.close()
        outcome = {"status": "success", "message": f"Gateway {gateway_id} deleted"}
        return outcome
    except PermissionError as err:
        raise HTTPException(status_code=403, detail=str(err))
    except GatewayNotFoundError as err:
        raise HTTPException(status_code=404, detail=str(err))
    except GatewayError as err:
        raise HTTPException(status_code=400, detail=str(err))
@gateway_router.post("/{gateway_id}/tools/refresh", response_model=GatewayRefreshResponse)
@require_permission("gateways.update")
async def refresh_gateway_tools(
    gateway_id: str,
    request: Request,
    include_resources: bool = Query(False, description="Include resources in refresh"),
    include_prompts: bool = Query(False, description="Include prompts in refresh"),
    user=Depends(get_current_user_with_permissions),
) -> GatewayRefreshResponse:
    """
    Force an immediate refresh of a gateway's tools (and optionally resources/prompts).

    The gateway service performs the refresh; its result mapping is expanded
    into a ``GatewayRefreshResponse`` together with the gateway ID.

    Args:
        gateway_id: ID of the gateway to refresh.
        request: The FastAPI request object.
        include_resources: Whether to include resources in the refresh.
        include_prompts: Whether to include prompts in the refresh.
        user: Authenticated user.

    Returns:
        GatewayRefreshResponse with counts of changes and any validation errors.

    Raises:
        HTTPException: 404 if gateway not found, 409 if refresh already in progress.
    """
    logger.info(f"User '{user}' requested manual refresh for gateway {gateway_id}")
    try:
        if isinstance(user, dict):
            requester_email = user.get("email")
        else:
            requester_email = str(user)
        forwarded_headers = dict(request.headers)
        refresh_outcome = await gateway_service.refresh_gateway_manually(
            gateway_id=gateway_id,
            include_resources=include_resources,
            include_prompts=include_prompts,
            user_email=requester_email,
            request_headers=forwarded_headers,
        )
        return GatewayRefreshResponse(gateway_id=gateway_id, **refresh_outcome)
    except GatewayNotFoundError as err:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(err))
    except GatewayError as err:
        # 409 Conflict for concurrent refresh attempts
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(err))
5362##############
5363# Root APIs #
5364##############
@root_router.get("", response_model=List[Root])
@root_router.get("/", response_model=List[Root])
async def list_roots(
    user=Depends(get_current_user_with_permissions),
) -> List[Root]:
    """
    Return every root known to the root service.

    Args:
        user: Authenticated user.

    Returns:
        List of Root objects.
    """
    logger.debug(f"User '{user}' requested list of roots")
    registered_roots = await root_service.list_roots()
    return registered_roots
@root_router.get("/export", response_model=Dict[str, Any])
async def export_root(
    uri: str,
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """
    Export a single root configuration to JSON format.

    Args:
        uri: Root URI to export (query parameter)
        user: Authenticated user

    Returns:
        Export data containing root information. ``exported_at`` is a
        timezone-aware UTC timestamp in ISO 8601 format.

    Raises:
        HTTPException: If root not found or export fails
    """
    try:
        logger.info(f"User {user} requested root export for URI: {uri}")

        # Resolve the exporting user's email from either an object exposing
        # ``email`` or a dict; anything else leaves it unknown.
        username: Optional[str] = None
        if hasattr(user, "email"):
            username = getattr(user, "email", None)
        elif isinstance(user, dict):
            username = user.get("email")

        # Get the root by URI
        root = await root_service.get_root_by_uri(uri)

        # Stamp the export in UTC so the value is unambiguous regardless of
        # the server's local timezone.
        export_data = {
            "exported_at": datetime.now(timezone.utc).isoformat(),
            "exported_by": username or "unknown",
            "export_type": "root",
            "version": "1.0",
            "root": {
                "uri": str(root.uri),
                "name": root.name,
            },
        }

        return export_data

    except RootServiceNotFoundError as e:
        logger.error(f"Root not found for export by user {user}: {str(e)}")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected root export error for user {user}: {str(e)}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Root export failed: {str(e)}")
@root_router.get("/changes")
async def subscribe_roots_changes(
    user=Depends(get_current_user_with_permissions),
) -> StreamingResponse:
    """
    Stream root-list changes to the client as Server-Sent Events (SSE).

    Args:
        user: Authenticated user.

    Returns:
        StreamingResponse with event-stream media type.
    """
    logger.debug(f"User '{user}' subscribed to root changes stream")

    async def generate_events():
        """Serialize each root service change event as one SSE ``data:`` frame.

        Yields:
            str: SSE-formatted event data.
        """
        async for change in root_service.subscribe_changes():
            serialized = orjson.dumps(change).decode()
            yield f"data: {serialized}\n\n"

    event_stream = generate_events()
    return StreamingResponse(event_stream, media_type="text/event-stream")
@root_router.get("/{root_uri:path}", response_model=Root)
async def get_root_by_uri(
    root_uri: str,
    user=Depends(get_current_user_with_permissions),
) -> Root:
    """
    Look up one root by its URI.

    Args:
        root_uri: URI of the root to retrieve.
        user: Authenticated user.

    Returns:
        Root object.

    Raises:
        HTTPException: If the root is not found.
        Exception: For any other unexpected errors.
    """
    logger.debug(f"User '{user}' requested root with URI: {root_uri}")
    try:
        return await root_service.get_root_by_uri(root_uri)
    except RootServiceNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    except Exception as exc:
        # Log and propagate anything that is not a "not found" condition
        logger.error(f"Error getting root {root_uri}: {exc}")
        raise exc
@root_router.post("", response_model=Root)
@root_router.post("/", response_model=Root)
async def add_root(
    root: Root,  # JSON request body, parsed with the Root model from models.py
    user=Depends(get_current_user_with_permissions),
) -> Root:
    """Register a new root with the root service.

    Args:
        root: Root payload carrying the URI and name to register.
        user: Authenticated user.

    Returns:
        Root: The root as returned by the root service.
    """
    logger.debug(f"User '{user}' requested to add root: {root}")
    new_uri = str(root.uri)
    new_name = root.name
    created = await root_service.add_root(new_uri, new_name)
    return created
@root_router.put("/{root_uri:path}", response_model=Root)
async def update_root(
    root_uri: str,
    root: Root,
    user=Depends(get_current_user_with_permissions),
) -> Root:
    """
    Update a root by URI.

    Only ``root.name`` is passed to the root service; the target root is
    selected by the ``root_uri`` path parameter.

    Args:
        root_uri: URI of the root to update (captured from the remaining request path).
        root: Root object with updated information.
        user: Authenticated user.

    Returns:
        Updated Root object returned by the root service.

    Raises:
        HTTPException: 404 if the root service reports the root as not found.
        Exception: Any other unexpected error is logged and re-raised unchanged.
    """
    logger.debug(f"User '{user}' requested to update root with URI: {root_uri}")
    try:
        # Return the service result directly instead of rebinding the ``root`` request parameter.
        return await root_service.update_root(root_uri, root.name)
    except RootServiceNotFoundError as e:
        # Chain explicitly so the originating service error is recorded as the cause of the 404.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error updating root {root_uri}: {e}")
        # Bare raise re-raises the active exception as-is.
        raise
@root_router.delete("/{uri:path}")
async def remove_root(
    uri: str,
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, str]:
    """Delete the root registered under the given URI.

    Args:
        uri: URI of the root to remove.
        user: Authenticated user.

    Returns:
        Dict[str, str]: Payload with a ``status`` of ``success`` and a confirmation ``message``.
    """
    logger.debug(f"User '{user}' requested to remove root with URI: {uri}")
    await root_service.remove_root(uri)
    confirmation = f"Root {uri} removed"
    return {"status": "success", "message": confirmation}
5567##################
5568# Utility Routes #
5569##################
5570@utility_router.post("/rpc/")
5571@utility_router.post("/rpc")
5572async def handle_rpc(request: Request, db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)):
5573 """Handle RPC requests.
5575 Args:
5576 request (Request): The incoming FastAPI request.
5577 db (Session): Database session.
5578 user: The authenticated user (dict with RBAC context).
5580 Returns:
5581 Response with the RPC result or error.
5583 Raises:
5584 PluginError: If encounters issue with plugin
5585 PluginViolationError: If plugin violated the request. Example - In case of OPA plugin, if the request is denied by policy.
5586 """
5587 req_id = None
5588 try:
5589 # Extract user identifier from either RBAC user object or JWT payload
5590 if hasattr(user, "email"):
5591 user_id = getattr(user, "email", None) # RBAC user object
5592 elif isinstance(user, dict):
5593 user_id = user.get("sub") or user.get("email") or user.get("username", "unknown") # JWT payload
5594 else:
5595 user_id = str(user) # String username from basic auth
5597 logger.debug(f"User {user_id} made an RPC request")
5598 try:
5599 body = orjson.loads(await request.body())
5600 except orjson.JSONDecodeError:
5601 return ORJSONResponse(
5602 status_code=400,
5603 content={
5604 "jsonrpc": "2.0",
5605 "error": {"code": -32700, "message": "Parse error"},
5606 "id": None,
5607 },
5608 )
5609 method = body["method"]
5610 req_id = body.get("id")
5611 if req_id is None:
5612 req_id = str(uuid.uuid4())
5613 params = body.get("params", {})
5614 server_id = params.get("server_id", None)
5615 cursor = params.get("cursor") # Extract cursor parameter
5617 RPCRequest(jsonrpc="2.0", method=method, params=params) # Validate the request body against the RPCRequest model
5619 # Multi-worker session affinity: check if we should forward to another worker
5620 # This applies to ALL methods (except initialize which creates new sessions)
5621 # The x-forwarded-internally header marks requests that have already been forwarded
5622 # to prevent infinite forwarding loops
5623 headers = {k.lower(): v for k, v in request.headers.items()}
5624 # Session ID can come from two sources:
5625 # 1. MCP-Session-Id (mcp-session-id) - MCP protocol header from Streamable HTTP clients
5626 # 2. x-mcp-session-id - our internal header from SSE session_registry calls
5627 mcp_session_id = headers.get("mcp-session-id") or headers.get("x-mcp-session-id")
5628 is_internally_forwarded = headers.get("x-forwarded-internally") == "true"
5630 if settings.mcpgateway_session_affinity_enabled and mcp_session_id and method != "initialize" and not is_internally_forwarded:
5631 # First-Party
5632 from mcpgateway.services.mcp_session_pool import MCPSessionPool, WORKER_ID # pylint: disable=import-outside-toplevel
5634 if not MCPSessionPool.is_valid_mcp_session_id(mcp_session_id):
5635 logger.debug("Invalid MCP session id for affinity forwarding, executing locally")
5636 else:
5637 session_short = mcp_session_id[:8] if len(mcp_session_id) >= 8 else mcp_session_id
5638 logger.debug(f"[AFFINITY] Worker {WORKER_ID} | Session {session_short}... | Method: {method} | RPC request received, checking affinity")
5639 try:
5640 # First-Party
5641 from mcpgateway.services.mcp_session_pool import get_mcp_session_pool # pylint: disable=import-outside-toplevel
5643 pool = get_mcp_session_pool()
5644 forwarded_response = await pool.forward_request_to_owner(
5645 mcp_session_id,
5646 {"method": method, "params": params, "headers": dict(headers), "req_id": req_id},
5647 )
5648 if forwarded_response is not None: 5648 ↛ 5668line 5648 didn't jump to line 5668 because the condition on line 5648 was always true
5649 # Request was handled by another worker
5650 logger.info(f"[AFFINITY] Worker {WORKER_ID} | Session {session_short}... | Method: {method} | Forwarded response received")
5651 if "error" in forwarded_response:
5652 raise JSONRPCError(
5653 forwarded_response["error"].get("code", -32603),
5654 forwarded_response["error"].get("message", "Forwarded request failed"),
5655 )
5656 result = forwarded_response.get("result", {})
5657 return {"jsonrpc": "2.0", "result": result, "id": req_id}
5658 except RuntimeError:
5659 # Pool not initialized - execute locally
5660 logger.debug(f"[AFFINITY] Worker {WORKER_ID} | Session {session_short}... | Method: {method} | Pool not initialized, executing locally")
5661 elif is_internally_forwarded and mcp_session_id:
5662 # First-Party
5663 from mcpgateway.services.mcp_session_pool import WORKER_ID # pylint: disable=import-outside-toplevel
5665 session_short = mcp_session_id[:8] if len(mcp_session_id) >= 8 else mcp_session_id
5666 logger.debug(f"[AFFINITY] Worker {WORKER_ID} | Session {session_short}... | Method: {method} | Internally forwarded request, executing locally")
5668 if method == "initialize":
5669 # Extract session_id from params or query string (for capability tracking)
5670 init_session_id = params.get("session_id") or params.get("sessionId") or request.query_params.get("session_id")
5671 # Pass server_id to advertise OAuth capability if configured per RFC 9728
5672 result = await session_registry.handle_initialize_logic(body.get("params", {}), session_id=init_session_id, server_id=server_id)
5673 if hasattr(result, "model_dump"): 5673 ↛ 5678line 5673 didn't jump to line 5678 because the condition on line 5673 was always true
5674 result = result.model_dump(by_alias=True, exclude_none=True)
5676 # Register session ownership in Redis for multi-worker affinity
5677 # This must happen AFTER initialize succeeds so subsequent requests route to this worker
5678 if settings.mcpgateway_session_affinity_enabled and mcp_session_id and mcp_session_id != "not-provided":
5679 try:
5680 # First-Party
5681 from mcpgateway.services.mcp_session_pool import get_mcp_session_pool, WORKER_ID # pylint: disable=import-outside-toplevel
5683 pool = get_mcp_session_pool()
5684 # Claim-or-refresh ownership for this session (does not steal).
5685 await pool.register_pool_session_owner(mcp_session_id)
5686 logger.debug(f"[AFFINITY_INIT] Worker {WORKER_ID} | Session {mcp_session_id[:8]}... | Registered ownership after initialize")
5687 except Exception as e:
5688 logger.warning(f"[AFFINITY_INIT] Failed to register session ownership: {e}")
5689 elif method == "tools/list":
5690 user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)
5691 _req_email, _req_is_admin = user_email, is_admin
5692 _req_team_roles = get_user_team_roles(db, _req_email) if _req_email and not _req_is_admin else None
5693 # Admin bypass - only when token has NO team restrictions
5694 if is_admin and token_teams is None:
5695 user_email = None
5696 token_teams = None # Admin unrestricted
5697 elif token_teams is None:
5698 token_teams = [] # Non-admin without teams = public-only (secure default)
5699 if server_id:
5700 tools = await tool_service.list_server_tools(
5701 db,
5702 server_id,
5703 cursor=cursor,
5704 user_email=user_email,
5705 token_teams=token_teams,
5706 requesting_user_email=_req_email,
5707 requesting_user_is_admin=_req_is_admin,
5708 requesting_user_team_roles=_req_team_roles,
5709 )
5710 # Release DB connection early to prevent idle-in-transaction under load
5711 db.commit()
5712 db.close()
5713 result = {"tools": [t.model_dump(by_alias=True, exclude_none=True) for t in tools]}
5714 else:
5715 tools, next_cursor = await tool_service.list_tools(
5716 db,
5717 cursor=cursor,
5718 limit=0,
5719 user_email=user_email,
5720 token_teams=token_teams,
5721 requesting_user_email=_req_email,
5722 requesting_user_is_admin=_req_is_admin,
5723 requesting_user_team_roles=_req_team_roles,
5724 )
5725 # Release DB connection early to prevent idle-in-transaction under load
5726 db.commit()
5727 db.close()
5728 result = {"tools": [t.model_dump(by_alias=True, exclude_none=True) for t in tools]}
5729 if next_cursor:
5730 result["nextCursor"] = next_cursor
5731 elif method == "list_tools": # Legacy endpoint
5732 user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)
5733 _req_email, _req_is_admin = user_email, is_admin
5734 _req_team_roles = get_user_team_roles(db, _req_email) if _req_email and not _req_is_admin else None
5735 # Admin bypass - only when token has NO team restrictions (token_teams is None)
5736 # If token has explicit team scope (even empty [] for public-only), respect it
5737 if is_admin and token_teams is None:
5738 user_email = None
5739 token_teams = None # Admin unrestricted
5740 elif token_teams is None:
5741 token_teams = [] # Non-admin without teams = public-only (secure default)
5742 if server_id:
5743 tools = await tool_service.list_server_tools(
5744 db,
5745 server_id,
5746 cursor=cursor,
5747 user_email=user_email,
5748 token_teams=token_teams,
5749 requesting_user_email=_req_email,
5750 requesting_user_is_admin=_req_is_admin,
5751 requesting_user_team_roles=_req_team_roles,
5752 )
5753 db.commit()
5754 db.close()
5755 result = {"tools": [t.model_dump(by_alias=True, exclude_none=True) for t in tools]}
5756 else:
5757 tools, next_cursor = await tool_service.list_tools(
5758 db,
5759 cursor=cursor,
5760 limit=0,
5761 user_email=user_email,
5762 token_teams=token_teams,
5763 requesting_user_email=_req_email,
5764 requesting_user_is_admin=_req_is_admin,
5765 requesting_user_team_roles=_req_team_roles,
5766 )
5767 db.commit()
5768 db.close()
5769 result = {"tools": [t.model_dump(by_alias=True, exclude_none=True) for t in tools]}
5770 if next_cursor:
5771 result["nextCursor"] = next_cursor
5772 elif method == "list_gateways":
5773 user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)
5774 # Admin bypass - only when token has NO team restrictions
5775 if is_admin and token_teams is None:
5776 user_email = None
5777 token_teams = None # Admin unrestricted
5778 elif token_teams is None:
5779 token_teams = [] # Non-admin without teams = public-only (secure default)
5780 gateways, next_cursor = await gateway_service.list_gateways(db, include_inactive=False, user_email=user_email, token_teams=token_teams)
5781 db.commit()
5782 db.close()
5783 result = {"gateways": [g.model_dump(by_alias=True, exclude_none=True) for g in gateways]}
5784 if next_cursor:
5785 result["nextCursor"] = next_cursor
5786 elif method == "list_roots":
5787 roots = await root_service.list_roots()
5788 result = {"roots": [r.model_dump(by_alias=True, exclude_none=True) for r in roots]}
5789 elif method == "resources/list":
5790 user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)
5791 # Admin bypass - only when token has NO team restrictions
5792 if is_admin and token_teams is None:
5793 user_email = None
5794 token_teams = None # Admin unrestricted
5795 elif token_teams is None:
5796 token_teams = [] # Non-admin without teams = public-only (secure default)
5797 if server_id:
5798 resources = await resource_service.list_server_resources(db, server_id, user_email=user_email, token_teams=token_teams)
5799 db.commit()
5800 db.close()
5801 result = {"resources": [r.model_dump(by_alias=True, exclude_none=True) for r in resources]}
5802 else:
5803 resources, next_cursor = await resource_service.list_resources(db, cursor=cursor, limit=0, user_email=user_email, token_teams=token_teams)
5804 db.commit()
5805 db.close()
5806 result = {"resources": [r.model_dump(by_alias=True, exclude_none=True) for r in resources]}
5807 if next_cursor:
5808 result["nextCursor"] = next_cursor
5809 elif method == "resources/read":
5810 uri = params.get("uri")
5811 request_id = params.get("requestId", None)
5812 meta_data = params.get("_meta", None)
5813 if not uri:
5814 raise JSONRPCError(-32602, "Missing resource URI in parameters", params)
5816 # Get authorization context (same as resources/list)
5817 auth_user_email, auth_token_teams, auth_is_admin = _get_rpc_filter_context(request, user)
5818 if auth_is_admin and auth_token_teams is None:
5819 auth_user_email = None
5820 # auth_token_teams stays None (unrestricted)
5821 elif auth_token_teams is None:
5822 auth_token_teams = [] # Non-admin without teams = public-only
5824 # Get user email for OAuth token selection
5825 oauth_user_email = get_user_email(user)
5826 # Get plugin contexts from request.state for cross-hook sharing
5827 plugin_context_table = getattr(request.state, "plugin_context_table", None)
5828 plugin_global_context = getattr(request.state, "plugin_global_context", None)
5829 try:
5830 result = await resource_service.read_resource(
5831 db,
5832 resource_uri=uri,
5833 request_id=request_id,
5834 user=auth_user_email,
5835 server_id=server_id,
5836 token_teams=auth_token_teams,
5837 plugin_context_table=plugin_context_table,
5838 plugin_global_context=plugin_global_context,
5839 meta_data=meta_data,
5840 )
5841 if hasattr(result, "model_dump"):
5842 result = {"contents": [result.model_dump(by_alias=True, exclude_none=True)]}
5843 else:
5844 result = {"contents": [result]}
5845 except ValueError:
5846 # Resource has no local content, forward to upstream MCP server
5847 result = await gateway_service.forward_request(db, method, params, app_user_email=oauth_user_email, user_email=auth_user_email, token_teams=auth_token_teams)
5848 if hasattr(result, "model_dump"):
5849 result = result.model_dump(by_alias=True, exclude_none=True)
5850 # Release transaction after resources/read completes
5851 db.commit()
5852 db.close()
5853 elif method == "resources/subscribe":
5854 # MCP spec-compliant resource subscription endpoint
5855 uri = params.get("uri")
5856 if not uri:
5857 raise JSONRPCError(-32602, "Missing resource URI in parameters", params)
5858 # Get user email for subscriber ID
5859 user_email = get_user_email(user)
5860 subscription = ResourceSubscription(uri=uri, subscriber_id=user_email)
5861 await resource_service.subscribe_resource(db, subscription)
5862 db.commit()
5863 db.close()
5864 result = {}
5865 elif method == "resources/unsubscribe":
5866 # MCP spec-compliant resource unsubscription endpoint
5867 uri = params.get("uri")
5868 if not uri:
5869 raise JSONRPCError(-32602, "Missing resource URI in parameters", params)
5870 # Get user email for subscriber ID
5871 user_email = get_user_email(user)
5872 subscription = ResourceSubscription(uri=uri, subscriber_id=user_email)
5873 await resource_service.unsubscribe_resource(db, subscription)
5874 db.commit()
5875 db.close()
5876 result = {}
5877 elif method == "prompts/list":
5878 user_email, token_teams, is_admin = _get_rpc_filter_context(request, user)
5879 # Admin bypass - only when token has NO team restrictions
5880 if is_admin and token_teams is None:
5881 user_email = None
5882 token_teams = None # Admin unrestricted
5883 elif token_teams is None:
5884 token_teams = [] # Non-admin without teams = public-only (secure default)
5885 if server_id:
5886 prompts = await prompt_service.list_server_prompts(db, server_id, cursor=cursor, user_email=user_email, token_teams=token_teams)
5887 db.commit()
5888 db.close()
5889 result = {"prompts": [p.model_dump(by_alias=True, exclude_none=True) for p in prompts]}
5890 else:
5891 prompts, next_cursor = await prompt_service.list_prompts(db, cursor=cursor, limit=0, user_email=user_email, token_teams=token_teams)
5892 db.commit()
5893 db.close()
5894 result = {"prompts": [p.model_dump(by_alias=True, exclude_none=True) for p in prompts]}
5895 if next_cursor:
5896 result["nextCursor"] = next_cursor
5897 elif method == "prompts/get":
5898 name = params.get("name")
5899 arguments = params.get("arguments", {})
5900 meta_data = params.get("_meta", None)
5901 if not name:
5902 raise JSONRPCError(-32602, "Missing prompt name in parameters", params)
5904 # Get authorization context (same as prompts/list)
5905 auth_user_email, auth_token_teams, auth_is_admin = _get_rpc_filter_context(request, user)
5906 if auth_is_admin and auth_token_teams is None:
5907 auth_user_email = None
5908 # auth_token_teams stays None (unrestricted)
5909 elif auth_token_teams is None:
5910 auth_token_teams = [] # Non-admin without teams = public-only
5912 # Get plugin contexts from request.state for cross-hook sharing
5913 plugin_context_table = getattr(request.state, "plugin_context_table", None)
5914 plugin_global_context = getattr(request.state, "plugin_global_context", None)
5915 result = await prompt_service.get_prompt(
5916 db,
5917 name,
5918 arguments,
5919 user=auth_user_email,
5920 server_id=server_id,
5921 token_teams=auth_token_teams,
5922 plugin_context_table=plugin_context_table,
5923 plugin_global_context=plugin_global_context,
5924 _meta_data=meta_data,
5925 )
5926 if hasattr(result, "model_dump"):
5927 result = result.model_dump(by_alias=True, exclude_none=True)
5928 # Release transaction after prompts/get completes
5929 db.commit()
5930 db.close()
5931 elif method == "ping":
5932 # Per the MCP spec, a ping returns an empty result.
5933 result = {}
5934 elif method == "tools/call": # pylint: disable=too-many-nested-blocks
5935 # Note: Multi-worker session affinity forwarding is handled earlier
5936 # (before method routing) to apply to ALL methods, not just tools/call
5937 name = params.get("name")
5938 arguments = params.get("arguments", {})
5939 meta_data = params.get("_meta", None)
5940 if not name:
5941 raise JSONRPCError(-32602, "Missing tool name in parameters", params)
5943 # Get authorization context (same as tools/list)
5944 auth_user_email, auth_token_teams, auth_is_admin = _get_rpc_filter_context(request, user)
5945 if auth_is_admin and auth_token_teams is None:
5946 auth_user_email = None
5947 # auth_token_teams stays None (unrestricted)
5948 elif auth_token_teams is None:
5949 auth_token_teams = [] # Non-admin without teams = public-only
5951 # Get user email for OAuth token selection
5952 oauth_user_email = get_user_email(user)
5953 # Get plugin contexts from request.state for cross-hook sharing
5954 plugin_context_table = getattr(request.state, "plugin_context_table", None)
5955 plugin_global_context = getattr(request.state, "plugin_global_context", None)
5957 # Register the tool execution for cancellation tracking with task reference (if enabled)
5958 # Note: req_id can be 0 which is falsy but valid per JSON-RPC spec, so use 'is not None'
5959 run_id = str(req_id) if req_id is not None else None
5960 tool_task: Optional[asyncio.Task] = None
5962 async def cancel_tool_task(reason: Optional[str] = None):
5963 """Cancel callback that actually cancels the asyncio task.
5965 Args:
5966 reason: Optional reason for cancellation.
5967 """
5968 if tool_task and not tool_task.done(): 5968 ↛ exitline 5968 didn't return from function 'cancel_tool_task' because the condition on line 5968 was always true
5969 logger.info(f"Cancelling tool task for run_id={run_id}, reason={reason}")
5970 tool_task.cancel()
5972 if settings.mcpgateway_tool_cancellation_enabled and run_id:
5973 await cancellation_service.register_run(run_id, name=f"tool:{name}", cancel_callback=cancel_tool_task)
5975 try:
5976 # Check if cancelled before execution (only if feature enabled)
5977 if settings.mcpgateway_tool_cancellation_enabled and run_id:
5978 run_status = await cancellation_service.get_status(run_id)
5979 if run_status and run_status.get("cancelled"):
5980 raise JSONRPCError(-32800, f"Tool execution cancelled: {name}", {"requestId": run_id})
5982 # Create task for tool execution to enable real cancellation
5983 async def execute_tool():
5984 """Execute tool invocation with fallback to gateway forwarding.
5986 Returns:
5987 The tool invocation result or gateway forwarding result.
5988 """
5989 try:
5990 return await tool_service.invoke_tool(
5991 db=db,
5992 name=name,
5993 arguments=arguments,
5994 request_headers=headers,
5995 app_user_email=oauth_user_email,
5996 user_email=auth_user_email,
5997 token_teams=auth_token_teams,
5998 server_id=server_id,
5999 plugin_context_table=plugin_context_table,
6000 plugin_global_context=plugin_global_context,
6001 meta_data=meta_data,
6002 )
6003 except ValueError:
6004 # Fallback to gateway forwarding
6005 return await gateway_service.forward_request(db, method, params, app_user_email=oauth_user_email, user_email=auth_user_email, token_teams=auth_token_teams)
6007 tool_task = asyncio.create_task(execute_tool())
6009 # Re-check cancellation after task creation to handle race condition
6010 # where cancel arrived between pre-check and task creation (callback saw tool_task=None)
6011 if settings.mcpgateway_tool_cancellation_enabled and run_id:
6012 run_status = await cancellation_service.get_status(run_id)
6013 if run_status and run_status.get("cancelled"):
6014 tool_task.cancel()
6016 try:
6017 result = await tool_task
6018 if hasattr(result, "model_dump"):
6019 result = result.model_dump(by_alias=True, exclude_none=True)
6020 except asyncio.CancelledError:
6021 # Task was cancelled - return partial result or error
6022 logger.info(f"Tool execution cancelled for run_id={run_id}, tool={name}")
6023 raise JSONRPCError(-32800, f"Tool execution cancelled: {name}", {"requestId": run_id, "partial": False})
6024 finally:
6025 # Unregister the run when done (only if feature enabled)
6026 if settings.mcpgateway_tool_cancellation_enabled and run_id:
6027 await cancellation_service.unregister_run(run_id)
6028 # Release transaction after tools/call completes
6029 db.commit()
6030 db.close()
6031 # TODO: Implement methods # pylint: disable=fixme
6032 elif method == "resources/templates/list":
6033 # MCP spec-compliant resource templates list endpoint
6034 # Use _get_rpc_filter_context - same pattern as tools/list
6035 user_email_rpc, token_teams_rpc, is_admin_rpc = _get_rpc_filter_context(request, user)
6037 # Admin bypass - only when token has NO team restrictions
6038 if is_admin_rpc and token_teams_rpc is None:
6039 token_teams_rpc = None # Admin unrestricted
6040 elif token_teams_rpc is None:
6041 token_teams_rpc = [] # Non-admin without teams = public-only
6043 resource_templates = await resource_service.list_resource_templates(
6044 db,
6045 user_email=user_email_rpc,
6046 token_teams=token_teams_rpc,
6047 )
6048 db.commit()
6049 db.close()
6050 result = {"resourceTemplates": [rt.model_dump(by_alias=True, exclude_none=True) for rt in resource_templates]}
6051 elif method == "roots/list":
6052 # MCP spec-compliant method name
6053 roots = await root_service.list_roots()
6054 result = {"roots": [r.model_dump(by_alias=True, exclude_none=True) for r in roots]}
6055 elif method.startswith("roots/"):
6056 # Catch-all for other roots/* methods (currently unsupported)
6057 result = {}
6058 elif method == "notifications/initialized":
6059 # MCP spec-compliant notification: client initialized
6060 logger.info("Client initialized")
6061 await logging_service.notify("Client initialized", LogLevel.INFO)
6062 result = {}
6063 elif method == "notifications/cancelled":
6064 # MCP spec-compliant notification: request cancelled
6065 # Note: requestId can be 0 (valid per JSON-RPC), so use 'is not None' and normalize to string
6066 raw_request_id = params.get("requestId")
6067 request_id = str(raw_request_id) if raw_request_id is not None else None
6068 reason = params.get("reason")
6069 logger.info(f"Request cancelled: {request_id}, reason: {reason}")
6070 # Attempt local cancellation per MCP spec
6071 if request_id is not None: 6071 ↛ 6073line 6071 didn't jump to line 6073 because the condition on line 6071 was always true
6072 await cancellation_service.cancel_run(request_id, reason=reason)
6073 await logging_service.notify(f"Request cancelled: {request_id}", LogLevel.INFO)
6074 result = {}
6075 elif method == "notifications/message":
6076 # MCP spec-compliant notification: log message
6077 await logging_service.notify(
6078 params.get("data"),
6079 LogLevel(params.get("level", "info")),
6080 params.get("logger"),
6081 )
6082 result = {}
6083 elif method.startswith("notifications/"):
6084 # Catch-all for other notifications/* methods (currently unsupported)
6085 result = {}
6086 elif method == "sampling/createMessage":
6087 # MCP spec-compliant sampling endpoint
6088 result = await sampling_handler.create_message(db, params)
6089 elif method.startswith("sampling/"):
6090 # Catch-all for other sampling/* methods (currently unsupported)
6091 result = {}
6092 elif method == "elicitation/create":
6093 # MCP spec 2025-06-18: Elicitation support (server-to-client requests)
6094 # Elicitation allows servers to request structured user input through clients
6096 # Check if elicitation is enabled
6097 if not settings.mcpgateway_elicitation_enabled:
6098 raise JSONRPCError(-32601, "Elicitation feature is disabled", {"feature": "elicitation", "config": "MCPGATEWAY_ELICITATION_ENABLED=false"})
6100 # Validate params
6101 # First-Party
6102 from mcpgateway.common.models import ElicitRequestParams # pylint: disable=import-outside-toplevel
6103 from mcpgateway.services.elicitation_service import get_elicitation_service # pylint: disable=import-outside-toplevel
6105 try:
6106 elicit_params = ElicitRequestParams(**params)
6107 except Exception as e:
6108 raise JSONRPCError(-32602, f"Invalid elicitation params: {e}", params)
6110 # Get target session (from params or find elicitation-capable session)
6111 target_session_id = params.get("session_id") or params.get("sessionId")
6112 if not target_session_id:
6113 # Find an elicitation-capable session
6114 capable_sessions = await session_registry.get_elicitation_capable_sessions()
6115 if not capable_sessions:
6116 raise JSONRPCError(-32000, "No elicitation-capable clients available", {"message": elicit_params.message})
6117 target_session_id = capable_sessions[0]
6118 logger.debug(f"Selected session {target_session_id} for elicitation")
6120 # Verify session has elicitation capability
6121 if not await session_registry.has_elicitation_capability(target_session_id):
6122 raise JSONRPCError(-32000, f"Session {target_session_id} does not support elicitation", {"session_id": target_session_id})
6124 # Get elicitation service and create request
6125 elicitation_service = get_elicitation_service()
6127 # Extract timeout from params or use default
6128 timeout = params.get("timeout", settings.mcpgateway_elicitation_timeout)
6130 try:
6131 # Create elicitation request - this stores it and waits for response
6132 # For now, use dummy upstream_session_id - in full bidirectional proxy,
6133 # this would be the session that initiated the request
6134 upstream_session_id = "gateway"
6136 # Start the elicitation (creates pending request and future)
6137 elicitation_task = asyncio.create_task(
6138 elicitation_service.create_elicitation(
6139 upstream_session_id=upstream_session_id, downstream_session_id=target_session_id, message=elicit_params.message, requested_schema=elicit_params.requestedSchema, timeout=timeout
6140 )
6141 )
6143 # Get the pending elicitation to extract request_id
6144 # Wait a moment for it to be created
6145 await asyncio.sleep(0.01)
6146 pending_elicitations = [e for e in elicitation_service._pending.values() if e.downstream_session_id == target_session_id] # pylint: disable=protected-access
6147 if not pending_elicitations:
6148 raise JSONRPCError(-32000, "Failed to create elicitation request", {})
6150 pending = pending_elicitations[-1] # Get most recent
6152 # Send elicitation request to client via broadcast
6153 elicitation_request = {
6154 "jsonrpc": "2.0",
6155 "id": pending.request_id,
6156 "method": "elicitation/create",
6157 "params": {"message": elicit_params.message, "requestedSchema": elicit_params.requestedSchema},
6158 }
6160 await session_registry.broadcast(target_session_id, elicitation_request)
6161 logger.debug(f"Sent elicitation request {pending.request_id} to session {target_session_id}")
6163 # Wait for response
6164 elicit_result = await elicitation_task
6166 # Return result
6167 result = elicit_result.model_dump(by_alias=True, exclude_none=True)
6169 except asyncio.TimeoutError:
6170 raise JSONRPCError(-32000, f"Elicitation timed out after {timeout}s", {"message": elicit_params.message, "timeout": timeout})
6171 except ValueError as e:
6172 raise JSONRPCError(-32000, str(e), {"message": elicit_params.message})
6173 elif method.startswith("elicitation/"):
6174 # Catch-all for other elicitation/* methods
6175 result = {}
6176 elif method == "completion/complete":
6177 # MCP spec-compliant completion endpoint
6178 result = await completion_service.handle_completion(db, params)
6179 elif method.startswith("completion/"):
6180 # Catch-all for other completion/* methods (currently unsupported)
6181 result = {}
6182 elif method == "logging/setLevel":
6183 # MCP spec-compliant logging endpoint
6184 level = LogLevel(params.get("level"))
6185 await logging_service.set_level(level)
6186 result = {}
6187 elif method.startswith("logging/"):
6188 # Catch-all for other logging/* methods (currently unsupported)
6189 result = {}
6190 else:
6191 # Backward compatibility: Try to invoke as a tool directly
6192 # This allows both old format (method=tool_name) and new format (method=tools/call)
6193 # Standard
6194 headers = {k.lower(): v for k, v in request.headers.items()}
6196 # Get authorization context (same as tools/call)
6197 auth_user_email, auth_token_teams, auth_is_admin = _get_rpc_filter_context(request, user)
6198 if auth_is_admin and auth_token_teams is None:
6199 auth_user_email = None
6200 # auth_token_teams stays None (unrestricted)
6201 elif auth_token_teams is None:
6202 auth_token_teams = [] # Non-admin without teams = public-only
6204 # Get user email for OAuth token selection
6205 oauth_user_email = get_user_email(user)
6206 # Get server_id from params if provided
6207 server_id = params.get("server_id")
6208 # Get plugin contexts from request.state for cross-hook sharing
6209 plugin_context_table = getattr(request.state, "plugin_context_table", None)
6210 plugin_global_context = getattr(request.state, "plugin_global_context", None)
6212 meta_data = params.get("_meta", None)
6214 try:
6215 result = await tool_service.invoke_tool(
6216 db=db,
6217 name=method,
6218 arguments=params,
6219 request_headers=headers,
6220 app_user_email=oauth_user_email,
6221 user_email=auth_user_email,
6222 token_teams=auth_token_teams,
6223 server_id=server_id,
6224 plugin_context_table=plugin_context_table,
6225 plugin_global_context=plugin_global_context,
6226 meta_data=meta_data,
6227 )
6228 if hasattr(result, "model_dump"):
6229 result = result.model_dump(by_alias=True, exclude_none=True)
6230 except (PluginError, PluginViolationError):
6231 raise
6232 except (ValueError, Exception):
6233 # If not a tool, try forwarding to gateway
6234 try:
6235 result = await gateway_service.forward_request(db, method, params, app_user_email=oauth_user_email, user_email=auth_user_email, token_teams=auth_token_teams)
6236 if hasattr(result, "model_dump"):
6237 result = result.model_dump(by_alias=True, exclude_none=True)
6238 except Exception:
6239 # If all else fails, return invalid method error
6240 raise JSONRPCError(-32000, "Invalid method", params)
6242 return {"jsonrpc": "2.0", "result": result, "id": req_id}
6244 except (PluginError, PluginViolationError):
6245 raise
6246 except JSONRPCError as e:
6247 error = e.to_dict()
6248 return {"jsonrpc": "2.0", "error": error["error"], "id": req_id}
6249 except Exception as e:
6250 if isinstance(e, ValueError):
6251 return ORJSONResponse(content={"message": "Method invalid"}, status_code=422)
6252 logger.error(f"RPC error: {str(e)}")
6253 return {
6254 "jsonrpc": "2.0",
6255 "error": {"code": -32000, "message": "Internal error", "data": str(e)},
6256 "id": req_id,
6257 }
@utility_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    Handle WebSocket connection to relay JSON-RPC requests to the internal RPC endpoint.

    Accepts incoming text messages, parses them as JSON-RPC requests, sends them to /rpc,
    and returns the result to the client over the same WebSocket.

    When ``settings.mcp_client_auth_enabled`` or ``settings.auth_required`` is set, a bearer
    token is taken from the ``token`` query parameter or the ``Authorization`` header. The
    connection is closed with code 1008 when required credentials are missing or when JWT
    verification fails. Credentials that were accepted are forwarded on every /rpc call.

    Args:
        websocket: The WebSocket connection instance.
    """
    # Track auth credentials to forward to /rpc
    auth_token: Optional[str] = None
    proxy_user: Optional[str] = None

    try:
        # Authenticate WebSocket connection
        if settings.mcp_client_auth_enabled or settings.auth_required:
            # Extract auth from query params or headers
            # Try to get token from query parameter
            if "token" in websocket.query_params:
                auth_token = websocket.query_params["token"]
            # Try to get token from Authorization header
            elif "authorization" in websocket.headers:
                auth_header = websocket.headers["authorization"]
                # Auth scheme names are case-insensitive (RFC 7235), so accept
                # "bearer"/"BEARER" as well, matching the SSE endpoint's behavior.
                if auth_header[:7].lower() == "bearer ":
                    auth_token = auth_header[7:]

            # Check for proxy auth if MCP client auth is disabled
            if not settings.mcp_client_auth_enabled and settings.trust_proxy_auth:
                proxy_user = websocket.headers.get(settings.proxy_user_header)
                if not proxy_user and not auth_token:
                    await websocket.close(code=1008, reason="Authentication required")
                    return
            elif settings.auth_required and not auth_token:
                await websocket.close(code=1008, reason="Authentication required")
                return

            # Verify JWT token if provided and MCP client auth is enabled
            if auth_token and settings.mcp_client_auth_enabled:
                try:
                    await verify_jwt_token(auth_token)
                except Exception:
                    await websocket.close(code=1008, reason="Invalid authentication")
                    return

        await websocket.accept()
        while True:
            try:
                data = await websocket.receive_text()
                client_args = {"timeout": settings.federation_timeout, "verify": not settings.skip_ssl_verify}

                # Build headers for /rpc request - forward auth credentials
                rpc_headers: Dict[str, str] = {"Content-Type": "application/json"}
                if auth_token:
                    rpc_headers["Authorization"] = f"Bearer {auth_token}"
                if proxy_user:
                    rpc_headers[settings.proxy_user_header] = proxy_user

                async with ResilientHttpClient(client_args=client_args) as client:
                    response = await client.post(
                        f"http://localhost:{settings.port}{settings.app_root_path}/rpc",
                        json=orjson.loads(data),
                        headers=rpc_headers,
                    )
                    await websocket.send_text(response.text)
            except WebSocketDisconnect:
                # WebSocketDisconnect is an Exception subclass: without this clause a normal
                # client disconnect would fall into the generic handler below, be logged as
                # an error and trigger a close() on an already-disconnected socket.
                raise
            except JSONRPCError as e:
                await websocket.send_text(orjson.dumps(e.to_dict()).decode())
            except orjson.JSONDecodeError:
                await websocket.send_text(
                    orjson.dumps(
                        {
                            "jsonrpc": "2.0",
                            "error": {"code": -32700, "message": "Parse error"},
                            "id": None,
                        }
                    ).decode()
                )
            except Exception as e:
                logger.error(f"WebSocket error: {str(e)}")
                await websocket.close(code=1011)
                break
    except WebSocketDisconnect:
        logger.info("WebSocket disconnected")
    except Exception as e:
        logger.error(f"WebSocket connection error: {str(e)}")
        try:
            await websocket.close(code=1011)
        except Exception as er:
            logger.error(f"Error while closing WebSocket: {er}")
@utility_router.get("/sse")
@require_permission("tools.invoke")
async def utility_sse_endpoint(request: Request, user=Depends(get_current_user_with_permissions)):
    """
    Open a Server-Sent Events (SSE) stream for real-time updates.

    A new SSE transport is created and registered in the session registry, a respond
    task is started for it, and the streaming response is returned. The session is
    removed again when the client disconnects, when the response finishes, or when
    creating the response fails or is cancelled.

    Args:
        request (Request): The incoming HTTP request.
        user: Authenticated user context (a dict or an object exposing ``email`` / ``is_admin``).

    Returns:
        StreamingResponse: A streaming response that keeps the connection
        open and pushes events to the client.

    Raises:
        HTTPException: Returned with **500 Internal Server Error** if the SSE connection cannot be established or an unexpected error occurs while creating the transport.
        asyncio.CancelledError: If the request is cancelled during SSE setup.
    """
    try:
        logger.debug("User %s requested SSE connection", user)
        base_url = update_url_protocol(request)

        # The transport generates its own session_id (server-initiated, never client-provided).
        transport = SSETransport(base_url=base_url)
        await transport.connect()
        await session_registry.add_session(transport.session_id, transport)

        async def on_disconnect_cleanup() -> None:
            """Remove the session from the registry once the SSE client disconnects."""
            try:
                await session_registry.remove_session(transport.session_id)
                logger.debug("Defensive session cleanup completed: %s", transport.session_id)
            except Exception as e:
                logger.warning("Defensive session cleanup failed for %s: %s", transport.session_id, e)

        # Pull the auth token from the Authorization header, falling back to cookies
        # (admin UI sessions) when no bearer header is present.
        authorization = request.headers.get("authorization", "")
        auth_token = None
        if authorization.lower().startswith("bearer "):
            auth_token = authorization[7:]
        elif hasattr(request, "cookies") and request.cookies:
            auth_token = request.cookies.get("jwt_token") or request.cookies.get("access_token")

        # Token teams, normalized. SECURITY: the None vs [] distinction must be preserved:
        #   None  -> unrestricted (admin keeps bypass, non-admin gets their accessible resources)
        #   []    -> public-only (admin bypass disabled)
        #   [...] -> team-scoped access
        token_teams = _get_token_teams_from_request(request)

        # Carry over the admin flag from the user object (covers cookie-authenticated admins).
        if hasattr(user, "is_admin"):
            is_admin = getattr(user, "is_admin", False)
        elif isinstance(user, dict):
            is_admin = user.get("is_admin", False) or user.get("user", {}).get("is_admin", False)
        else:
            is_admin = False

        # Build the enriched user mapping handed to the respond task.
        if isinstance(user, dict):
            user_with_token = dict(user)
        else:
            user_with_token = {"email": getattr(user, "email", str(user))}
        user_with_token.update(
            auth_token=auth_token,
            token_teams=token_teams,  # None for unrestricted, [] for public-only, [...] for team-scoped
            is_admin=is_admin,  # Preserve admin status for fallback token
        )

        # Start the respond task and register it so it can be cancelled on disconnect.
        respond_task = asyncio.create_task(session_registry.respond(None, user_with_token, session_id=transport.session_id, base_url=base_url))
        session_registry.register_respond_task(transport.session_id, respond_task)

        try:
            response = await transport.create_sse_response(request, on_disconnect_callback=on_disconnect_cleanup)
        except asyncio.CancelledError:
            # Cancelled mid-setup: clean up anyway so no task is left orphaned.
            logger.debug("SSE request cancelled for %s, cleaning up", transport.session_id)
            try:
                await session_registry.remove_session(transport.session_id)
            except Exception as cleanup_error:
                logger.warning("Cleanup after SSE cancellation failed: %s", cleanup_error)
            raise  # Re-raise CancelledError
        except Exception as sse_error:
            # CRITICAL: without this cleanup the respond task and session would be orphaned.
            logger.error("create_sse_response failed for %s: %s", transport.session_id, sse_error)
            try:
                await session_registry.remove_session(transport.session_id)
            except Exception as cleanup_error:
                logger.warning("Cleanup after SSE failure also failed: %s", cleanup_error)
            raise

        # Also remove the session once the response has finished.
        tasks = BackgroundTasks()
        tasks.add_task(session_registry.remove_session, transport.session_id)
        response.background = tasks
        logger.info("SSE connection established: %s", transport.session_id)
        return response
    except Exception as e:
        logger.error("SSE connection error: %s", e)
        raise HTTPException(status_code=500, detail="SSE connection failed")
@utility_router.post("/message")
@require_permission("tools.invoke")
async def utility_message_endpoint(request: Request, user=Depends(get_current_user_with_permissions)):
    """
    Deliver a JSON-RPC message to the SSE session named by the ``session_id`` query parameter.

    Args:
        request (Request): Incoming request containing the JSON-RPC payload.
        user: Authenticated user.

    Returns:
        JSONResponse: ``{"status": "success"}`` with HTTP 202 on success.

    Raises:
        HTTPException: * **400 Bad Request** - ``session_id`` query parameter is missing or the payload cannot be parsed as JSON.
            * **500 Internal Server Error** - An unexpected error occurs while broadcasting the message.
    """
    try:
        logger.debug("User %s sent a message to SSE session", user)

        target_session = request.query_params.get("session_id")
        if not target_session:
            logger.error("Missing session_id in message request")
            raise HTTPException(status_code=400, detail="Missing session_id")

        payload = await _read_request_json(request)
        await session_registry.broadcast(session_id=target_session, message=payload)

        return ORJSONResponse(content={"status": "success"}, status_code=202)

    except HTTPException:
        # Already carries the intended status code; pass it through untouched.
        raise
    except ValueError as e:
        logger.error("Invalid message format: %s", e)
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as exc:
        logger.error("Message handling error: %s", exc)
        raise HTTPException(status_code=500, detail="Failed to process message")
@utility_router.post("/logging/setLevel")
@require_permission("admin.system_config")
async def set_log_level(request: Request, user=Depends(get_current_user_with_permissions)) -> None:
    """
    Update the server's log level at runtime.

    Args:
        request: HTTP request with log level JSON body (an object with a ``level`` field).
        user: Authenticated user.

    Returns:
        None

    Raises:
        HTTPException: **400 Bad Request** if the body is not a JSON object containing
            ``level`` or if the supplied level is rejected by ``LogLevel``.
    """
    logger.debug(f"User {user} requested to set log level")
    body = await _read_request_json(request)

    # Reject malformed input explicitly; previously a missing key or an unknown level
    # escaped as an unhandled KeyError/ValueError and surfaced as a 500.
    if not isinstance(body, dict) or "level" not in body:
        raise HTTPException(status_code=400, detail="Request body must be a JSON object with a 'level' field")

    requested_level = body["level"]
    try:
        level = LogLevel(requested_level)
    except (ValueError, TypeError) as exc:
        raise HTTPException(status_code=400, detail=f"Invalid log level: {requested_level!r}") from exc

    await logging_service.set_level(level)
    return None
6515####################
6516# Metrics #
6517####################
@metrics_router.get("", response_model=dict)
@require_permission("admin.metrics")
async def get_metrics(db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)) -> dict:
    """
    Collect aggregated metrics for every entity type (Tools, Resources, Servers, Prompts, A2A Agents).

    Args:
        db: Database session
        user: Authenticated user

    Returns:
        A dictionary keyed by entity type holding that type's aggregated metrics.
        The ``a2a_agents`` key is present only when the A2A service is available
        and A2A metrics are enabled in settings.
    """
    logger.debug(f"User {user} requested aggregated metrics")

    # Entries are evaluated top to bottom, so services are queried in this order.
    metrics_result = {
        "tools": await tool_service.aggregate_metrics(db),
        "resources": await resource_service.aggregate_metrics(db),
        "servers": await server_service.aggregate_metrics(db),
        "prompts": await prompt_service.aggregate_metrics(db),
    }

    # A2A metrics are optional and only reported when the feature is switched on.
    if a2a_service and settings.mcpgateway_a2a_metrics_enabled:
        metrics_result["a2a_agents"] = await a2a_service.aggregate_metrics(db)

    return metrics_result
@metrics_router.post("/reset", response_model=dict)
@require_permission("admin.metrics")
async def reset_metrics(entity: Optional[str] = None, entity_id: Optional[int] = None, db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)) -> dict:
    """
    Reset metrics for one entity type, or for everything when no entity is given.

    Args:
        entity: One of "tool", "resource", "server", "prompt", "a2a_agent" (or "a2a"), or None for global reset.
            Matching is case-insensitive.
        entity_id: Specific entity ID to reset metrics for (optional). Only forwarded for
            "tool" and "a2a_agent"; the other entity types are reset as a whole.
        db: Database session
        user: Authenticated user

    Returns:
        A success message in a dictionary.

    Raises:
        HTTPException: If an invalid entity type is specified, or A2A is requested while disabled.
    """
    logger.debug(f"User {user} requested metrics reset for entity: {entity}, id: {entity_id}")

    if entity is None:
        # Global reset: core services first, then A2A when enabled.
        for service in (tool_service, resource_service, server_service, prompt_service):
            await service.reset_metrics(db)
        if a2a_service and settings.mcpgateway_a2a_metrics_enabled:
            await a2a_service.reset_metrics(db)
    else:
        kind = entity.lower()
        if kind == "tool":
            await tool_service.reset_metrics(db, entity_id)
        elif kind == "resource":
            await resource_service.reset_metrics(db)
        elif kind == "server":
            await server_service.reset_metrics(db)
        elif kind == "prompt":
            await prompt_service.reset_metrics(db)
        elif kind in ("a2a_agent", "a2a"):
            if not (a2a_service and settings.mcpgateway_a2a_metrics_enabled):
                raise HTTPException(status_code=400, detail="A2A features are disabled")
            # The A2A service receives the ID as a string (or None when absent).
            await a2a_service.reset_metrics(db, str(entity_id) if entity_id is not None else None)
        else:
            raise HTTPException(status_code=400, detail="Invalid entity type for metrics reset")

    return {"status": "success", "message": f"Metrics reset for {entity if entity else 'all entities'}"}
6598####################
6599# Healthcheck #
6600####################
@app.get("/health")
def healthcheck():
    """
    Run a basic health check by issuing ``SELECT 1`` against the database.

    Sync function so FastAPI runs it in a threadpool, avoiding event loop blocking.
    Uses a dedicated session to avoid cross-thread issues and double-commit
    from get_db dependency. All DB operations happen in the same thread.

    Returns:
        A dictionary with the health status and optional error message.
    """
    session = SessionLocal()
    try:
        session.execute(text("SELECT 1"))
        # Explicitly commit to release PgBouncer backend connection in transaction mode.
        session.commit()
    except Exception as e:
        # Roll back; if even that fails, invalidate best-effort (mirrors get_db cleanup).
        try:
            session.rollback()
        except Exception:
            with suppress(Exception):
                session.invalidate()
        error_message = f"Database connection error: {str(e)}"
        logger.error(error_message)
        return {"status": "unhealthy", "error": error_message}
    else:
        return {"status": "healthy"}
    finally:
        session.close()
@app.get("/ready")
async def readiness_check():
    """
    Perform a readiness check to verify if the application is ready to receive traffic.

    Creates and manages its own session inside the worker thread to ensure all DB
    operations (create, execute, commit, rollback, close) happen in the same thread.
    This avoids cross-thread session issues and double-commit from get_db.

    Returns:
        JSONResponse with status 200 if ready, 503 if not.
    """

    def _check_db() -> str | None:
        """Check database connectivity by executing a simple query.

        Returns:
            None if successful, a non-empty error description if failed.
        """
        # Create session in this thread - all DB operations stay in the same thread.
        db = SessionLocal()
        try:
            db.execute(text("SELECT 1"))
            # Explicitly commit to release PgBouncer backend connection.
            db.commit()
            return None  # Success
        except Exception as e:
            # Rollback, then invalidate if rollback fails (mirrors get_db cleanup).
            try:
                db.rollback()
            except Exception:
                try:
                    db.invalidate()
                except Exception:
                    pass  # nosec B110 - Best effort cleanup on connection failure
            # An exception may carry an empty message; fall back to repr() so the
            # failure is never reported as an empty string.
            return str(e) or repr(e)
        finally:
            db.close()

    # Run the blocking DB check in a thread to avoid blocking the event loop.
    error = await asyncio.to_thread(_check_db)
    # Compare against None explicitly: a truthiness test would treat an empty
    # error string as success and wrongly report the service as ready.
    if error is not None:
        error_message = f"Readiness check failed: {error}"
        logger.error(error_message)
        return ORJSONResponse(content={"status": "not ready", "error": error_message}, status_code=503)
    return ORJSONResponse(content={"status": "ready"}, status_code=200)
@app.get("/health/security", tags=["health"])
async def security_health(request: Request):
    """
    Get the security configuration health status.

    Args:
        request (Request): The incoming HTTP request containing headers for authentication.

    Returns:
        dict: A dictionary containing the overall security health status, score,
        individual checks, warning count, and timestamp. The detailed warnings
        list is included only when running in development mode.

    Raises:
        HTTPException: If authentication is required and the request does not
            include a non-empty bearer token in the Authorization header.
    """
    # Check authentication
    if settings.auth_required:
        auth_header = request.headers.get("authorization") or ""
        # Auth scheme names are case-insensitive (RFC 7235), so "bearer" and "Bearer"
        # are both accepted; a header with no credentials after the scheme is rejected.
        # NOTE(review): only the presence of a bearer token is checked here; the token
        # itself is not verified in this function.
        scheme, _, credentials = auth_header.partition(" ")
        if scheme.lower() != "bearer" or not credentials.strip():
            raise HTTPException(401, "Authentication required for security health")

    security_status = settings.get_security_status()

    # Determine overall health
    score = security_status["security_score"]
    is_healthy = score >= 60  # Minimum acceptable score

    # Build response
    response = {
        "status": "healthy" if is_healthy else "unhealthy",
        "score": score,
        "checks": {
            "authentication": security_status["auth_enabled"],
            "secure_secrets": security_status["secure_secrets"],
            "ssl_verification": security_status["ssl_verification"],
            "debug_disabled": security_status["debug_disabled"],
            "cors_restricted": security_status["cors_restricted"],
            "ui_protected": security_status["ui_protected"],
        },
        "warning_count": len(security_status["warnings"]),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    # Expose the detailed warnings only in dev mode
    if settings.dev_mode:
        response["warnings"] = security_status["warnings"]

    return response
6736####################
6737# Tag Endpoints #
6738####################
@tag_router.get("", response_model=List[TagInfo])
@tag_router.get("/", response_model=List[TagInfo])
@require_permission("tags.read")
async def list_tags(
    entity_types: Optional[str] = None,
    include_entities: bool = False,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> List[TagInfo]:
    """
    Return every unique tag found across the requested entity types.

    Args:
        entity_types: Comma-separated list of entity types to filter by
            (e.g., "tools,resources,prompts,servers,gateways").
            If not provided, returns tags from all entity types.
        include_entities: Whether to include the list of entities that have each tag
        db: Database session
        user: Authenticated user

    Returns:
        List of TagInfo objects containing tag names, statistics, and optionally entities

    Raises:
        HTTPException: If tag retrieval fails
    """
    # Split the comma-separated filter into trimmed, lower-cased names (blanks dropped).
    entity_types_list = [et.strip().lower() for et in entity_types.split(",") if et.strip()] if entity_types else None

    logger.debug(f"User {user} is retrieving tags for entity types: {entity_types_list}, include_entities: {include_entities}")

    try:
        return await tag_service.get_all_tags(db, entity_types=entity_types_list, include_entities=include_entities)
    except Exception as e:
        logger.error(f"Failed to retrieve tags: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve tags: {str(e)}")
@tag_router.get("/{tag_name}/entities", response_model=List[TaggedEntity])
@require_permission("tags.read")
async def get_entities_by_tag(
    tag_name: str,
    entity_types: Optional[str] = None,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> List[TaggedEntity]:
    """
    Look up all entities carrying a given tag.

    Args:
        tag_name: The tag to search for
        entity_types: Comma-separated list of entity types to filter by
            (e.g., "tools,resources,prompts,servers,gateways").
            If not provided, returns entities from all types.
        db: Database session
        user: Authenticated user

    Returns:
        List of TaggedEntity objects

    Raises:
        HTTPException: If entity retrieval fails
    """
    # Split the comma-separated filter into trimmed, lower-cased names (blanks dropped).
    entity_types_list = [et.strip().lower() for et in entity_types.split(",") if et.strip()] if entity_types else None

    logger.debug(f"User {user} is retrieving entities for tag '{tag_name}' with entity types: {entity_types_list}")

    try:
        return await tag_service.get_entities_by_tag(db, tag_name=tag_name, entity_types=entity_types_list)
    except Exception as e:
        logger.error(f"Failed to retrieve entities for tag '{tag_name}': {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve entities: {str(e)}")
6822####################
6823# Export/Import #
6824####################
@export_import_router.get("/export", response_model=Dict[str, Any])
@require_permission("admin.export")
async def export_configuration(
    request: Request,  # pylint: disable=unused-argument
    export_format: str = "json",  # pylint: disable=unused-argument
    types: Optional[str] = None,
    exclude_types: Optional[str] = None,
    tags: Optional[str] = None,
    include_inactive: bool = False,
    include_dependencies: bool = True,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """
    Export the gateway configuration as JSON-compatible data.

    Args:
        request: FastAPI request object (currently unused by the body)
        export_format: Export format (currently only 'json' supported; unused by the body)
        types: Comma-separated list of entity types to include (tools,gateways,servers,prompts,resources,roots)
        exclude_types: Comma-separated list of entity types to exclude
        tags: Comma-separated list of tags to filter by
        include_inactive: Whether to include inactive entities
        include_dependencies: Whether to include dependent entities
        db: Database session
        user: Authenticated user

    Returns:
        Export data in the specified format

    Raises:
        HTTPException: If export fails
    """

    def _split_csv(raw: Optional[str]) -> Optional[List[str]]:
        """Turn a comma-separated string into trimmed non-empty items; None when nothing was supplied."""
        if not raw:
            return None
        return [t.strip() for t in raw.split(",") if t.strip()]

    try:
        logger.info(f"User {user} requested configuration export")

        # Parse the comma-separated query parameters
        include_types = _split_csv(types)
        exclude_types_list = _split_csv(exclude_types)
        tags_list = _split_csv(tags)

        # Resolve who is exporting: the user may be an object with ``email`` or a dict
        username: Optional[str] = None
        if hasattr(user, "email"):
            username = getattr(user, "email", None)
        elif isinstance(user, dict):
            username = user.get("email", None)

        # Perform export; URLs are built from the configured APP_ROOT_PATH
        return await export_service.export_configuration(
            db=db,
            include_types=include_types,
            exclude_types=exclude_types_list,
            tags=tags_list,
            include_inactive=include_inactive,
            include_dependencies=include_dependencies,
            exported_by=username or "unknown",
            root_path=settings.app_root_path,
        )

    except ExportError as e:
        logger.error(f"Export failed for user {user}: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected export error for user {user}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}")
@export_import_router.post("/export/selective", response_model=Dict[str, Any])
@require_permission("admin.export")
async def export_selective_configuration(
    entity_selections: Dict[str, List[str]] = Body(...), include_dependencies: bool = True, db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)
) -> Dict[str, Any]:
    """
    Export only the entities explicitly selected by ID or name.

    Args:
        entity_selections: Dict mapping entity types to lists of IDs/names to export
        include_dependencies: Whether to include dependent entities
        db: Database session
        user: Authenticated user

    Returns:
        Selective export data

    Raises:
        HTTPException: If export fails

    Example request body:
        {
            "tools": ["tool1", "tool2"],
            "servers": ["server1"],
            "prompts": ["prompt1"]
        }
    """
    try:
        logger.info(f"User {user} requested selective configuration export")

        # Resolve who is exporting: the user may be an object with ``email`` or a dict
        if hasattr(user, "email"):
            username: Optional[str] = getattr(user, "email", None)
        elif isinstance(user, dict):
            username = user.get("email")
        else:
            username = None

        # URLs in the export are built from the configured APP_ROOT_PATH
        return await export_service.export_selective(
            db=db,
            entity_selections=entity_selections,
            include_dependencies=include_dependencies,
            exported_by=username or "unknown",
            root_path=settings.app_root_path,
        )

    except ExportError as e:
        logger.error(f"Selective export failed for user {user}: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected selective export error for user {user}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}")
@export_import_router.post("/import", response_model=Dict[str, Any])
@require_permission("admin.import")
async def import_configuration(
    import_data: Dict[str, Any] = Body(...),
    conflict_strategy: str = "update",
    dry_run: bool = False,
    rekey_secret: Optional[str] = None,
    selected_entities: Optional[Dict[str, List[str]]] = None,
    db: Session = Depends(get_db),
    user=Depends(get_current_user_with_permissions),
) -> Dict[str, Any]:
    """
    Import configuration data with conflict resolution.

    Args:
        import_data: The configuration data to import
        conflict_strategy: How to handle conflicts: skip, update, rename, fail
        dry_run: If true, validate but don't make changes
        rekey_secret: New encryption secret for cross-environment imports
        selected_entities: Dict of entity types to specific entity names/ids to import
        db: Database session
        user: Authenticated user

    Returns:
        Import status and results

    Raises:
        HTTPException: **400** for an invalid conflict strategy or an import service error,
            **422** for validation errors, **409** for conflicts, **500** for anything unexpected.
    """
    try:
        logger.info(f"User {user} requested configuration import (dry_run={dry_run})")

        # Validate conflict strategy
        try:
            strategy = ConflictStrategy(conflict_strategy.lower())
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid conflict strategy. Must be one of: {[s.value for s in list(ConflictStrategy)]}")

        # Extract username from user (which is now an EmailUser object)
        if hasattr(user, "email"):
            username = getattr(user, "email", None)
        elif isinstance(user, dict):
            username = user.get("email", None)
        else:
            username = None

        # Perform import
        import_status = await import_service.import_configuration(
            db=db, import_data=import_data, conflict_strategy=strategy, dry_run=dry_run, rekey_secret=rekey_secret, imported_by=username or "unknown", selected_entities=selected_entities
        )

        return import_status.to_dict()

    except HTTPException:
        # Pass through deliberate HTTP errors (e.g. the 400 for an invalid conflict
        # strategy above); otherwise the generic handler below would turn them into a 500.
        raise
    except ImportValidationError as e:
        logger.error(f"Import validation failed for user {user}: {str(e)}")
        raise HTTPException(status_code=422, detail=f"Validation error: {str(e)}")
    except ImportConflictError as e:
        logger.error(f"Import conflict for user {user}: {str(e)}")
        raise HTTPException(status_code=409, detail=f"Conflict error: {str(e)}")
    except ImportServiceError as e:
        logger.error(f"Import failed for user {user}: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected import error for user {user}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}")
@export_import_router.get("/import/status/{import_id}", response_model=Dict[str, Any])
@require_permission("admin.import")
async def get_import_status(import_id: str, user=Depends(get_current_user_with_permissions)) -> Dict[str, Any]:
    """
    Return the status of a single import operation.

    Args:
        import_id: Identifier of the import operation to look up
        user: Authenticated user (injected dependency)

    Returns:
        The import status serialized with ``to_dict()``

    Raises:
        HTTPException: 404 when the import service returns nothing for ``import_id``
    """
    logger.debug(f"User {user} requested import status for {import_id}")

    found = import_service.get_import_status(import_id)
    if found:
        return found.to_dict()

    # Falsy lookup result: report the missing import to the client
    raise HTTPException(status_code=404, detail=f"Import {import_id} not found")
@export_import_router.get("/import/status", response_model=List[Dict[str, Any]])
@require_permission("admin.import")
async def list_import_statuses(user=Depends(get_current_user_with_permissions)) -> List[Dict[str, Any]]:
    """
    Return every import status known to the import service.

    Args:
        user: Authenticated user (injected dependency)

    Returns:
        One ``to_dict()`` serialization per import status, in the order the service returns them
    """
    logger.debug(f"User {user} requested all import statuses")

    serialized: List[Dict[str, Any]] = []
    for entry in import_service.list_import_statuses():
        serialized.append(entry.to_dict())
    return serialized
@export_import_router.post("/import/cleanup", response_model=Dict[str, Any])
@require_permission("admin.import")
async def cleanup_import_statuses(max_age_hours: int = 24, user=Depends(get_current_user_with_permissions)) -> Dict[str, Any]:
    """
    Ask the import service to drop completed import statuses past a given age.

    Args:
        max_age_hours: Age threshold in hours, passed through to the import service
        user: Authenticated user (injected dependency)

    Returns:
        Dict with ``status``, a human-readable ``message`` and the ``removed_count``
    """
    logger.info(f"User {user} requested import status cleanup (max_age_hours={max_age_hours})")

    purged = import_service.cleanup_completed_imports(max_age_hours)
    summary = f"Cleaned up {purged} completed import statuses"
    return {"status": "success", "message": summary, "removed_count": purged}
# Static assets are not mounted here; the mount happens further down and only when the Admin UI is enabled.

# Routers that are registered unconditionally
app.include_router(version_router)
app.include_router(protocol_router)
app.include_router(tool_router)
app.include_router(resource_router)
app.include_router(prompt_router)
app.include_router(gateway_router)
app.include_router(root_router)
app.include_router(utility_router)
app.include_router(server_router)
# Per-server well-known routes are served under the /servers prefix
app.include_router(server_well_known_router, prefix="/servers")
app.include_router(metrics_router)
app.include_router(tag_router)
app.include_router(export_import_router)
# Log search router: registered unless structured logging is explicitly turned off
# (a missing setting counts as enabled).
if not getattr(settings, "structured_logging_enabled", True):
    logger.info("Log search router not included - structured logging disabled")
else:
    try:
        # First-Party
        from mcpgateway.routers.log_search import router as log_search_router

        app.include_router(log_search_router)
        logger.info("Log search router included - structured logging enabled")
    except ImportError as e:
        # Best effort: an import failure is logged and the app starts without the router
        logger.warning(f"Failed to import log search router: {e}")
# Observability router: only registered when the observability feature flag is on
if not settings.observability_enabled:
    logger.info("Observability router not included - observability disabled")
else:
    # First-Party
    from mcpgateway.routers.observability import router as observability_router

    app.include_router(observability_router)
    logger.info("Observability router included - observability API endpoints enabled")
# Conditionally include metrics maintenance router if cleanup or rollup is enabled
if settings.metrics_cleanup_enabled or settings.metrics_rollup_enabled:
    # First-Party
    from mcpgateway.routers.metrics_maintenance import router as metrics_maintenance_router

    app.include_router(metrics_maintenance_router)
    logger.info("Metrics maintenance router included - cleanup/rollup API endpoints enabled")
else:
    # Log the disabled path as the other flag-gated routers do, so startup logs
    # explain why the maintenance endpoints are absent.
    logger.info("Metrics maintenance router not included - metrics cleanup and rollup disabled")
# A2A router: only registered when A2A features are enabled
if not settings.mcpgateway_a2a_enabled:
    logger.info("A2A router not included - A2A features disabled")
else:
    app.include_router(a2a_router)
    logger.info("A2A router included - A2A features enabled")

# The well-known router is always registered
app.include_router(well_known_router)
# Email authentication routers (plus the optional SSO router) are gated on email auth
if not settings.email_auth_enabled:
    logger.info("Email authentication router not included - Email auth disabled")
else:
    try:
        # First-Party
        from mcpgateway.routers.auth import auth_router
        from mcpgateway.routers.email_auth import email_auth_router

        app.include_router(email_auth_router, prefix="/auth/email", tags=["Email Authentication"])
        app.include_router(auth_router, tags=["Main Authentication"])
        logger.info("Authentication routers included - Auth enabled")

        # SSO is only considered once the authentication routers above are in place
        if not settings.sso_enabled:
            logger.info("SSO router not included - SSO authentication disabled")
        else:
            try:
                # First-Party
                from mcpgateway.routers.sso import sso_router

                app.include_router(sso_router, tags=["SSO Authentication"])
                logger.info("SSO router included - SSO authentication enabled")
            except ImportError as e:
                logger.error(f"SSO router not available: {e}")
    except ImportError as e:
        logger.error(f"Authentication routers not available: {e}")
# Team management router: gated on email auth
if not settings.email_auth_enabled:
    logger.info("Team management router not included - Email auth disabled")
else:
    try:
        # First-Party
        from mcpgateway.routers.teams import teams_router

        app.include_router(teams_router, prefix="/teams", tags=["Teams"])
        logger.info("Team management router included - Teams enabled with email auth")
    except ImportError as e:
        logger.error(f"Team management router not available: {e}")
# JWT Token Catalog router: gated on email auth
if not settings.email_auth_enabled:
    logger.info("JWT Token Catalog router not included - Email auth disabled")
else:
    try:
        # First-Party
        from mcpgateway.routers.tokens import router as tokens_router

        app.include_router(tokens_router, tags=["JWT Token Catalog"])
        logger.info("JWT Token Catalog router included - Token management enabled with email auth")
    except ImportError as e:
        logger.error(f"JWT Token Catalog router not available: {e}")
# RBAC router: gated on email auth
if not settings.email_auth_enabled:
    logger.info("RBAC router not included - Email auth disabled")
else:
    try:
        # First-Party
        from mcpgateway.routers.rbac import router as rbac_router

        app.include_router(rbac_router, tags=["RBAC"])
        logger.info("RBAC router included - Role-based access control enabled")
    except ImportError as e:
        logger.error(f"RBAC router not available: {e}")
# OAuth router: no feature flag is checked; it is registered whenever its module imports
try:
    # First-Party
    from mcpgateway.routers.oauth_router import oauth_router

    app.include_router(oauth_router)
    logger.info("OAuth router included")
except ImportError:
    logger.debug("OAuth router not available")

# Reverse proxy router: likewise registered whenever its module imports (no flag is checked here)
try:
    # First-Party
    from mcpgateway.routers.reverse_proxy import router as reverse_proxy_router

    app.include_router(reverse_proxy_router)
    logger.info("Reverse proxy router included")
except ImportError:
    logger.debug("Reverse proxy router not available")
# LLM routers: the chat router and the config/proxy/admin routers share one feature flag
if settings.llmchat_enabled:
    # Chat router
    try:
        # First-Party
        from mcpgateway.routers.llmchat_router import llmchat_router

        app.include_router(llmchat_router)
        logger.info("LLM Chat router included")
    except ImportError:
        logger.debug("LLM Chat router not available")

    # Configuration, proxy and admin routers (internal API); attempted independently,
    # so a missing chat router does not prevent them from being registered
    try:
        # First-Party
        from mcpgateway.routers.llm_admin_router import llm_admin_router
        from mcpgateway.routers.llm_config_router import llm_config_router
        from mcpgateway.routers.llm_proxy_router import llm_proxy_router

        app.include_router(llm_config_router, prefix="/llm", tags=["LLM Configuration"])
        app.include_router(llm_proxy_router, prefix=settings.llm_api_prefix, tags=["LLM Proxy"])
        app.include_router(llm_admin_router, prefix="/admin/llm", tags=["LLM Admin"])
        logger.info("LLM configuration, proxy, and admin routers included")
    except ImportError as e:
        logger.debug(f"LLM routers not available: {e}")
# Toolops router: only attempted when the toolops feature flag is on
if settings.toolops_enabled:
    try:
        # First-Party
        from mcpgateway.routers.toolops_router import toolops_router

        app.include_router(toolops_router)
        logger.info("Toolops router included")
    except ImportError:
        # Best effort: the app starts without the router when its module is missing
        logger.debug("Toolops router not available")
# Cancellation router (tool cancellation endpoints)
if settings.mcpgateway_tool_cancellation_enabled:
    try:
        # First-Party
        from mcpgateway.routers.cancellation_router import router as cancellation_router

        app.include_router(cancellation_router)
        logger.info("Cancellation router included (tool cancellation enabled)")
    except ImportError:
        # Name the router that actually failed to import; the earlier message referred
        # to an "Orchestrate" router, which does not match the module imported above.
        logger.debug("Cancellation router not available")
else:
    logger.info("Tool cancellation feature disabled - cancellation endpoints not available")
# Snapshot the admin UI / admin API feature flags as module-level constants and log them
UI_ENABLED = settings.mcpgateway_ui_enabled
ADMIN_API_ENABLED = settings.mcpgateway_admin_api_enabled
logger.info(f"Admin UI enabled: {UI_ENABLED}")
logger.info(f"Admin API enabled: {ADMIN_API_ENABLED}")

# Admin API routes are registered only when the admin API flag is on
if not ADMIN_API_ENABLED:
    logger.warning("Admin API routes not mounted - Admin API disabled via MCPGATEWAY_ADMIN_API_ENABLED=False")
else:
    logger.info("Including admin_router - Admin API enabled")
    app.include_router(admin_router)  # Admin routes imported from admin.py

# Mount the streamable HTTP handler under /mcp
app.mount("/mcp", app=streamable_http_session.handle_streamable_http)
# Root path handling depends on whether the Admin UI is enabled:
# with the UI, static assets are mounted and "/" redirects to the admin pages;
# without it, "/" returns a small JSON description of the API.
if UI_ENABLED:
    logger.info("Mounting static files - UI enabled")
    try:
        # Serve static assets through a mounted sub-application so root_path is respected
        static_app = StaticFiles(directory=str(settings.static_dir))
        STATIC_PATH = "/static"

        app.mount(STATIC_PATH, static_app, name="static")
        logger.info("Static assets served from %s at %s", settings.static_dir, STATIC_PATH)
    except RuntimeError as exc:
        # The mount is skipped, but the redirect routes below are still registered
        logger.warning("Static dir %s not found - Admin UI disabled (%s)", settings.static_dir, exc)

    @app.get("/")
    async def root_redirect():
        """
        Redirect the root path ("/") to the admin UI.

        Logs a debug message, then redirects to "/admin/" prefixed with the
        configured application root path.

        Returns:
            RedirectResponse: 303 redirect to ``{app_root_path}/admin/``.
        """
        logger.debug("Redirecting root path to /admin/")
        admin_url = f"{settings.app_root_path}/admin/"
        return RedirectResponse(admin_url, status_code=303)

    @app.get("/favicon.ico", include_in_schema=False)
    async def favicon_redirect() -> RedirectResponse:
        """Redirect /favicon.ico to the favicon under the static mount.

        Returns:
            RedirectResponse: 301 redirect to ``{app_root_path}/static/favicon.ico``.
        """
        favicon_url = f"{settings.app_root_path}/static/favicon.ico"
        return RedirectResponse(favicon_url, status_code=301)

else:
    logger.warning("Static files not mounted - UI disabled via MCPGATEWAY_UI_ENABLED=False")

    @app.get("/")
    async def root_info():
        """
        Describe the API at the root path when the UI is disabled.

        Logs an info message and reports the application name, version, a short
        description, and the UI / admin API flags.

        Returns:
            dict: ``name``, ``version``, ``description``, ``ui_enabled`` (always False here)
            and ``admin_api_enabled``.
        """
        logger.info("UI disabled, serving API info at root path")
        app_name = settings.app_name
        return {
            "name": app_name,
            "version": __version__,
            "description": f"{app_name} API - UI is disabled",
            "ui_enabled": False,
            "admin_api_enabled": ADMIN_API_ENABLED,
        }
# Register the existing initialize and notification handlers as root-level POST routes too
app.post("/initialize")(initialize)
app.post("/notifications")(handle_notification)