Coverage for mcpgateway / translate.py: 99%
817 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
3'''Location: ./mcpgateway/translate.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti, Manav Gupta
8r"""Bridges between different MCP transport protocols.
9This module provides bidirectional bridging between MCP servers that communicate
10via different transport protocols: stdio/JSON-RPC, HTTP/SSE, and streamable HTTP.
11It enables exposing local MCP servers over HTTP or consuming remote endpoints
12as local stdio servers.
14The bridge supports multiple modes of operation:
15- stdio to SSE: Expose a local stdio MCP server over HTTP/SSE
16- SSE to stdio: Bridge a remote SSE endpoint to local stdio
17- stdio to streamable HTTP: Expose a local stdio MCP server via streamable HTTP
18- streamable HTTP to stdio: Bridge a remote streamable HTTP endpoint to local stdio
20Examples:
21 Programmatic usage:
23 >>> import asyncio
24 >>> from mcpgateway.translate import start_stdio
25 >>> asyncio.run(start_stdio("uvx mcp-server-git", 9000, "info", None, "127.0.0.1")) # doctest: +SKIP
27 Test imports and configuration:
29 >>> from mcpgateway.translate import MCPServer, StreamableHTTPSessionManager
30 >>> isinstance(MCPServer, type)
31 True
32 >>> isinstance(StreamableHTTPSessionManager, type)
33 True
34 >>> from mcpgateway.translate import KEEP_ALIVE_INTERVAL
35 >>> KEEP_ALIVE_INTERVAL > 0
36 True
37 >>> from mcpgateway.translate import DEFAULT_KEEPALIVE_ENABLED
38 >>> isinstance(DEFAULT_KEEPALIVE_ENABLED, bool)
39 True
41 Test Starlette imports:
43 >>> from mcpgateway.translate import Starlette, Route
44 >>> isinstance(Starlette, type)
45 True
46 >>> isinstance(Route, type)
47 True
49 Test logging setup:
51 >>> from mcpgateway.translate import LOGGER, logging_service
52 >>> LOGGER is not None
53 True
54 >>> logging_service is not None
55 True
56 >>> hasattr(LOGGER, 'info')
57 True
58 >>> hasattr(LOGGER, 'error')
59 True
60 >>> hasattr(LOGGER, 'debug')
61 True
63 Test utility classes:
65 >>> from mcpgateway.translate import _PubSub, StdIOEndpoint
66 >>> pubsub = _PubSub()
67 >>> hasattr(pubsub, 'publish')
68 True
69 >>> hasattr(pubsub, 'subscribe')
70 True
71 >>> hasattr(pubsub, 'unsubscribe')
72 True
74Usage:
75 Command line usage::
77 # 1. Expose an MCP server that talks JSON-RPC on stdio at :9000/sse
78 python3 -m mcpgateway.translate --stdio "uvx mcp-server-git" --port 9000
80 # 2. Bridge a remote SSE endpoint to local stdio
81 python3 -m mcpgateway.translate --sse "https://example.com/sse" \
82 --stdioCommand "uvx mcp-client"
84 # 3. Expose stdio server via streamable HTTP at :9000/mcp
85 python3 -m mcpgateway.translate --streamableHttp "uvx mcp-server-git" \
86 --port 9000 --stateless --jsonResponse
88 # 4. Connect to remote streamable HTTP endpoint
89 python3 -m mcpgateway.translate \
90 --streamableHttp "https://example.com/mcp" \
91 --oauth2Bearer "your-token"
93 # 5. Test SSE endpoint
94 curl -N http://localhost:9000/sse # receive the stream
96 # 6. Send a test echo request to SSE endpoint
97 curl -X POST http://localhost:9000/message \
98 -H 'Content-Type: application/json' \
99 -d '{"jsonrpc":"2.0","id":1,"method":"echo","params":{"value":"hi"}}'
101 # 7. Test streamable HTTP endpoint
102 curl -X POST http://localhost:9000/mcp \
103 -H 'Content-Type: application/json' \
104 -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"demo","version":"0.0.1"}}}'
106 The SSE stream emits JSON-RPC responses as ``event: message`` frames and sends
107 regular ``event: keepalive`` frames (default every 30s) to prevent timeouts.
109 Streamable HTTP supports both stateful (with session management) and stateless
110 modes, and can return either JSON responses or SSE streams.
111"""
112'''
114# Future
115from __future__ import annotations
117# Standard
118import argparse
119import asyncio
120from contextlib import suppress
121import logging
122import os
123import shlex
124import signal
125import sys
126from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Tuple
127from urllib.parse import urlencode
128import uuid
130# Third-Party
131import anyio
132from fastapi import FastAPI, Request, Response, status
133from fastapi.middleware.cors import CORSMiddleware
134from fastapi.responses import PlainTextResponse
135from mcp.server import Server as MCPServer
136from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
137import orjson
138from starlette.applications import Starlette
139from starlette.routing import Route
140from starlette.types import Receive, Scope, Send
141import uvicorn
143try:
144 # Third-Party
145 import httpx
146except ImportError:
147 httpx = None # type: ignore[assignment]
149# First-Party
150from mcpgateway.services.logging_service import LoggingService
151from mcpgateway.translate_header_utils import extract_env_vars_from_headers, NormalizedMappings, parse_header_mappings
153# Use patched EventSourceResponse with CPU spin protection (anyio#695 fix)
154from mcpgateway.transports.sse_transport import EventSourceResponse
155from mcpgateway.utils.orjson_response import ORJSONResponse
157# Initialize logging service first
158logging_service = LoggingService()
159LOGGER = logging_service.get_logger("mcpgateway.translate")
160CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")
161# headers = {"Content-Type": CONTENT_TYPE}
162# Import settings for default keepalive interval
163try:
164 # First-Party
165 from mcpgateway.config import settings
167 DEFAULT_KEEP_ALIVE_INTERVAL = settings.sse_keepalive_interval
168 DEFAULT_KEEPALIVE_ENABLED = settings.sse_keepalive_enabled
169 DEFAULT_SSL_VERIFY = not settings.skip_ssl_verify
170except ImportError:
171 # Fallback if config not available
172 DEFAULT_KEEP_ALIVE_INTERVAL = 30
173 DEFAULT_KEEPALIVE_ENABLED = True
174 DEFAULT_SSL_VERIFY = True # Verify SSL by default when config unavailable
176KEEP_ALIVE_INTERVAL = DEFAULT_KEEP_ALIVE_INTERVAL # seconds - from config or fallback to 30
178# Buffer limit for subprocess stdout (default 64KB is too small for large tool responses)
179# Set to 16MB to handle tools that return large amounts of data (e.g., search results)
180STDIO_BUFFER_LIMIT = 16 * 1024 * 1024 # 16MB
182__all__ = ["main"] # for console-script entry-point
185# ---------------------------------------------------------------------------#
186# Helpers - trivial in-process Pub/Sub #
187# ---------------------------------------------------------------------------#
188class _PubSub:
189 """Very small fan-out helper - one async Queue per subscriber.
191 This class implements a simple publish-subscribe pattern using asyncio queues
192 for distributing messages from stdio subprocess to multiple SSE clients.
194 Examples:
195 >>> import asyncio
196 >>> async def test_pubsub():
197 ... pubsub = _PubSub()
198 ... q = pubsub.subscribe()
199 ... await pubsub.publish("hello")
200 ... result = await q.get()
201 ... pubsub.unsubscribe(q)
202 ... return result
203 >>> asyncio.run(test_pubsub())
204 'hello'
205 """
207 def __init__(self) -> None:
208 """Initialize a new publish-subscribe system.
210 Creates an empty list of subscriber queues. Each subscriber will
211 receive their own asyncio.Queue for receiving published messages.
213 Examples:
214 >>> pubsub = _PubSub()
215 >>> isinstance(pubsub._subscribers, list)
216 True
217 >>> len(pubsub._subscribers)
218 0
219 >>> hasattr(pubsub, '_subscribers')
220 True
221 """
222 self._subscribers: List[asyncio.Queue[str]] = []
224 async def publish(self, data: str) -> None:
225 """Publish data to all subscribers.
227 Dead queues (full) are automatically removed from the subscriber list.
229 Args:
230 data: The data string to publish to all subscribers.
232 Examples:
233 >>> import asyncio
234 >>> async def test_publish():
235 ... pubsub = _PubSub()
236 ... await pubsub.publish("test") # No subscribers, no error
237 ... return True
238 >>> asyncio.run(test_publish())
239 True
241 >>> # Test queue full handling
242 >>> async def test_full_queue():
243 ... pubsub = _PubSub()
244 ... # Create a queue with size 1
245 ... q = asyncio.Queue(maxsize=1)
246 ... pubsub._subscribers = [q]
247 ... # Fill the queue
248 ... await q.put("first")
249 ... # This should remove the full queue
250 ... await pubsub.publish("second")
251 ... return len(pubsub._subscribers)
252 >>> asyncio.run(test_full_queue())
253 0
254 """
255 dead: List[asyncio.Queue[str]] = []
256 for q in self._subscribers:
257 try:
258 q.put_nowait(data)
259 except asyncio.QueueFull:
260 dead.append(q)
261 for q in dead:
262 with suppress(ValueError):
263 self._subscribers.remove(q)
265 def subscribe(self) -> "asyncio.Queue[str]":
266 """Subscribe to published data.
268 Creates a new queue for receiving published messages with a maximum
269 size of 1024 items.
271 Returns:
272 asyncio.Queue[str]: A queue that will receive published data.
274 Examples:
275 >>> pubsub = _PubSub()
276 >>> q = pubsub.subscribe()
277 >>> isinstance(q, asyncio.Queue)
278 True
279 >>> q.maxsize
280 1024
281 >>> len(pubsub._subscribers)
282 1
283 >>> pubsub._subscribers[0] is q
284 True
285 """
286 q: asyncio.Queue[str] = asyncio.Queue(maxsize=1024)
287 self._subscribers.append(q)
288 return q
290 def unsubscribe(self, q: "asyncio.Queue[str]") -> None:
291 """Unsubscribe from published data.
293 Removes the queue from the subscriber list. Safe to call even if
294 the queue is not in the list.
296 Args:
297 q: The queue to unsubscribe from published data.
299 Examples:
300 >>> pubsub = _PubSub()
301 >>> q = pubsub.subscribe()
302 >>> pubsub.unsubscribe(q)
303 >>> pubsub.unsubscribe(q) # No error on double unsubscribe
304 """
305 with suppress(ValueError):
306 self._subscribers.remove(q)
309# ---------------------------------------------------------------------------#
310# StdIO endpoint (child process ↔ async queues) #
311# ---------------------------------------------------------------------------#
312class StdIOEndpoint:
313 """Wrap a child process whose stdin/stdout speak line-delimited JSON-RPC.
315 This class manages a subprocess that communicates via stdio using JSON-RPC
316 protocol, pumping messages between the subprocess and a pubsub system.
318 Examples:
319 >>> import asyncio
320 >>> async def test_stdio():
321 ... pubsub = _PubSub()
322 ... stdio = StdIOEndpoint("echo hello", pubsub)
323 ... # Would start a real subprocess
324 ... return isinstance(stdio, StdIOEndpoint)
325 >>> asyncio.run(test_stdio())
326 True
327 """
329 def __init__(self, cmd: str, pubsub: _PubSub, env_vars: Optional[Dict[str, str]] = None, header_mappings: Optional[NormalizedMappings] = None) -> None:
330 """Initialize a stdio endpoint for subprocess communication.
332 Sets up the endpoint with the command to run and the pubsub system
333 for message distribution. The subprocess is not started until start()
334 is called.
336 Args:
337 cmd: The command string to execute as a subprocess.
338 pubsub: The publish-subscribe system for distributing subprocess
339 output to SSE clients.
340 env_vars: Optional dictionary of environment variables to set
341 when starting the subprocess.
342 header_mappings: Optional mapping of HTTP headers to environment variable names
343 for dynamic environment injection.
345 Examples:
346 >>> pubsub = _PubSub()
347 >>> endpoint = StdIOEndpoint("echo hello", pubsub)
348 >>> endpoint._cmd
349 'echo hello'
350 >>> endpoint._proc is None
351 True
352 >>> isinstance(endpoint._pubsub, _PubSub)
353 True
354 >>> endpoint._stdin is None
355 True
356 >>> endpoint._pump_task is None
357 True
358 >>> endpoint._pubsub is pubsub
359 True
360 """
361 self._cmd = cmd
362 self._pubsub = pubsub
363 self._env_vars = env_vars or {}
364 self._header_mappings = header_mappings
365 self._proc: Optional[asyncio.subprocess.Process] = None
366 self._stdin: Optional[asyncio.StreamWriter] = None
367 self._pump_task: Optional[asyncio.Task[None]] = None
369 async def start(self, additional_env_vars: Optional[Dict[str, str]] = None) -> None:
370 """Start the stdio subprocess with custom environment variables.
372 Creates the subprocess and starts the stdout pump task. The subprocess
373 is created with stdin/stdout pipes and stderr passed through.
375 Args:
376 additional_env_vars: Optional dictionary of additional environment
377 variables to set when starting the subprocess. These will be
378 combined with the environment variables set during initialization.
380 Raises:
381 RuntimeError: If the subprocess fails to create stdin/stdout pipes.
383 Examples:
384 >>> import asyncio # doctest: +SKIP
385 >>> async def test_start(): # doctest: +SKIP
386 ... pubsub = _PubSub()
387 ... stdio = StdIOEndpoint("cat", pubsub)
388 ... # await stdio.start() # doctest: +SKIP
389 ... return True
390 >>> asyncio.run(test_start()) # doctest: +SKIP
391 True
392 """
393 # Stop existing subprocess before starting a new one
394 if self._proc is not None:
395 await self.stop()
397 LOGGER.info(f"Starting stdio subprocess: {self._cmd}")
399 # Build environment from base + configured + additional
400 env = os.environ.copy()
401 env.update(self._env_vars)
402 if additional_env_vars:
403 env.update(additional_env_vars)
405 # System-critical environment variables that must never be cleared
406 system_critical_vars = {"PATH", "HOME", "TMPDIR", "TEMP", "TMP", "USER", "LOGNAME", "SHELL", "LANG", "LC_ALL", "LC_CTYPE", "PYTHONHOME", "PYTHONPATH"}
408 # Clear any mapped env vars that weren't provided in headers to avoid inheritance
409 if self._header_mappings:
410 for env_var_name in self._header_mappings.values():
411 if env_var_name not in (additional_env_vars or {}) and env_var_name not in system_critical_vars:
412 # Delete the variable instead of setting to empty string to avoid
413 # breaking subprocess initialization
414 env.pop(env_var_name, None)
416 LOGGER.debug(f"Subprocess environment variables: {list(env.keys())}")
417 self._proc = await asyncio.create_subprocess_exec(
418 *shlex.split(self._cmd),
419 stdin=asyncio.subprocess.PIPE,
420 stdout=asyncio.subprocess.PIPE,
421 stderr=sys.stderr, # passthrough for visibility
422 env=env, # 🔑 Add environment variable support
423 limit=STDIO_BUFFER_LIMIT, # Increase buffer limit for large tool responses
424 )
426 # Explicit error checking
427 if not self._proc.stdin or not self._proc.stdout:
428 raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {self._cmd}")
430 LOGGER.debug("Subprocess started successfully")
432 self._stdin = self._proc.stdin
433 self._pump_task = asyncio.create_task(self._pump_stdout())
435 async def stop(self) -> None:
436 """Stop the stdio subprocess.
438 Terminates the subprocess gracefully with a 5-second timeout,
439 then cancels the pump task.
441 Examples:
442 >>> import asyncio
443 >>> async def test_stop():
444 ... pubsub = _PubSub()
445 ... stdio = StdIOEndpoint("cat", pubsub)
446 ... await stdio.stop() # Safe to call even if not started
447 ... return True
448 >>> asyncio.run(test_stop())
449 True
450 """
451 if self._proc is None:
452 return
454 # Check if process is still running
455 try:
456 if self._proc.returncode is not None:
457 # Process already terminated
458 LOGGER.info(f"Subprocess (pid={self._proc.pid}) already terminated")
459 self._proc = None
460 self._stdin = None
461 return
462 except (ProcessLookupError, AttributeError):
463 # Process doesn't exist or is already cleaned up
464 LOGGER.info("Subprocess already cleaned up")
465 self._proc = None
466 self._stdin = None
467 return
469 LOGGER.info(f"Stopping subprocess (pid={self._proc.pid})")
470 try:
471 self._proc.terminate()
472 with suppress(asyncio.TimeoutError):
473 await asyncio.wait_for(self._proc.wait(), timeout=5)
474 except ProcessLookupError:
475 # Process already terminated
476 LOGGER.info("Subprocess already terminated")
477 finally:
478 if self._pump_task:
479 self._pump_task.cancel()
480 # Wait for pump task to actually finish, suppressing all exceptions
481 # since the pump task logs its own errors. Use BaseException to catch
482 # CancelledError which inherits from BaseException in Python 3.8+
483 with suppress(BaseException):
484 await self._pump_task
485 self._proc = None
486 self._stdin = None # Reset stdin too!
488 def is_running(self) -> bool:
489 """Check if the stdio subprocess is currently running.
491 Returns:
492 True if the subprocess is running, False otherwise.
494 Examples:
495 >>> import asyncio
496 >>> async def test_is_running():
497 ... pubsub = _PubSub()
498 ... stdio = StdIOEndpoint("cat", pubsub)
499 ... return stdio.is_running()
500 >>> asyncio.run(test_is_running())
501 False
502 """
503 return self._proc is not None
505 async def send(self, raw: str) -> None:
506 """Send data to the subprocess stdin.
508 Args:
509 raw: The raw data string to send to the subprocess.
511 Raises:
512 RuntimeError: If the stdio endpoint is not started.
514 Examples:
515 >>> import asyncio
516 >>> async def test_send():
517 ... pubsub = _PubSub()
518 ... stdio = StdIOEndpoint("cat", pubsub)
519 ... try:
520 ... await stdio.send("test")
521 ... except RuntimeError as e:
522 ... return str(e)
523 >>> asyncio.run(test_send())
524 'stdio endpoint not started'
525 """
526 if not self._stdin:
527 raise RuntimeError("stdio endpoint not started")
528 LOGGER.debug(f"→ stdio: {raw.strip()}")
529 self._stdin.write(raw.encode())
530 await self._stdin.drain()
532 async def _pump_stdout(self) -> None:
533 """Pump stdout from subprocess to pubsub.
535 Continuously reads lines from the subprocess stdout and publishes them
536 to the pubsub system. Runs until EOF or exception.
538 Raises:
539 RuntimeError: If process or stdout is not properly initialized.
540 Exception: For any other error encountered while pumping stdout.
541 """
542 if not self._proc or not self._proc.stdout:
543 raise RuntimeError("Process not properly initialized: missing stdout")
545 reader = self._proc.stdout
546 try:
547 while True:
548 line = await reader.readline()
549 if not line: # EOF
550 break
551 text = line.decode(errors="replace")
552 LOGGER.debug(f"← stdio: {text.strip()}")
553 await self._pubsub.publish(text)
554 except ConnectionResetError: # pragma: no cover --subprocess terminated
555 # Subprocess terminated abruptly - this is expected behavior
556 LOGGER.debug("stdout pump: subprocess connection closed")
557 except Exception: # pragma: no cover --best-effort logging
558 LOGGER.exception("stdout pump crashed - terminating bridge")
559 raise
562# ---------------------------------------------------------------------------#
563# SSE Event Parser #
564# ---------------------------------------------------------------------------#
565class SSEEvent:
566 """Represents a Server-Sent Event with proper field parsing.
568 Attributes:
569 event: The event type (e.g., 'message', 'keepalive', 'endpoint')
570 data: The event data payload
571 event_id: Optional event ID
572 retry: Optional retry interval in milliseconds
573 """
575 def __init__(self, event: str = "message", data: str = "", event_id: Optional[str] = None, retry: Optional[int] = None):
576 """Initialize an SSE event.
578 Args:
579 event: Event type, defaults to "message"
580 data: Event data payload
581 event_id: Optional event ID
582 retry: Optional retry interval in milliseconds
583 """
584 self.event = event
585 self.data = data
586 self.event_id = event_id
587 self.retry = retry
589 @classmethod
590 def parse_sse_line(cls, line: str, current_event: Optional["SSEEvent"] = None) -> Tuple[Optional["SSEEvent"], bool]:
591 """Parse a single SSE line and update or create an event.
593 Args:
594 line: The SSE line to parse
595 current_event: The current event being built (if any)
597 Returns:
598 Tuple of (event, is_complete) where event is the SSEEvent object
599 and is_complete indicates if the event is ready to be processed
600 """
601 line = line.rstrip("\n\r")
603 # Empty line signals end of event
604 if not line:
605 if current_event and current_event.data:
606 return current_event, True
607 return None, False
609 # Comment line
610 if line.startswith(":"):
611 return current_event, False
613 # Parse field
614 if ":" in line:
615 field, value = line.split(":", 1)
616 value = value.lstrip(" ") # Remove leading space if present
617 else:
618 field = line
619 value = ""
621 # Create event if needed
622 if current_event is None:
623 current_event = cls()
625 # Update fields
626 if field == "event":
627 current_event.event = value
628 elif field == "data":
629 if current_event.data:
630 current_event.data += "\n" + value
631 else:
632 current_event.data = value
633 elif field == "id":
634 current_event.event_id = value
635 elif field == "retry":
636 try:
637 current_event.retry = int(value)
638 except ValueError:
639 pass # Ignore invalid retry values
641 return current_event, False
644# ---------------------------------------------------------------------------#
645# FastAPI app exposing /sse & /message #
646# ---------------------------------------------------------------------------#
649def _build_fastapi(
650 pubsub: _PubSub,
651 stdio: StdIOEndpoint,
652 keep_alive: float = KEEP_ALIVE_INTERVAL,
653 sse_path: str = "/sse",
654 message_path: str = "/message",
655 cors_origins: Optional[List[str]] = None,
656 header_mappings: Optional[NormalizedMappings] = None,
657) -> FastAPI:
658 """Build FastAPI application with SSE and message endpoints.
660 Creates a FastAPI app with SSE streaming endpoint and message posting
661 endpoint for bidirectional communication with the stdio subprocess.
663 Args:
664 pubsub: The publish/subscribe system for message routing.
665 stdio: The stdio endpoint for subprocess communication.
666 keep_alive: Interval in seconds for keepalive messages. Defaults to KEEP_ALIVE_INTERVAL.
667 sse_path: Path for the SSE endpoint. Defaults to "/sse".
668 message_path: Path for the message endpoint. Defaults to "/message".
669 cors_origins: Optional list of CORS allowed origins.
670 header_mappings: Optional mapping of HTTP headers to environment variables.
672 Returns:
673 FastAPI: The configured FastAPI application.
675 Examples:
676 >>> pubsub = _PubSub()
677 >>> stdio = StdIOEndpoint("cat", pubsub)
678 >>> app = _build_fastapi(pubsub, stdio)
679 >>> isinstance(app, FastAPI)
680 True
681 >>> "/sse" in [r.path for r in app.routes]
682 True
683 >>> "/message" in [r.path for r in app.routes]
684 True
685 >>> "/healthz" in [r.path for r in app.routes]
686 True
688 >>> # Test with custom paths
689 >>> app2 = _build_fastapi(pubsub, stdio, sse_path="/events", message_path="/send")
690 >>> "/events" in [r.path for r in app2.routes]
691 True
692 >>> "/send" in [r.path for r in app2.routes]
693 True
695 >>> # Test CORS middleware is added
696 >>> app3 = _build_fastapi(pubsub, stdio, cors_origins=["http://example.com"])
697 >>> # Check that middleware stack includes CORSMiddleware
698 >>> any("CORSMiddleware" in str(m) for m in app3.user_middleware)
699 True
700 """
701 # Standard
702 import time as _time # pylint: disable=import-outside-toplevel
704 app = FastAPI()
706 # Rate limiter: minimum interval between subprocess restarts (seconds)
707 restart_min_interval_secs = 30.0
708 last_restart_ts: dict[str, float] = {"t": 0.0}
710 def _restart_allowed() -> bool:
711 """Return whether enough time elapsed to allow a subprocess restart.
713 Returns:
714 bool: ``True`` when restart is allowed, otherwise ``False``.
715 """
716 now = _time.monotonic()
717 if now - last_restart_ts["t"] < restart_min_interval_secs:
718 return False
719 last_restart_ts["t"] = now
720 return True
722 # Add CORS middleware if origins specified
723 if cors_origins:
724 app.add_middleware(
725 CORSMiddleware,
726 allow_origins=cors_origins,
727 allow_credentials=True,
728 allow_methods=["*"],
729 allow_headers=["*"],
730 )
732 # ----- GET /sse ---------------------------------------------------------#
733 @app.get(sse_path)
734 async def get_sse(request: Request) -> EventSourceResponse: # noqa: D401
735 """Stream subprocess stdout to any number of SSE clients.
737 Args:
738 request (Request): The incoming ``GET`` request that will be
739 upgraded to a Server-Sent Events (SSE) stream.
741 Returns:
742 EventSourceResponse: A streaming response that forwards JSON-RPC
743 messages from the child process and emits periodic ``keepalive``
744 frames so that clients and proxies do not time out.
745 """
746 # Extract environment variables from headers if dynamic env is enabled
747 additional_env_vars = {}
748 if header_mappings:
749 request_headers = dict(request.headers)
750 additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)
752 # Restart stdio endpoint with new environment variables (rate-limited)
753 if additional_env_vars and _restart_allowed():
754 LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
755 await stdio.stop()
756 await stdio.start(additional_env_vars)
758 queue = pubsub.subscribe()
759 session_id = uuid.uuid4().hex
761 async def event_gen() -> AsyncIterator[Dict[str, Any]]:
762 """Generate Server-Sent Events for the SSE stream.
764 Yields SSE events in the following sequence:
765 1. An 'endpoint' event with the message posting URL (required by MCP spec)
766 2. An immediate 'keepalive' event to confirm the stream is active
767 3. 'message' events containing JSON-RPC responses from the subprocess
768 4. Periodic 'keepalive' events to prevent timeouts
770 The generator runs until the client disconnects or the server shuts down.
771 Automatically unsubscribes from the pubsub system on completion.
773 Yields:
774 Dict[str, Any]: SSE event dictionaries containing:
775 - event: The event type ('endpoint', 'message', or 'keepalive')
776 - data: The event payload (URL, JSON-RPC message, or empty object)
777 - retry: Retry interval in milliseconds for reconnection
779 Examples:
780 >>> import asyncio
781 >>> async def test_event_gen():
782 ... # This is tested indirectly through the SSE endpoint
783 ... return True
784 >>> asyncio.run(test_event_gen())
785 True
786 """
787 # 1️⃣ Mandatory "endpoint" bootstrap required by the MCP spec
788 endpoint_url = f"{str(request.base_url).rstrip('/')}{message_path}?session_id={session_id}"
789 yield {
790 "event": "endpoint",
791 "data": endpoint_url,
792 "retry": int(keep_alive * 1000),
793 }
795 # 2️⃣ Immediate keepalive so clients know the stream is alive (if enabled in config)
796 if DEFAULT_KEEPALIVE_ENABLED:
797 yield {"event": "keepalive", "data": "{}", "retry": keep_alive * 1000}
799 try:
800 while True:
801 if await request.is_disconnected():
802 break
804 try:
805 timeout = keep_alive if DEFAULT_KEEPALIVE_ENABLED else None
806 msg = await asyncio.wait_for(queue.get(), timeout)
807 yield {"event": "message", "data": msg.rstrip()}
808 except asyncio.TimeoutError:
809 if DEFAULT_KEEPALIVE_ENABLED:
810 yield {
811 "event": "keepalive",
812 "data": "{}",
813 "retry": keep_alive * 1000,
814 }
815 finally:
816 if pubsub:
817 pubsub.unsubscribe(queue)
819 return EventSourceResponse(
820 event_gen(),
821 headers={
822 "Cache-Control": "no-cache",
823 "Connection": "keep-alive",
824 "X-Accel-Buffering": "no", # disable proxy buffering
825 },
826 )
828 # ----- POST /message ----------------------------------------------------#
829 @app.post(message_path, status_code=status.HTTP_202_ACCEPTED)
830 async def post_message(raw: Request, session_id: str | None = None) -> Response: # noqa: D401
831 """Forward a raw JSON-RPC request to the stdio subprocess.
833 Args:
834 raw (Request): The incoming ``POST`` request whose body contains
835 a single JSON-RPC message.
836 session_id (str | None): The SSE session identifier that originated
837 this back-channel call (present when the client obtained the
838 endpoint URL from an ``endpoint`` bootstrap frame).
840 Returns:
841 Response: ``202 Accepted`` if the payload is forwarded successfully,
842 or ``400 Bad Request`` when the body is not valid JSON.
843 """
844 _ = session_id # Unused but required for API compatibility
846 # Extract environment variables from headers if dynamic env is enabled
847 additional_env_vars = {}
848 if header_mappings:
849 request_headers = dict(raw.headers)
850 additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)
852 # Restart stdio endpoint with new environment variables (rate-limited)
853 if additional_env_vars and _restart_allowed():
854 LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
855 await stdio.stop()
856 await stdio.start(additional_env_vars)
857 await asyncio.sleep(0.5) # Give process time to initialize
859 # Ensure stdio endpoint is running
860 if not stdio.is_running():
861 LOGGER.info("Starting stdio endpoint (was not running)")
862 await stdio.start()
863 await asyncio.sleep(0.5) # Give process time to initialize
865 payload = await raw.body()
866 try:
867 orjson.loads(payload) # validate
868 except Exception as exc: # noqa: BLE001
869 return PlainTextResponse(
870 f"Invalid JSON payload: {exc}",
871 status_code=status.HTTP_400_BAD_REQUEST,
872 )
873 await stdio.send(payload.decode().rstrip() + "\n")
874 return PlainTextResponse("forwarded", status_code=status.HTTP_202_ACCEPTED)
876 # ----- Liveness ---------------------------------------------------------#
877 @app.get("/healthz")
878 async def health() -> Response: # noqa: D401
879 """Health check endpoint.
881 Returns:
882 Response: A plain text response with "ok" status.
883 """
884 return PlainTextResponse("ok")
886 return app
889# ---------------------------------------------------------------------------#
890# CLI & orchestration #
891# ---------------------------------------------------------------------------#
894def _parse_args(argv: Sequence[str]) -> argparse.Namespace:
895 """Parse command line arguments.
897 Validates mutually exclusive source options and sets defaults for
898 port and logging configuration.
900 Args:
901 argv: Sequence of command line arguments.
903 Returns:
904 argparse.Namespace: Parsed command line arguments.
906 Raises:
907 NotImplementedError: If streamableHttp option is specified.
909 Examples:
910 >>> args = _parse_args(["--stdio", "cat", "--port", "9000"])
911 >>> args.stdio
912 'cat'
913 >>> args.port
914 9000
915 >>> args.logLevel
916 'info'
917 >>> args.host
918 '127.0.0.1'
919 >>> args.cors is None
920 True
921 >>> args.oauth2Bearer is None
922 True
924 >>> # Test default parameters
925 >>> args = _parse_args(["--stdio", "cat"])
926 >>> args.port
927 8000
928 >>> args.host
929 '127.0.0.1'
930 >>> args.logLevel
931 'info'
933 >>> # Test connect-sse mode
934 >>> args = _parse_args(["--connect-sse", "http://example.com/sse"])
935 >>> args.connect_sse
936 'http://example.com/sse'
937 >>> args.stdio is None
938 True
940 >>> # Test CORS configuration
941 >>> args = _parse_args(["--stdio", "cat", "--cors", "https://app.com", "https://web.com"])
942 >>> args.cors
943 ['https://app.com', 'https://web.com']
945 >>> # Test OAuth2 Bearer token
946 >>> args = _parse_args(["--connect-sse", "http://example.com", "--oauth2Bearer", "token123"])
947 >>> args.oauth2Bearer
948 'token123'
950 >>> # Test custom host and log level
951 >>> args = _parse_args(["--stdio", "cat", "--host", "0.0.0.0", "--logLevel", "debug"])
952 >>> args.host
953 '0.0.0.0'
954 >>> args.logLevel
955 'debug'
957 >>> # Test expose protocols
958 >>> args = _parse_args(["--stdio", "uvx mcp-server-git", "--expose-sse", "--expose-streamable-http"])
959 >>> args.stdio
960 'uvx mcp-server-git'
961 >>> args.expose_sse
962 True
963 >>> args.expose_streamable_http
964 True
965 >>> args.stateless
966 False
967 >>> args.jsonResponse
968 False
970 >>> # Test new parameters
971 >>> args = _parse_args(["--stdio", "cat", "--ssePath", "/events", "--messagePath", "/send", "--keepAlive", "60"])
972 >>> args.ssePath
973 '/events'
974 >>> args.messagePath
975 '/send'
976 >>> args.keepAlive
977 60
979 >>> # Test connect-sse with stdio command
980 >>> args = _parse_args(["--connect-sse", "http://example.com/sse", "--stdioCommand", "uvx mcp-server-git"])
981 >>> args.stdioCommand
982 'uvx mcp-server-git'
984 >>> # Test connect-sse without stdio command (allowed)
985 >>> args = _parse_args(["--connect-sse", "http://example.com/sse"])
986 >>> args.stdioCommand is None
987 True
988 """
989 p = argparse.ArgumentParser(
990 prog="mcpgateway.translate",
991 description="Bridges between different MCP transport protocols: stdio, SSE, and streamable HTTP.",
992 )
994 # Source/destination options
995 p.add_argument("--stdio", help='Local command to run, e.g. "uvx mcp-server-git"')
996 p.add_argument("--connect-sse", dest="connect_sse", help="Connect to remote SSE endpoint URL")
997 p.add_argument("--connect-streamable-http", dest="connect_streamable_http", help="Connect to remote streamable HTTP endpoint URL")
998 p.add_argument("--grpc", type=str, help="gRPC server target (host:port) to expose")
999 p.add_argument("--connect-grpc", type=str, help="Remote gRPC endpoint to connect to")
1001 # Protocol exposure options (can be combined)
1002 p.add_argument("--expose-sse", action="store_true", help="Expose via SSE protocol (endpoints: /sse and /message)")
1003 p.add_argument("--expose-streamable-http", action="store_true", help="Expose via streamable HTTP protocol (endpoint: /mcp)")
1005 # gRPC configuration options
1006 p.add_argument("--grpc-tls", action="store_true", help="Enable TLS for gRPC connection")
1007 p.add_argument("--grpc-cert", type=str, help="Path to TLS certificate for gRPC")
1008 p.add_argument("--grpc-key", type=str, help="Path to TLS key for gRPC")
1009 p.add_argument("--grpc-metadata", action="append", help="gRPC metadata (KEY=VALUE, repeatable)")
1011 p.add_argument("--port", type=int, default=8000, help="HTTP port to bind")
1012 p.add_argument("--host", default="127.0.0.1", help="Host interface to bind (default: 127.0.0.1)")
1013 p.add_argument(
1014 "--logLevel",
1015 default="info",
1016 choices=["debug", "info", "warning", "error", "critical"],
1017 help="Log level",
1018 )
1019 p.add_argument(
1020 "--cors",
1021 nargs="*",
1022 help="CORS allowed origins (e.g., --cors https://app.example.com)",
1023 )
1024 p.add_argument(
1025 "--oauth2Bearer",
1026 help="OAuth2 Bearer token for authentication",
1027 )
1029 # New configuration options
1030 p.add_argument(
1031 "--ssePath",
1032 default="/sse",
1033 help="SSE endpoint path (default: /sse)",
1034 )
1035 p.add_argument(
1036 "--messagePath",
1037 default="/message",
1038 help="Message endpoint path (default: /message)",
1039 )
1040 p.add_argument(
1041 "--keepAlive",
1042 type=int,
1043 default=KEEP_ALIVE_INTERVAL,
1044 help=f"Keep-alive interval in seconds (default: {KEEP_ALIVE_INTERVAL})",
1045 )
1047 # For SSE to stdio mode
1048 p.add_argument(
1049 "--stdioCommand",
1050 help="Command to run when bridging SSE/streamableHttp to stdio (optional with --sse or --streamableHttp)",
1051 )
1053 # Dynamic environment variable injection
1054 p.add_argument("--enable-dynamic-env", action="store_true", help="Enable dynamic environment variable injection from HTTP headers")
1055 p.add_argument(
1056 "--header-to-env",
1057 action="append",
1058 default=[],
1059 help="Map HTTP header to environment variable (format: HEADER=ENV_VAR, can be used multiple times). Case-insensitive duplicates are rejected (e.g., Authorization and authorization cannot both be mapped).",
1060 )
1062 # For streamable HTTP mode
1063 p.add_argument(
1064 "--stateless",
1065 action="store_true",
1066 help="Use stateless mode for streamable HTTP (default: False)",
1067 )
1068 p.add_argument(
1069 "--jsonResponse",
1070 action="store_true",
1071 help="Return JSON responses instead of SSE streams for streamable HTTP (default: False)",
1072 )
1074 args = p.parse_args(argv)
1075 # streamableHttp is now supported, no need to raise NotImplementedError
1076 return args
1079async def _run_stdio_to_sse(
1080 cmd: str,
1081 port: int,
1082 log_level: str = "info",
1083 cors: Optional[List[str]] = None,
1084 host: str = "127.0.0.1",
1085 sse_path: str = "/sse",
1086 message_path: str = "/message",
1087 keep_alive: float = KEEP_ALIVE_INTERVAL,
1088 header_mappings: Optional[NormalizedMappings] = None,
1089) -> None:
1090 """Run stdio to SSE bridge.
1092 Starts a subprocess and exposes it via HTTP/SSE endpoints. Handles graceful
1093 shutdown on SIGINT/SIGTERM.
1095 Args:
1096 cmd: The command to run as a stdio subprocess.
1097 port: The port to bind the HTTP server to.
1098 log_level: The logging level to use. Defaults to "info".
1099 cors: Optional list of CORS allowed origins.
1100 host: The host interface to bind to. Defaults to "127.0.0.1" for security.
1101 sse_path: Path for the SSE endpoint. Defaults to "/sse".
1102 message_path: Path for the message endpoint. Defaults to "/message".
1103 keep_alive: Keep-alive interval in seconds. Defaults to KEEP_ALIVE_INTERVAL.
1104 header_mappings: Optional mapping of HTTP headers to environment variables.
1106 Examples:
1107 >>> import asyncio # doctest: +SKIP
1108 >>> async def test_run(): # doctest: +SKIP
1109 ... await _run_stdio_to_sse("cat", 9000) # doctest: +SKIP
1110 ... return True
1111 >>> asyncio.run(test_run()) # doctest: +SKIP
1112 True
1113 """
1114 pubsub = _PubSub()
1115 stdio = StdIOEndpoint(cmd, pubsub, header_mappings=header_mappings)
1116 await stdio.start()
1118 app = _build_fastapi(pubsub, stdio, keep_alive=keep_alive, sse_path=sse_path, message_path=message_path, cors_origins=cors, header_mappings=header_mappings)
1119 config = uvicorn.Config(
1120 app,
1121 host=host, # Changed from hardcoded "0.0.0.0"
1122 port=port,
1123 log_level=log_level,
1124 lifespan="off",
1125 )
1126 uvicorn_server = uvicorn.Server(config)
1128 shutting_down = asyncio.Event() # 🔄 make shutdown idempotent
1130 async def _shutdown() -> None:
1131 """Handle graceful shutdown of the stdio bridge.
1133 Performs shutdown operations in the correct order:
1134 1. Sets a flag to prevent multiple shutdown attempts
1135 2. Stops the stdio subprocess
1136 3. Shuts down the HTTP server
1138 This function is idempotent - multiple calls will only execute
1139 the shutdown sequence once.
1141 Examples:
1142 >>> import asyncio
1143 >>> async def test_shutdown():
1144 ... # Shutdown is tested as part of the main run flow
1145 ... return True
1146 >>> asyncio.run(test_shutdown())
1147 True
1148 """
1149 if shutting_down.is_set():
1150 return
1151 shutting_down.set()
1152 LOGGER.info("Shutting down ...")
1153 await stdio.stop()
1154 # Graceful shutdown by setting the shutdown event
1155 # Use getattr to safely access should_exit attribute
1156 setattr(uvicorn_server, "should_exit", getattr(uvicorn_server, "should_exit", False) or True)
1158 loop = asyncio.get_running_loop()
1159 for sig in (signal.SIGINT, signal.SIGTERM):
1160 with suppress(NotImplementedError): # Windows lacks add_signal_handler
1161 loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown()))
1163 LOGGER.info(f"Bridge ready → http://{host}:{port}{sse_path}")
1164 await uvicorn_server.serve()
1165 await _shutdown() # final cleanup
1168async def _run_sse_to_stdio(url: str, oauth2_bearer: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None, max_retries: int = 5, initial_retry_delay: float = 1.0) -> None:
1169 """Run SSE to stdio bridge.
1171 Connects to a remote SSE endpoint and bridges it to local stdio.
1172 Implements proper bidirectional message flow with error handling and retries.
1174 Args:
1175 url: The SSE endpoint URL to connect to.
1176 oauth2_bearer: Optional OAuth2 bearer token for authentication. Defaults to None.
1177 timeout: HTTP client timeout in seconds. Defaults to 30.0.
1178 stdio_command: Optional command to run for local stdio processing.
1179 If not provided, will simply print SSE messages to stdout.
1180 max_retries: Maximum number of connection retry attempts. Defaults to 5.
1181 initial_retry_delay: Initial delay between retries in seconds. Defaults to 1.0.
1183 Raises:
1184 ImportError: If httpx package is not available.
1185 RuntimeError: If the subprocess fails to create stdin/stdout pipes.
1186 Exception: For any unexpected error in SSE stream processing.
1188 Examples:
1189 >>> import asyncio
1190 >>> async def test_sse():
1191 ... try:
1192 ... await _run_sse_to_stdio("http://example.com/sse", None) # doctest: +SKIP
1193 ... except ImportError as e:
1194 ... return "httpx" in str(e)
1195 >>> asyncio.run(test_sse()) # Would return True if httpx not installed # doctest: +SKIP
1196 """
1197 if not httpx:
1198 raise ImportError("httpx package is required for SSE to stdio bridging")
1200 headers = {}
1201 if oauth2_bearer:
1202 headers["Authorization"] = f"Bearer {oauth2_bearer}"
1204 # If no stdio command provided, use simple mode (just print to stdout)
1205 if not stdio_command:
1206 LOGGER.warning("No --stdioCommand provided, running in simple mode (SSE to stdout only)")
1207 # First-Party
1208 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel
1210 async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
1211 await _simple_sse_pump(client, url, max_retries, initial_retry_delay)
1212 return
1214 # Start the stdio subprocess
1215 process = await asyncio.create_subprocess_exec(
1216 *shlex.split(stdio_command),
1217 stdin=asyncio.subprocess.PIPE,
1218 stdout=asyncio.subprocess.PIPE,
1219 stderr=sys.stderr,
1220 )
1222 if not process.stdin or not process.stdout:
1223 raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {stdio_command}")
1225 # Store the message endpoint URL once received
1226 message_endpoint: Optional[str] = None
1228 async def read_stdout(client: httpx.AsyncClient) -> None:
1229 """Read lines from subprocess stdout and POST to message endpoint.
1231 Continuously reads JSON-RPC requests from the subprocess stdout
1232 and POSTs them to the remote message endpoint obtained from the
1233 SSE stream's endpoint event.
1235 Args:
1236 client: The HTTP client to use for POSTing messages.
1238 Raises:
1239 RuntimeError: If the process stdout stream is not available.
1241 Examples:
1242 >>> import asyncio
1243 >>> async def test_read():
1244 ... # This is tested as part of the SSE to stdio flow
1245 ... return True
1246 >>> asyncio.run(test_read())
1247 True
1248 """
1249 if not process.stdout:
1250 raise RuntimeError("Process stdout not available")
1252 while True:
1253 if not process.stdout:
1254 raise RuntimeError("Process stdout not available")
1255 line = await process.stdout.readline()
1256 if not line:
1257 break
1259 text = line.decode().strip()
1260 if not text:
1261 continue
1263 LOGGER.debug(f"← stdio: {text}")
1265 # Wait for endpoint URL if not yet received
1266 retry_count = 0
1267 while not message_endpoint and retry_count < 30: # 30 second timeout
1268 await asyncio.sleep(1)
1269 retry_count += 1
1271 if not message_endpoint:
1272 LOGGER.error("No message endpoint received from SSE stream")
1273 continue
1275 # POST the JSON-RPC request to the message endpoint
1276 try:
1277 response = await client.post(message_endpoint, content=text, headers={"Content-Type": "application/json"})
1278 if response.status_code != 202:
1279 LOGGER.warning(f"Message endpoint returned {response.status_code}: {response.text}")
1280 except Exception as e:
1281 LOGGER.error(f"Failed to POST to message endpoint: {e}")
1283 async def pump_sse_to_stdio(client: httpx.AsyncClient) -> None:
1284 """Stream SSE data from remote endpoint to subprocess stdin.
1286 Connects to the remote SSE endpoint with retry logic and forwards
1287 message events to the subprocess stdin. Properly parses SSE events
1288 and handles endpoint, message, and keepalive event types.
1290 Args:
1291 client: The HTTP client to use for SSE streaming.
1293 Raises:
1294 HTTPStatusError: If the SSE endpoint returns a non-200 status code.
1295 Exception: For unexpected errors in SSE stream processing.
1297 Examples:
1298 >>> import asyncio
1299 >>> async def test_pump():
1300 ... # This is tested as part of the SSE to stdio flow
1301 ... return True
1302 >>> asyncio.run(test_pump())
1303 True
1304 """
1305 nonlocal message_endpoint
1306 retry_delay = initial_retry_delay
1307 retry_count = 0
1309 while retry_count < max_retries:
1310 try:
1311 LOGGER.info(f"Connecting to SSE endpoint: {url}")
1313 async with client.stream("GET", url) as response:
1314 # Check status code if available (real httpx response)
1315 if hasattr(response, "status_code") and response.status_code != 200:
1316 if httpx:
1317 raise httpx.HTTPStatusError(f"SSE endpoint returned {response.status_code}", request=response.request, response=response)
1318 raise Exception(f"SSE endpoint returned {response.status_code}")
1320 # Reset retry counter on successful connection
1321 retry_count = 0
1322 retry_delay = initial_retry_delay
1323 current_event: Optional[SSEEvent] = None
1325 async for line in response.aiter_lines():
1326 event, is_complete = SSEEvent.parse_sse_line(line, current_event)
1327 current_event = event
1329 if is_complete and current_event:
1330 LOGGER.debug(f"SSE event: {current_event.event} - {current_event.data[:100]}...")
1332 if current_event.event == "endpoint":
1333 # Store the message endpoint URL
1334 message_endpoint = current_event.data
1335 LOGGER.info(f"Received message endpoint: {message_endpoint}")
1337 elif current_event.event == "message":
1338 # Forward JSON-RPC responses to stdio
1339 if process.stdin:
1340 process.stdin.write((current_event.data + "\n").encode())
1341 await process.stdin.drain()
1342 LOGGER.debug(f"→ stdio: {current_event.data}")
1344 elif current_event.event == "keepalive":
1345 # Log keepalive but don't forward
1346 LOGGER.debug("Received keepalive")
1348 # Reset for next event
1349 current_event = None
1351 except Exception as e:
1352 # Check if it's one of the expected httpx exceptions
1353 if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)):
1354 retry_count += 1
1355 if retry_count >= max_retries:
1356 LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
1357 raise
1359 LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
1360 await asyncio.sleep(retry_delay)
1361 retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s
1362 else:
1363 LOGGER.error(f"Unexpected error in SSE stream: {e}")
1364 raise
1366 # Run both tasks concurrently
1367 # First-Party
1368 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel
1370 async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
1371 try:
1372 await asyncio.gather(read_stdout(client), pump_sse_to_stdio(client))
1373 except Exception as e:
1374 LOGGER.error(f"Bridge error: {e}")
1375 raise
1376 finally:
1377 # Clean up subprocess
1378 if process.returncode is None:
1379 process.terminate()
1380 with suppress(asyncio.TimeoutError):
1381 await asyncio.wait_for(process.wait(), timeout=5)
1384async def _run_stdio_to_streamable_http(
1385 cmd: str,
1386 port: int,
1387 log_level: str = "info",
1388 cors: Optional[List[str]] = None,
1389 host: str = "127.0.0.1",
1390 stateless: bool = False,
1391 json_response: bool = False,
1392) -> None:
1393 """Run stdio to streamable HTTP bridge.
1395 Starts a subprocess and exposes it via streamable HTTP endpoint. Handles graceful
1396 shutdown on SIGINT/SIGTERM.
1398 Args:
1399 cmd: The command to run as a stdio subprocess.
1400 port: The port to bind the HTTP server to.
1401 log_level: The logging level to use. Defaults to "info".
1402 cors: Optional list of CORS allowed origins.
1403 host: The host interface to bind to. Defaults to "127.0.0.1" for security.
1404 stateless: Whether to use stateless mode for streamable HTTP. Defaults to False.
1405 json_response: Whether to return JSON responses instead of SSE streams. Defaults to False.
1407 Raises:
1408 ImportError: If MCP server components are not available.
1409 RuntimeError: If subprocess fails to create stdin/stdout pipes.
1411 Examples:
1412 >>> import asyncio
1413 >>> async def test_streamable_http():
1414 ... # Would start a real subprocess and HTTP server
1415 ... cmd = "echo hello"
1416 ... port = 9000
1417 ... # This would normally run the server
1418 ... return True
1419 >>> asyncio.run(test_streamable_http())
1420 True
1421 """
1422 # MCP components are available, proceed with setup
1424 LOGGER.info(f"Starting stdio to streamable HTTP bridge for command: {cmd}")
1426 # Create a simple MCP server that will proxy to stdio subprocess
1427 mcp_server = MCPServer(name="stdio-proxy")
1429 # Create subprocess for stdio communication
1430 process = await asyncio.create_subprocess_exec(
1431 *shlex.split(cmd),
1432 stdin=asyncio.subprocess.PIPE,
1433 stdout=asyncio.subprocess.PIPE,
1434 stderr=sys.stderr,
1435 )
1437 if not process.stdin or not process.stdout:
1438 raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {cmd}")
1440 # Set up the streamable HTTP session manager with the server
1441 session_manager = StreamableHTTPSessionManager(
1442 app=mcp_server,
1443 stateless=stateless,
1444 json_response=json_response,
1445 )
1447 # Create Starlette app to host the streamable HTTP endpoint
1448 async def handle_mcp(request: Request) -> None:
1449 """Handle MCP requests via streamable HTTP.
1451 Args:
1452 request: The incoming HTTP request from Starlette.
1454 Examples:
1455 >>> async def test_handle():
1456 ... # Mock request handling
1457 ... class MockRequest:
1458 ... scope = {"type": "http"}
1459 ... async def receive(self): return {}
1460 ... async def send(self, msg): return None
1461 ... req = MockRequest()
1462 ... # Would handle the request via session manager
1463 ... return req is not None
1464 >>> import asyncio
1465 >>> asyncio.run(test_handle())
1466 True
1467 """
1468 # The session manager handles all the protocol details - Note: I don't like accessing _send directly -JPS
1469 await session_manager.handle_request(request.scope, request.receive, request._send) # pylint: disable=W0212
1471 routes = [
1472 Route("/mcp", handle_mcp, methods=["GET", "POST"]),
1473 Route("/healthz", lambda request: PlainTextResponse("ok"), methods=["GET"]),
1474 ]
1476 app = Starlette(routes=routes)
1478 # Add CORS middleware if specified
1479 if cors:
1480 app.add_middleware(
1481 CORSMiddleware,
1482 allow_origins=cors,
1483 allow_credentials=True,
1484 allow_methods=["*"],
1485 allow_headers=["*"],
1486 )
1488 # Run the server with Uvicorn
1489 config = uvicorn.Config(
1490 app,
1491 host=host,
1492 port=port,
1493 log_level=log_level,
1494 lifespan="off",
1495 )
1496 uvicorn_server = uvicorn.Server(config)
1498 shutting_down = asyncio.Event()
1500 async def _shutdown() -> None:
1501 """Handle graceful shutdown of the streamable HTTP bridge."""
1502 if shutting_down.is_set():
1503 return
1504 shutting_down.set()
1505 LOGGER.info("Shutting down streamable HTTP bridge...")
1506 if process.returncode is None:
1507 process.terminate()
1508 with suppress(asyncio.TimeoutError):
1509 await asyncio.wait_for(process.wait(), 5)
1510 # Graceful shutdown by setting the shutdown event
1511 # Use getattr to safely access should_exit attribute
1512 setattr(uvicorn_server, "should_exit", getattr(uvicorn_server, "should_exit", False) or True)
1514 loop = asyncio.get_running_loop()
1515 for sig in (signal.SIGINT, signal.SIGTERM):
1516 with suppress(NotImplementedError): # Windows lacks add_signal_handler
1517 loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown()))
1519 # Pump messages between stdio and HTTP
1520 async def pump_stdio_to_http() -> None:
1521 """Forward messages from subprocess stdout to HTTP responses.
1523 Examples:
1524 >>> async def test():
1525 ... # This would pump messages in real usage
1526 ... return True
1527 >>> import asyncio
1528 >>> asyncio.run(test())
1529 True
1530 """
1531 while True:
1532 try:
1533 if not process.stdout:
1534 raise RuntimeError("Process stdout not available")
1535 line = await process.stdout.readline()
1536 if not line:
1537 break
1538 # The session manager will handle routing to appropriate HTTP responses
1539 # This would need proper integration with session_manager's internal queue
1540 LOGGER.debug(f"Received from subprocess: {line.decode().strip()}")
1541 except Exception as e:
1542 LOGGER.error(f"Error reading from subprocess: {e}")
1543 break
1545 async def pump_http_to_stdio(data: str) -> None:
1546 """Forward HTTP requests to subprocess stdin.
1548 Args:
1549 data: The data string to send to subprocess stdin.
1551 Examples:
1552 >>> async def test_pump():
1553 ... # Would pump data to subprocess
1554 ... data = '{"method": "test"}'
1555 ... # In real use, would write to process.stdin
1556 ... return len(data) > 0
1557 >>> import asyncio
1558 >>> asyncio.run(test_pump())
1559 True
1560 """
1561 if not process.stdin:
1562 raise RuntimeError("Process stdin not available")
1563 process.stdin.write(data.encode() + b"\n")
1564 await process.stdin.drain()
1566 # Note: pump_http_to_stdio will be used when stdio-to-HTTP bridge is fully implemented
1567 _ = pump_http_to_stdio
1569 # Start the pump task
1570 pump_task = asyncio.create_task(pump_stdio_to_http())
1572 try:
1573 LOGGER.info(f"Streamable HTTP bridge ready → http://{host}:{port}/mcp")
1574 await uvicorn_server.serve()
1575 finally:
1576 pump_task.cancel()
1577 await _shutdown()
1580async def _run_streamable_http_to_stdio(
1581 url: str,
1582 oauth2_bearer: Optional[str] = None,
1583 timeout: float = 30.0,
1584 stdio_command: Optional[str] = None,
1585 max_retries: int = 5,
1586 initial_retry_delay: float = 1.0,
1587) -> None:
1588 """Run streamable HTTP to stdio bridge.
1590 Connects to a remote streamable HTTP endpoint and bridges it to local stdio.
1591 Implements proper bidirectional message flow with error handling and retries.
1593 Args:
1594 url: The streamable HTTP endpoint URL to connect to.
1595 oauth2_bearer: Optional OAuth2 bearer token for authentication. Defaults to None.
1596 timeout: HTTP client timeout in seconds. Defaults to 30.0.
1597 stdio_command: Optional command to run for local stdio processing.
1598 If not provided, will simply print messages to stdout.
1599 max_retries: Maximum number of connection retry attempts. Defaults to 5.
1600 initial_retry_delay: Initial delay between retries in seconds. Defaults to 1.0.
1602 Raises:
1603 ImportError: If httpx package is not available.
1604 RuntimeError: If the subprocess fails to create stdin/stdout pipes.
1605 Exception: For any unexpected error during bridging operations.
1606 """
1607 if not httpx:
1608 raise ImportError("httpx package is required for streamable HTTP to stdio bridging")
1610 headers = {}
1611 if oauth2_bearer:
1612 headers["Authorization"] = f"Bearer {oauth2_bearer}"
1614 # Ensure URL ends with /mcp if not already
1615 if not url.endswith("/mcp"):
1616 url = url.rstrip("/") + "/mcp"
1618 # If no stdio command provided, use simple mode (just print to stdout)
1619 if not stdio_command:
1620 LOGGER.warning("No --stdioCommand provided, running in simple mode (streamable HTTP to stdout only)")
1621 # First-Party
1622 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel
1624 async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
1625 await _simple_streamable_http_pump(client, url, max_retries, initial_retry_delay)
1626 return
1628 # Start the stdio subprocess
1629 process = await asyncio.create_subprocess_exec(
1630 *shlex.split(stdio_command),
1631 stdin=asyncio.subprocess.PIPE,
1632 stdout=asyncio.subprocess.PIPE,
1633 stderr=sys.stderr,
1634 )
1636 if not process.stdin or not process.stdout:
1637 raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {stdio_command}")
1639 async def read_stdout(client: httpx.AsyncClient) -> None:
1640 """Read lines from subprocess stdout and POST to streamable HTTP endpoint.
1642 Args:
1643 client: The HTTP client to use for POSTing messages.
1645 Raises:
1646 RuntimeError: If the process stdout stream is not available.
1647 """
1648 if not process.stdout:
1649 raise RuntimeError("Process stdout not available")
1651 while True:
1652 if not process.stdout:
1653 raise RuntimeError("Process stdout not available")
1654 line = await process.stdout.readline()
1655 if not line:
1656 break
1658 text = line.decode().strip()
1659 if not text:
1660 continue
1662 LOGGER.debug(f"← stdio: {text}")
1664 # POST the JSON-RPC request to the streamable HTTP endpoint
1665 try:
1666 if CONTENT_TYPE == "application/x-www-form-urlencoded":
1667 # If text is JSON, parse and encode as form
1668 try:
1669 payload = orjson.loads(text)
1670 body = urlencode(payload)
1671 except Exception:
1672 body = text
1673 response = await client.post(url, content=body, headers=headers)
1674 else:
1675 response = await client.post(url, content=text, headers=headers)
1676 if response.status_code == 200:
1677 # Handle JSON response
1678 response_data = response.text
1679 if response_data and process.stdin:
1680 process.stdin.write((response_data + "\n").encode())
1681 await process.stdin.drain()
1682 LOGGER.debug(f"→ stdio: {response_data}")
1683 else:
1684 LOGGER.warning(f"Streamable HTTP endpoint returned {response.status_code}: {response.text}")
1685 except Exception as e:
1686 LOGGER.error(f"Failed to POST to streamable HTTP endpoint: {e}")
1688 async def pump_streamable_http_to_stdio(client: httpx.AsyncClient) -> None:
1689 """Stream data from remote streamable HTTP endpoint to subprocess stdin.
1691 Args:
1692 client: The HTTP client to use for streamable HTTP streaming.
1694 Raises:
1695 httpx.HTTPStatusError: If the streamable HTTP endpoint returns a non-200 status code.
1696 Exception: For unexpected errors in streamable HTTP stream processing.
1697 """
1698 retry_delay = initial_retry_delay
1699 retry_count = 0
1701 while retry_count < max_retries:
1702 try:
1703 LOGGER.info(f"Connecting to streamable HTTP endpoint: {url}")
1705 # For streamable HTTP, we need to handle both SSE streams and JSON responses
1706 # Try SSE first (for stateful sessions or when SSE is preferred)
1707 async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response:
1708 if response.status_code != 200:
1709 if httpx:
1710 raise httpx.HTTPStatusError(f"Streamable HTTP endpoint returned {response.status_code}", request=response.request, response=response)
1711 raise Exception(f"Streamable HTTP endpoint returned {response.status_code}")
1713 # Reset retry counter on successful connection
1714 retry_count = 0
1715 retry_delay = initial_retry_delay
1717 async for line in response.aiter_lines():
1718 if line.startswith("data:"):
1719 data = line[5:]
1720 if data.startswith(" "):
1721 data = data[1:]
1722 if data and process.stdin:
1723 process.stdin.write((data + "\n").encode())
1724 await process.stdin.drain()
1725 LOGGER.debug(f"→ stdio: {data}")
1727 except Exception as e:
1728 if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)):
1729 retry_count += 1
1730 if retry_count >= max_retries:
1731 LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
1732 raise
1734 LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
1735 await asyncio.sleep(retry_delay)
1736 retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s
1737 else:
1738 LOGGER.error(f"Unexpected error in streamable HTTP stream: {e}")
1739 raise
1741 # Run both tasks concurrently
1742 # First-Party
1743 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel
1745 async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
1746 try:
1747 await asyncio.gather(read_stdout(client), pump_streamable_http_to_stdio(client))
1748 except Exception as e:
1749 LOGGER.error(f"Bridge error: {e}")
1750 raise
1751 finally:
1752 # Clean up subprocess
1753 if process.returncode is None:
1754 process.terminate()
1755 with suppress(asyncio.TimeoutError):
1756 await asyncio.wait_for(process.wait(), timeout=5)
1759async def _simple_streamable_http_pump(client: "Any", url: str, max_retries: int, initial_retry_delay: float) -> None:
1760 """Simple streamable HTTP pump that just prints messages to stdout.
1762 Used when no stdio command is provided to bridge streamable HTTP to stdout directly.
1764 Args:
1765 client: The HTTP client to use for streamable HTTP streaming.
1766 url: The streamable HTTP endpoint URL to connect to.
1767 max_retries: Maximum number of connection retry attempts.
1768 initial_retry_delay: Initial delay between retries in seconds.
1770 Raises:
1771 Exception: For unexpected errors in streamable HTTP stream processing including
1772 HTTPStatusError if the endpoint returns a non-200 status code.
1773 """
1774 retry_delay = initial_retry_delay
1775 retry_count = 0
1777 while retry_count < max_retries:
1778 try:
1779 LOGGER.info(f"Connecting to streamable HTTP endpoint: {url}")
1781 # Try to get SSE stream
1782 async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response:
1783 if response.status_code != 200:
1784 if httpx:
1785 raise httpx.HTTPStatusError(f"Streamable HTTP endpoint returned {response.status_code}", request=response.request, response=response)
1786 raise Exception(f"Streamable HTTP endpoint returned {response.status_code}")
1788 # Reset retry counter on successful connection
1789 retry_count = 0
1790 retry_delay = initial_retry_delay
1792 async for line in response.aiter_lines():
1793 if line.startswith("data:"):
1794 data = line[5:]
1795 if data.startswith(" "):
1796 data = data[1:]
1797 if data:
1798 print(data)
1799 LOGGER.debug(f"Received: {data}")
1801 except Exception as e:
1802 if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)):
1803 retry_count += 1
1804 if retry_count >= max_retries:
1805 LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
1806 raise
1808 LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
1809 await asyncio.sleep(retry_delay)
1810 retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s
1811 else:
1812 LOGGER.error(f"Unexpected error in streamable HTTP stream: {e}")
1813 raise
1816async def _run_multi_protocol_server( # pylint: disable=too-many-positional-arguments
1817 cmd: str,
1818 port: int,
1819 log_level: str = "info",
1820 cors: Optional[List[str]] = None,
1821 host: str = "127.0.0.1",
1822 expose_sse: bool = False,
1823 expose_streamable_http: bool = False,
1824 sse_path: str = "/sse",
1825 message_path: str = "/message",
1826 keep_alive: float = KEEP_ALIVE_INTERVAL,
1827 stateless: bool = False,
1828 json_response: bool = False,
1829 header_mappings: Optional[NormalizedMappings] = None,
1830) -> None:
1831 """Run a stdio server and expose it via multiple protocols simultaneously.
1833 Args:
1834 cmd: The command to run as a stdio subprocess.
1835 port: The port to bind the HTTP server to.
1836 log_level: The logging level to use. Defaults to "info".
1837 cors: Optional list of CORS allowed origins.
1838 host: The host interface to bind to. Defaults to "127.0.0.1".
1839 expose_sse: Whether to expose via SSE protocol.
1840 expose_streamable_http: Whether to expose via streamable HTTP protocol.
1841 sse_path: Path for SSE endpoint. Defaults to "/sse".
1842 message_path: Path for message endpoint. Defaults to "/message".
1843 keep_alive: Keep-alive interval for SSE. Defaults to KEEP_ALIVE_INTERVAL.
1844 stateless: Whether to use stateless mode for streamable HTTP.
1845 json_response: Whether to return JSON responses for streamable HTTP.
1846 header_mappings: Optional mapping of HTTP headers to environment variables.
1847 """
1848 LOGGER.info(f"Starting multi-protocol server for command: {cmd}")
1849 LOGGER.info(f"Protocols: SSE={expose_sse}, StreamableHTTP={expose_streamable_http}")
1851 # Create a shared pubsub whenever either protocol needs stdout observations
1852 pubsub = _PubSub() if (expose_sse or expose_streamable_http) else None
1854 # Create the stdio endpoint
1855 stdio = StdIOEndpoint(cmd, pubsub, header_mappings=header_mappings) if (expose_sse or expose_streamable_http) and pubsub else None
1857 # Create fastapi app and middleware
1858 app = FastAPI()
1860 # Add CORS middleware if specified
1861 if cors:
1862 app.add_middleware(
1863 CORSMiddleware,
1864 allow_origins=cors,
1865 allow_credentials=True,
1866 allow_methods=["*"],
1867 allow_headers=["*"],
1868 )
1870 # Start stdio if at least one transport requires it
1871 if stdio:
1872 await stdio.start()
1874 # SSE endpoints
1875 if expose_sse and stdio and pubsub:
1877 @app.get(sse_path)
1878 async def get_sse(request: Request) -> EventSourceResponse:
1879 """SSE endpoint.
1881 Args:
1882 request: The incoming HTTP request.
1884 Returns:
1885 EventSourceResponse: Server-sent events stream.
1886 """
1887 if not pubsub:
1888 raise RuntimeError("PubSub not available")
1890 # Extract environment variables from headers if dynamic env is enabled
1891 additional_env_vars = {}
1892 if header_mappings and stdio:
1893 request_headers = dict(request.headers)
1894 additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)
1896 # Restart stdio endpoint with new environment variables
1897 if additional_env_vars:
1898 LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
1899 await stdio.stop() # Stop existing process
1900 await stdio.start(additional_env_vars) # Start with new env vars
1902 queue = pubsub.subscribe()
1903 session_id = uuid.uuid4().hex
1905 async def event_gen() -> AsyncIterator[Dict[str, Any]]:
1906 """Generate SSE events for the client.
1908 Yields:
1909 Dict[str, Any]: SSE event data with event type and payload.
1910 """
1911 endpoint_url = f"{str(request.base_url).rstrip('/')}{message_path}?session_id={session_id}"
1912 yield {
1913 "event": "endpoint",
1914 "data": endpoint_url,
1915 "retry": int(keep_alive * 1000),
1916 }
1918 if DEFAULT_KEEPALIVE_ENABLED:
1919 yield {"event": "keepalive", "data": "{}", "retry": keep_alive * 1000}
1921 try:
1922 while True:
1923 if await request.is_disconnected():
1924 break
1926 try:
1927 timeout = keep_alive if DEFAULT_KEEPALIVE_ENABLED else None
1928 msg = await asyncio.wait_for(queue.get(), timeout)
1929 yield {"event": "message", "data": msg.rstrip()}
1930 except asyncio.TimeoutError:
1931 if DEFAULT_KEEPALIVE_ENABLED:
1932 yield {
1933 "event": "keepalive",
1934 "data": "{}",
1935 "retry": keep_alive * 1000,
1936 }
1937 finally:
1938 if pubsub:
1939 pubsub.unsubscribe(queue)
1941 return EventSourceResponse(
1942 event_gen(),
1943 headers={
1944 "Cache-Control": "no-cache",
1945 "Connection": "keep-alive",
1946 "X-Accel-Buffering": "no",
1947 },
1948 )
1950 @app.post(message_path, status_code=status.HTTP_202_ACCEPTED)
1951 async def post_message(raw: Request, session_id: str | None = None) -> Response:
1952 """Message endpoint for SSE.
1954 Args:
1955 raw: The incoming HTTP request.
1956 session_id: Optional session ID for correlation.
1958 Returns:
1959 Response: Acknowledgement of message receipt.
1960 """
1961 _ = session_id
1963 # Extract environment variables from headers if dynamic env is enabled
1964 additional_env_vars = {}
1965 if header_mappings and stdio:
1966 request_headers = dict(raw.headers)
1967 additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)
1969 # Only restart if we have new environment variables
1970 if additional_env_vars:
1971 LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
1972 await stdio.stop() # Stop existing process
1973 await stdio.start(additional_env_vars) # Start with new env vars
1974 await asyncio.sleep(0.5) # Give process time to initialize
1976 # Ensure stdio endpoint is running
1977 if stdio and not stdio.is_running():
1978 LOGGER.info("Starting stdio endpoint (was not running)")
1979 await stdio.start()
1980 await asyncio.sleep(0.5) # Give process time to initialize
1982 payload = await raw.body()
1983 try:
1984 orjson.loads(payload)
1985 except Exception as exc:
1986 return PlainTextResponse(
1987 f"Invalid JSON payload: {exc}",
1988 status_code=status.HTTP_400_BAD_REQUEST,
1989 )
1990 if not stdio:
1991 raise RuntimeError("Stdio endpoint not available")
1992 await stdio.send(payload.decode().rstrip() + "\n")
1993 return PlainTextResponse("forwarded", status_code=status.HTTP_202_ACCEPTED)
1995 # Add health check
1996 @app.get("/healthz")
1997 async def health() -> Response:
1998 """Health check endpoint.
2000 Returns:
2001 Response: Health status response.
2002 """
2003 return PlainTextResponse("ok")
2005 # Streamable HTTP support
2006 streamable_server = None
2007 streamable_manager = None
2008 streamable_context = None
2010 # Keep a reference to the original FastAPI app so we can wrap it with an ASGI
2011 # layer that delegates `/mcp` scopes to the StreamableHTTPSessionManager if present.
2012 original_app = app
2014 if expose_streamable_http:
2015 # Create an MCP server instance
2016 streamable_server = MCPServer("stdio-proxy")
2018 # Set up the streamable HTTP session manager
2019 streamable_manager = StreamableHTTPSessionManager(
2020 app=streamable_server,
2021 stateless=stateless,
2022 json_response=json_response,
2023 )
2025 # Register POST /mcp on the FastAPI app as the canonical client->server POST
2026 # path for Streamable HTTP. This forwards JSON-RPC notifications/requests to stdio.
2027 @original_app.post("/mcp")
2028 async def mcp_post(request: Request) -> Response:
2029 """
2030 Handles POST requests to the `/mcp` endpoint, forwarding JSON payloads to stdio
2031 and optionally waiting for a correlated response.
2033 The request body is expected to be a JSON object or newline-delimited JSON.
2034 If the JSON includes an "id" field, the function attempts to match it with
2035 a response from stdio using a pubsub queue, within a timeout period.
2037 Args:
2038 request (Request): The incoming FastAPI request containing the JSON payload.
2040 Returns:
2041 Response: A FastAPI Response object.
2042 - 200 OK with matched JSON response if correlation succeeds.
2043 - 202 Accepted if no matching response is received in time or for notifications.
2044 - 400 Bad Request if the payload is not valid JSON.
2046 Example:
2047 >>> import httpx
2048 >>> response = httpx.post("http://localhost:8000/mcp", json={"id": 123, "method": "ping"})
2049 >>> response.status_code in (200, 202)
2050 True
2051 >>> response.text # May be the matched JSON or "accepted"
2052 '{"id": 123, "result": "pong"}' # or "accepted"
2053 """
2054 # Read and validate JSON
2055 body = await request.body()
2056 try:
2057 obj = orjson.loads(body)
2058 except Exception as exc:
2059 return PlainTextResponse(f"Invalid JSON payload: {exc}", status_code=status.HTTP_400_BAD_REQUEST)
2061 # Forward raw newline-delimited JSON to stdio
2062 if not stdio:
2063 raise RuntimeError("Stdio endpoint not available")
2064 await stdio.send(body.decode().rstrip() + "\n")
2066 # If it's a request (has an id) -> attempt to correlate response from stdio
2067 if isinstance(obj, dict) and "id" in obj:
2068 if not pubsub:
2069 return PlainTextResponse("accepted", status_code=status.HTTP_202_ACCEPTED)
2071 queue = pubsub.subscribe()
2072 try:
2073 timeout = 10.0 # seconds; tuneable
2074 deadline = asyncio.get_event_loop().time() + timeout
2075 remaining = max(0.0, deadline - asyncio.get_event_loop().time())
2076 while remaining > 0:
2077 try:
2078 msg = await asyncio.wait_for(queue.get(), timeout=remaining)
2079 except asyncio.TimeoutError:
2080 break
2082 # stdio stdout lines may contain JSON objects or arrays
2083 try:
2084 parsed = orjson.loads(msg)
2085 except (orjson.JSONDecodeError, ValueError):
2086 # not JSON -> skip
2087 remaining = max(0.0, deadline - asyncio.get_event_loop().time())
2088 continue
2090 candidates = parsed if isinstance(parsed, list) else [parsed]
2091 for candidate in candidates:
2092 if isinstance(candidate, dict) and candidate.get("id") == obj.get("id"):
2093 # return the matched response as JSON
2094 return ORJSONResponse(candidate)
2096 remaining = max(0.0, deadline - asyncio.get_event_loop().time())
2098 # timeout -> accept and return 202
2099 return PlainTextResponse("accepted (no response yet)", status_code=status.HTTP_202_ACCEPTED)
2100 finally:
2101 if pubsub:
2102 pubsub.unsubscribe(queue)
2104 # Notification -> return 202
2105 return PlainTextResponse("accepted", status_code=status.HTTP_202_ACCEPTED)
2107 # ASGI wrapper to route GET/other /mcp scopes to streamable_manager.handle_request
2108 async def mcp_asgi_wrapper(scope: Scope, receive: Receive, send: Send) -> None:
2109 """
2110 ASGI middleware that intercepts HTTP requests to the `/mcp` endpoint.
2112 If the request is an HTTP call to `/mcp` and a `streamable_manager` is available,
2113 it can handle the request (currently commented out). All other requests are
2114 passed to the original FastAPI application.
2116 Args:
2117 scope (Scope): The ASGI scope dictionary containing request metadata.
2118 receive (Receive): An awaitable that yields incoming ASGI events.
2119 send (Send): An awaitable used to send ASGI events.
2120 """
2121 await original_app(scope, receive, send)
2123 # Replace the app used by uvicorn with the ASGI wrapper
2124 app = mcp_asgi_wrapper # type: ignore[assignment]
2126 # ---------------------- Server lifecycle ----------------------
2127 config = uvicorn.Config(
2128 app,
2129 host=host,
2130 port=port,
2131 log_level=log_level,
2132 lifespan="off",
2133 )
2134 server = uvicorn.Server(config)
2136 shutting_down = asyncio.Event()
2138 async def _shutdown() -> None:
2139 """Handle graceful shutdown."""
2140 if shutting_down.is_set():
2141 return
2142 shutting_down.set()
2143 LOGGER.info("Shutting down multi-protocol server...")
2144 if stdio:
2145 await stdio.stop()
2146 # Streamable HTTP cleanup handled by server shutdown
2147 # Graceful shutdown by setting the shutdown event
2148 # Use getattr to safely access should_exit attribute
2149 setattr(server, "should_exit", getattr(server, "should_exit", False) or True)
2151 loop = asyncio.get_running_loop()
2152 for sig in (signal.SIGINT, signal.SIGTERM):
2153 with suppress(NotImplementedError):
2154 loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown()))
2156 # If we have a streamable manager, start its context so it can accept ASGI /mcp
2157 if streamable_manager:
2158 streamable_context = streamable_manager.run()
2159 await streamable_context.__aenter__() # noqa: PLC2801 # pylint: disable=unnecessary-dunder-call,no-member
2161 # Log available endpoints
2162 endpoints = []
2163 if expose_sse:
2164 endpoints.append(f"SSE: http://{host}:{port}{sse_path}")
2165 if expose_streamable_http:
2166 endpoints.append(f"StreamableHTTP: http://{host}:{port}/mcp")
2168 LOGGER.info(f"Multi-protocol server ready → {', '.join(endpoints)}")
2170 try:
2171 await server.serve()
2172 finally:
2173 await _shutdown()
2174 # Clean up streamable HTTP context with timeout to prevent spin loop
2175 # if tasks don't respond to cancellation (anyio _deliver_cancellation issue)
2176 if streamable_context:
2177 # Get cleanup timeout from config (with fallback for standalone usage)
2178 try:
2179 # First-Party
2180 from mcpgateway.config import settings as cfg # pylint: disable=import-outside-toplevel
2182 cleanup_timeout = cfg.mcp_session_pool_cleanup_timeout
2183 except Exception:
2184 cleanup_timeout = 5.0
2185 # Use anyio.move_on_after instead of asyncio.wait_for to properly propagate
2186 # cancellation through anyio's cancel scope system (prevents orphaned spinning tasks)
2187 with anyio.move_on_after(cleanup_timeout) as cleanup_scope:
2188 try:
2189 await streamable_context.__aexit__(None, None, None) # pylint: disable=unnecessary-dunder-call,no-member
2190 except Exception as e:
2191 LOGGER.debug(f"Error cleaning up streamable HTTP context: {e}")
2192 if cleanup_scope.cancelled_caught:
2193 LOGGER.warning("Streamable HTTP context cleanup timed out - proceeding anyway")
2196async def _simple_sse_pump(client: "Any", url: str, max_retries: int, initial_retry_delay: float) -> None:
2197 """Simple SSE pump that just prints messages to stdout.
2199 Used when no stdio command is provided to bridge SSE to stdout directly.
2201 Args:
2202 client: The HTTP client to use for SSE streaming.
2203 url: The SSE endpoint URL to connect to.
2204 max_retries: Maximum number of connection retry attempts.
2205 initial_retry_delay: Initial delay between retries in seconds.
2207 Raises:
2208 HTTPStatusError: If the SSE endpoint returns a non-200 status code.
2209 Exception: For unexpected errors in SSE stream processing.
2210 """
2211 retry_delay = initial_retry_delay
2212 retry_count = 0
2214 while retry_count < max_retries:
2215 try:
2216 LOGGER.info(f"Connecting to SSE endpoint: {url}")
2218 async with client.stream("GET", url) as response:
2219 # Check status code if available (real httpx response)
2220 if hasattr(response, "status_code") and response.status_code != 200:
2221 if httpx:
2222 raise httpx.HTTPStatusError(f"SSE endpoint returned {response.status_code}", request=response.request, response=response)
2223 raise Exception(f"SSE endpoint returned {response.status_code}")
2225 # Reset retry counter on successful connection
2226 retry_count = 0
2227 retry_delay = initial_retry_delay
2228 current_event: Optional[SSEEvent] = None
2230 async for line in response.aiter_lines():
2231 event, is_complete = SSEEvent.parse_sse_line(line, current_event)
2232 current_event = event
2234 if is_complete and current_event:
2235 if current_event.event == "endpoint":
2236 LOGGER.info(f"Received message endpoint: {current_event.data}")
2237 elif current_event.event == "message":
2238 # Just print the message to stdout
2239 print(current_event.data)
2240 elif current_event.event == "keepalive":
2241 LOGGER.debug("Received keepalive")
2243 # Reset for next event
2244 current_event = None
2246 except Exception as e:
2247 # Check if it's one of the expected httpx exceptions
2248 if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)):
2249 retry_count += 1
2250 if retry_count >= max_retries:
2251 LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
2252 raise
2254 LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
2255 await asyncio.sleep(retry_delay)
2256 retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s
2257 else:
2258 LOGGER.error(f"Unexpected error in SSE stream: {e}")
2259 raise
2262def start_streamable_http_stdio(
2263 cmd: str,
2264 port: int,
2265 log_level: str,
2266 cors: Optional[List[str]],
2267 host: str = "127.0.0.1",
2268 stateless: bool = False,
2269 json_response: bool = False,
2270) -> None:
2271 """Start stdio to streamable HTTP bridge.
2273 Entry point for starting a stdio to streamable HTTP bridge server.
2275 Args:
2276 cmd: The command to run as a stdio subprocess.
2277 port: The port to bind the HTTP server to.
2278 log_level: The logging level to use.
2279 cors: Optional list of CORS allowed origins.
2280 host: The host interface to bind to. Defaults to "127.0.0.1".
2281 stateless: Whether to use stateless mode. Defaults to False.
2282 json_response: Whether to return JSON responses. Defaults to False.
2284 Returns:
2285 None: This function does not return a value.
2286 """
2287 return asyncio.run(_run_stdio_to_streamable_http(cmd, port, log_level, cors, host, stateless, json_response))
2290def start_streamable_http_client(url: str, bearer_token: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None) -> None:
2291 """Start streamable HTTP to stdio bridge.
2293 Entry point for starting a streamable HTTP to stdio bridge client.
2295 Args:
2296 url: The streamable HTTP endpoint URL to connect to.
2297 bearer_token: Optional OAuth2 bearer token for authentication. Defaults to None.
2298 timeout: HTTP client timeout in seconds. Defaults to 30.0.
2299 stdio_command: Optional command to run for local stdio processing.
2301 Returns:
2302 None: This function does not return a value.
2303 """
2304 return asyncio.run(_run_streamable_http_to_stdio(url, bearer_token, timeout, stdio_command))
2307def start_stdio(
2308 cmd: str, port: int, log_level: str, cors: Optional[List[str]], host: str = "127.0.0.1", sse_path: str = "/sse", message_path: str = "/message", keep_alive: float = KEEP_ALIVE_INTERVAL
2309) -> None:
2310 """Start stdio bridge.
2312 Entry point for starting a stdio to SSE bridge server.
2314 Args:
2315 cmd: The command to run as a stdio subprocess.
2316 port: The port to bind the HTTP server to.
2317 log_level: The logging level to use.
2318 cors: Optional list of CORS allowed origins.
2319 host: The host interface to bind to. Defaults to "127.0.0.1".
2320 sse_path: Path for the SSE endpoint. Defaults to "/sse".
2321 message_path: Path for the message endpoint. Defaults to "/message".
2322 keep_alive: Keep-alive interval in seconds. Defaults to KEEP_ALIVE_INTERVAL.
2324 Returns:
2325 None: This function does not return a value.
2327 Examples:
2328 >>> # Test parameter validation
2329 >>> isinstance(KEEP_ALIVE_INTERVAL, int)
2330 True
2331 >>> KEEP_ALIVE_INTERVAL > 0
2332 True
2333 >>> start_stdio("uvx mcp-server-git", 9000, "info", None) # doctest: +SKIP
2334 """
2335 return asyncio.run(_run_stdio_to_sse(cmd, port, log_level, cors, host, sse_path, message_path, keep_alive))
2338def start_sse(url: str, bearer_token: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None) -> None:
2339 """Start SSE bridge.
2341 Entry point for starting an SSE to stdio bridge client.
2343 Examples:
2344 >>> # Test parameter defaults
2345 >>> timeout_default = 30.0
2346 >>> isinstance(timeout_default, float)
2347 True
2348 >>> timeout_default > 0
2349 True
2351 Args:
2352 url: The SSE endpoint URL to connect to.
2353 bearer_token: Optional OAuth2 bearer token for authentication. Defaults to None.
2354 timeout: HTTP client timeout in seconds. Defaults to 30.0.
2355 stdio_command: Optional command to run for local stdio processing.
2357 Returns:
2358 None: This function does not return a value.
2360 Examples:
2361 >>> start_sse("http://example.com/sse", "token123") # doctest: +SKIP
2362 """
2363 return asyncio.run(_run_sse_to_stdio(url, bearer_token, timeout, stdio_command))
2366def main(argv: Optional[Sequence[str]] | None = None) -> None:
2367 """Entry point for the translate module.
2369 Configures logging, parses arguments, and starts the appropriate bridge
2370 based on command line options. Handles keyboard interrupts gracefully.
2372 Args:
2373 argv: Optional sequence of command line arguments. If None, uses sys.argv[1:].
2375 Examples:
2376 >>> # Test argument parsing
2377 >>> try:
2378 ... main(["--stdio", "cat", "--port", "9000"]) # doctest: +SKIP
2379 ... except SystemExit:
2380 ... pass # Would normally start the server
2381 """
2382 args = _parse_args(argv or sys.argv[1:])
2383 logging.basicConfig(
2384 level=getattr(logging, args.logLevel.upper(), logging.INFO),
2385 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
2386 datefmt="%Y-%m-%dT%H:%M:%S",
2387 )
2389 # Parse header mappings if dynamic environment injection is enabled
2390 # Pre-normalize mappings once at startup for O(1) lookups per request
2391 header_mappings: NormalizedMappings | None = None
2392 if getattr(args, "enable_dynamic_env", False):
2393 try:
2394 raw_mappings = parse_header_mappings(getattr(args, "header_to_env", []))
2395 header_mappings = NormalizedMappings(raw_mappings)
2396 LOGGER.info(f"Dynamic environment injection enabled with {len(header_mappings)} header mappings")
2397 except Exception as e:
2398 LOGGER.error(f"Failed to parse header mappings: {e}")
2399 raise
2401 try:
2402 # Handle gRPC server exposure
2403 if getattr(args, "grpc", None):
2404 # First-Party
2405 from mcpgateway.translate_grpc import expose_grpc_via_sse # pylint: disable=import-outside-toplevel
2407 # Parse metadata
2408 metadata = {}
2409 if getattr(args, "grpc_metadata", None):
2410 for item in args.grpc_metadata:
2411 if "=" in item:
2412 key, value = item.split("=", 1)
2413 metadata[key] = value
2415 asyncio.run(
2416 expose_grpc_via_sse(
2417 target=args.grpc,
2418 port=args.port,
2419 tls_enabled=getattr(args, "grpc_tls", False),
2420 tls_cert=getattr(args, "grpc_cert", None),
2421 tls_key=getattr(args, "grpc_key", None),
2422 metadata=metadata,
2423 )
2424 )
2426 # Handle local stdio server exposure
2427 elif args.stdio:
2428 # Check which protocols to expose
2429 expose_sse = getattr(args, "expose_sse", False)
2430 expose_streamable_http = getattr(args, "expose_streamable_http", False)
2432 # If no protocol specified, default to SSE for backward compatibility
2433 if not expose_sse and not expose_streamable_http:
2434 expose_sse = True
2436 # Use multi-protocol server
2437 asyncio.run(
2438 _run_multi_protocol_server(
2439 cmd=args.stdio,
2440 port=args.port,
2441 log_level=args.logLevel,
2442 cors=args.cors,
2443 host=args.host,
2444 expose_sse=expose_sse,
2445 expose_streamable_http=expose_streamable_http,
2446 sse_path=getattr(args, "ssePath", "/sse"),
2447 message_path=getattr(args, "messagePath", "/message"),
2448 keep_alive=getattr(args, "keepAlive", KEEP_ALIVE_INTERVAL),
2449 stateless=getattr(args, "stateless", False),
2450 json_response=getattr(args, "jsonResponse", False),
2451 header_mappings=header_mappings,
2452 )
2453 )
2455 # Handle remote connection modes
2456 elif getattr(args, "connect_sse", None):
2457 start_sse(args.connect_sse, args.oauth2Bearer, 30.0, args.stdioCommand)
2458 elif getattr(args, "connect_streamable_http", None):
2459 start_streamable_http_client(args.connect_streamable_http, args.oauth2Bearer, 30.0, args.stdioCommand)
2460 elif getattr(args, "connect_grpc", None):
2461 print("Error: --connect-grpc mode not yet implemented. Use --grpc to expose a gRPC server.", file=sys.stderr)
2462 sys.exit(1)
2463 else:
2464 print("Error: Must specify either --stdio (to expose local server), --grpc (to expose gRPC server), or --connect-sse/--connect-streamable-http (to connect to remote)", file=sys.stderr)
2465 sys.exit(1)
2466 except KeyboardInterrupt:
2467 print("") # restore shell prompt
2468 sys.exit(0)
2469 except (NotImplementedError, ImportError) as exc:
2470 print(exc, file=sys.stderr)
2471 sys.exit(1)
2474if __name__ == "__main__": # python3 -m mcpgateway.translate ...
2475 main()