Coverage for mcpgateway / translate.py: 99%
806 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
3'''Location: ./mcpgateway/translate.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti, Manav Gupta
8r"""Bridges between different MCP transport protocols.
9This module provides bidirectional bridging between MCP servers that communicate
10via different transport protocols: stdio/JSON-RPC, HTTP/SSE, and streamable HTTP.
11It enables exposing local MCP servers over HTTP or consuming remote endpoints
12as local stdio servers.
14The bridge supports multiple modes of operation:
15- stdio to SSE: Expose a local stdio MCP server over HTTP/SSE
16- SSE to stdio: Bridge a remote SSE endpoint to local stdio
17- stdio to streamable HTTP: Expose a local stdio MCP server via streamable HTTP
18- streamable HTTP to stdio: Bridge a remote streamable HTTP endpoint to local stdio
20Examples:
21 Programmatic usage:
23 >>> import asyncio
24 >>> from mcpgateway.translate import start_stdio
25 >>> asyncio.run(start_stdio("uvx mcp-server-git", 9000, "info", None, "127.0.0.1")) # doctest: +SKIP
27 Test imports and configuration:
29 >>> from mcpgateway.translate import MCPServer, StreamableHTTPSessionManager
30 >>> isinstance(MCPServer, type)
31 True
32 >>> isinstance(StreamableHTTPSessionManager, type)
33 True
34 >>> from mcpgateway.translate import KEEP_ALIVE_INTERVAL
35 >>> KEEP_ALIVE_INTERVAL > 0
36 True
37 >>> from mcpgateway.translate import DEFAULT_KEEPALIVE_ENABLED
38 >>> isinstance(DEFAULT_KEEPALIVE_ENABLED, bool)
39 True
41 Test Starlette imports:
43 >>> from mcpgateway.translate import Starlette, Route
44 >>> isinstance(Starlette, type)
45 True
46 >>> isinstance(Route, type)
47 True
49 Test logging setup:
51 >>> from mcpgateway.translate import LOGGER, logging_service
52 >>> LOGGER is not None
53 True
54 >>> logging_service is not None
55 True
56 >>> hasattr(LOGGER, 'info')
57 True
58 >>> hasattr(LOGGER, 'error')
59 True
60 >>> hasattr(LOGGER, 'debug')
61 True
63 Test utility classes:
65 >>> from mcpgateway.translate import _PubSub, StdIOEndpoint
66 >>> pubsub = _PubSub()
67 >>> hasattr(pubsub, 'publish')
68 True
69 >>> hasattr(pubsub, 'subscribe')
70 True
71 >>> hasattr(pubsub, 'unsubscribe')
72 True
74Usage:
75 Command line usage::
77 # 1. Expose an MCP server that talks JSON-RPC on stdio at :9000/sse
78 python3 -m mcpgateway.translate --stdio "uvx mcp-server-git" --port 9000
80 # 2. Bridge a remote SSE endpoint to local stdio
81 python3 -m mcpgateway.translate --sse "https://example.com/sse" \
82 --stdioCommand "uvx mcp-client"
84 # 3. Expose stdio server via streamable HTTP at :9000/mcp
85 python3 -m mcpgateway.translate --streamableHttp "uvx mcp-server-git" \
86 --port 9000 --stateless --jsonResponse
88 # 4. Connect to remote streamable HTTP endpoint
89 python3 -m mcpgateway.translate \
90 --streamableHttp "https://example.com/mcp" \
91 --oauth2Bearer "your-token"
93 # 5. Test SSE endpoint
94 curl -N http://localhost:9000/sse # receive the stream
96 # 6. Send a test echo request to SSE endpoint
97 curl -X POST http://localhost:9000/message \
98 -H 'Content-Type: application/json' \
99 -d '{"jsonrpc":"2.0","id":1,"method":"echo","params":{"value":"hi"}}'
101 # 7. Test streamable HTTP endpoint
102 curl -X POST http://localhost:9000/mcp \
103 -H 'Content-Type: application/json' \
104 -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"demo","version":"0.0.1"}}}'
106 The SSE stream emits JSON-RPC responses as ``event: message`` frames and sends
107 regular ``event: keepalive`` frames (default every 30s) to prevent timeouts.
109 Streamable HTTP supports both stateful (with session management) and stateless
110 modes, and can return either JSON responses or SSE streams.
111"""
112'''
114# Future
115from __future__ import annotations
117# Standard
118import argparse
119import asyncio
120from contextlib import suppress
121import logging
122import os
123import shlex
124import signal
125import sys
126from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Tuple
127from urllib.parse import urlencode
128import uuid
130# Third-Party
131import anyio
132from fastapi import FastAPI, Request, Response, status
133from fastapi.middleware.cors import CORSMiddleware
134from fastapi.responses import PlainTextResponse
135from mcp.server import Server as MCPServer
136from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
137import orjson
138from starlette.applications import Starlette
139from starlette.routing import Route
140from starlette.types import Receive, Scope, Send
141import uvicorn
143try:
144 # Third-Party
145 import httpx
146except ImportError:
147 httpx = None # type: ignore[assignment]
149# First-Party
150from mcpgateway.services.logging_service import LoggingService
151from mcpgateway.translate_header_utils import extract_env_vars_from_headers, NormalizedMappings, parse_header_mappings
153# Use patched EventSourceResponse with CPU spin protection (anyio#695 fix)
154from mcpgateway.transports.sse_transport import EventSourceResponse
155from mcpgateway.utils.orjson_response import ORJSONResponse
# Initialize logging service first; LOGGER is the module-wide logger used by the
# classes and helpers defined further down in this file.
logging_service = LoggingService()
LOGGER = logging_service.get_logger("mcpgateway.translate")
# Content type string, overridable through the FORGE_CONTENT_TYPE environment variable.
CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")
# headers = {"Content-Type": CONTENT_TYPE}
# Import settings for default keepalive interval, keepalive toggle and SSL verification.
try:
    # First-Party
    from mcpgateway.config import settings

    DEFAULT_KEEP_ALIVE_INTERVAL = settings.sse_keepalive_interval
    DEFAULT_KEEPALIVE_ENABLED = settings.sse_keepalive_enabled
    # The setting is expressed as "skip verification", so it is inverted here.
    DEFAULT_SSL_VERIFY = not settings.skip_ssl_verify
except ImportError:
    # Fallback if config not available
    DEFAULT_KEEP_ALIVE_INTERVAL = 30
    DEFAULT_KEEPALIVE_ENABLED = True
    DEFAULT_SSL_VERIFY = True  # Verify SSL by default when config unavailable

KEEP_ALIVE_INTERVAL = DEFAULT_KEEP_ALIVE_INTERVAL  # seconds - from config or fallback to 30

# Buffer limit for subprocess stdout (default 64KB is too small for large tool responses)
# Set to 16MB to handle tools that return large amounts of data (e.g., search results)
STDIO_BUFFER_LIMIT = 16 * 1024 * 1024  # 16MB

# Only ``main`` is exported by ``from mcpgateway.translate import *``.
__all__ = ["main"]  # for console-script entry-point
185# ---------------------------------------------------------------------------#
186# Helpers - trivial in-process Pub/Sub #
187# ---------------------------------------------------------------------------#
188class _PubSub:
189 """Very small fan-out helper - one async Queue per subscriber.
191 This class implements a simple publish-subscribe pattern using asyncio queues
192 for distributing messages from stdio subprocess to multiple SSE clients.
194 Examples:
195 >>> import asyncio
196 >>> async def test_pubsub():
197 ... pubsub = _PubSub()
198 ... q = pubsub.subscribe()
199 ... await pubsub.publish("hello")
200 ... result = await q.get()
201 ... pubsub.unsubscribe(q)
202 ... return result
203 >>> asyncio.run(test_pubsub())
204 'hello'
205 """
207 def __init__(self) -> None:
208 """Initialize a new publish-subscribe system.
210 Creates an empty list of subscriber queues. Each subscriber will
211 receive their own asyncio.Queue for receiving published messages.
213 Examples:
214 >>> pubsub = _PubSub()
215 >>> isinstance(pubsub._subscribers, list)
216 True
217 >>> len(pubsub._subscribers)
218 0
219 >>> hasattr(pubsub, '_subscribers')
220 True
221 """
222 self._subscribers: List[asyncio.Queue[str]] = []
224 async def publish(self, data: str) -> None:
225 """Publish data to all subscribers.
227 Dead queues (full) are automatically removed from the subscriber list.
229 Args:
230 data: The data string to publish to all subscribers.
232 Examples:
233 >>> import asyncio
234 >>> async def test_publish():
235 ... pubsub = _PubSub()
236 ... await pubsub.publish("test") # No subscribers, no error
237 ... return True
238 >>> asyncio.run(test_publish())
239 True
241 >>> # Test queue full handling
242 >>> async def test_full_queue():
243 ... pubsub = _PubSub()
244 ... # Create a queue with size 1
245 ... q = asyncio.Queue(maxsize=1)
246 ... pubsub._subscribers = [q]
247 ... # Fill the queue
248 ... await q.put("first")
249 ... # This should remove the full queue
250 ... await pubsub.publish("second")
251 ... return len(pubsub._subscribers)
252 >>> asyncio.run(test_full_queue())
253 0
254 """
255 dead: List[asyncio.Queue[str]] = []
256 for q in self._subscribers:
257 try:
258 q.put_nowait(data)
259 except asyncio.QueueFull:
260 dead.append(q)
261 for q in dead:
262 with suppress(ValueError):
263 self._subscribers.remove(q)
265 def subscribe(self) -> "asyncio.Queue[str]":
266 """Subscribe to published data.
268 Creates a new queue for receiving published messages with a maximum
269 size of 1024 items.
271 Returns:
272 asyncio.Queue[str]: A queue that will receive published data.
274 Examples:
275 >>> pubsub = _PubSub()
276 >>> q = pubsub.subscribe()
277 >>> isinstance(q, asyncio.Queue)
278 True
279 >>> q.maxsize
280 1024
281 >>> len(pubsub._subscribers)
282 1
283 >>> pubsub._subscribers[0] is q
284 True
285 """
286 q: asyncio.Queue[str] = asyncio.Queue(maxsize=1024)
287 self._subscribers.append(q)
288 return q
290 def unsubscribe(self, q: "asyncio.Queue[str]") -> None:
291 """Unsubscribe from published data.
293 Removes the queue from the subscriber list. Safe to call even if
294 the queue is not in the list.
296 Args:
297 q: The queue to unsubscribe from published data.
299 Examples:
300 >>> pubsub = _PubSub()
301 >>> q = pubsub.subscribe()
302 >>> pubsub.unsubscribe(q)
303 >>> pubsub.unsubscribe(q) # No error on double unsubscribe
304 """
305 with suppress(ValueError):
306 self._subscribers.remove(q)
309# ---------------------------------------------------------------------------#
310# StdIO endpoint (child process ↔ async queues) #
311# ---------------------------------------------------------------------------#
class StdIOEndpoint:
    """Wrap a child process whose stdin/stdout speak line-delimited JSON-RPC.

    This class manages a subprocess that communicates via stdio using JSON-RPC
    protocol, pumping messages between the subprocess and a pubsub system.

    Examples:
        >>> import asyncio
        >>> async def test_stdio():
        ...     pubsub = _PubSub()
        ...     stdio = StdIOEndpoint("echo hello", pubsub)
        ...     # Would start a real subprocess
        ...     return isinstance(stdio, StdIOEndpoint)
        >>> asyncio.run(test_stdio())
        True
    """

    # System-critical environment variables that must never be cleared from the
    # child environment, even when a header mapping targets them and the header
    # was not supplied.
    _SYSTEM_CRITICAL_VARS = frozenset({"PATH", "HOME", "TMPDIR", "TEMP", "TMP", "USER", "LOGNAME", "SHELL", "LANG", "LC_ALL", "LC_CTYPE", "PYTHONHOME", "PYTHONPATH"})

    # Seconds to wait for the child to exit after terminate(), and again after kill().
    _STOP_TIMEOUT = 5

    def __init__(self, cmd: str, pubsub: "_PubSub", env_vars: Optional[Dict[str, str]] = None, header_mappings: Optional["NormalizedMappings"] = None) -> None:
        """Initialize a stdio endpoint for subprocess communication.

        Sets up the endpoint with the command to run and the pubsub system
        for message distribution. The subprocess is not started until start()
        is called.

        Args:
            cmd: The command string to execute as a subprocess.
            pubsub: The publish-subscribe system for distributing subprocess
                output to SSE clients.
            env_vars: Optional dictionary of environment variables to set
                when starting the subprocess.
            header_mappings: Optional mapping of HTTP headers to environment variable names
                for dynamic environment injection.

        Examples:
            >>> pubsub = _PubSub()
            >>> endpoint = StdIOEndpoint("echo hello", pubsub)
            >>> endpoint._cmd
            'echo hello'
            >>> endpoint._proc is None
            True
            >>> isinstance(endpoint._pubsub, _PubSub)
            True
            >>> endpoint._stdin is None
            True
            >>> endpoint._pump_task is None
            True
            >>> endpoint._pubsub is pubsub
            True
        """
        self._cmd = cmd
        self._pubsub = pubsub
        self._env_vars = env_vars or {}
        self._header_mappings = header_mappings
        self._proc: Optional[asyncio.subprocess.Process] = None
        self._stdin: Optional[asyncio.StreamWriter] = None
        self._pump_task: Optional[asyncio.Task[None]] = None

    async def start(self, additional_env_vars: Optional[Dict[str, str]] = None) -> None:
        """Start the stdio subprocess with custom environment variables.

        Creates the subprocess and starts the stdout pump task. The subprocess
        is created with stdin/stdout pipes and stderr passed through. If a
        subprocess handle already exists it is stopped first.

        Args:
            additional_env_vars: Optional dictionary of additional environment
                variables to set when starting the subprocess. These will be
                combined with the environment variables set during initialization.

        Raises:
            RuntimeError: If the subprocess fails to create stdin/stdout pipes.

        Examples:
            >>> import asyncio # doctest: +SKIP
            >>> async def test_start(): # doctest: +SKIP
            ...     pubsub = _PubSub()
            ...     stdio = StdIOEndpoint("cat", pubsub)
            ...     # await stdio.start() # doctest: +SKIP
            ...     return True
            >>> asyncio.run(test_start()) # doctest: +SKIP
            True
        """
        # Stop existing subprocess before starting a new one
        if self._proc is not None:
            await self.stop()

        LOGGER.info(f"Starting stdio subprocess: {self._cmd}")

        # Build environment from base + configured + additional (later wins)
        env = os.environ.copy()
        env.update(self._env_vars)
        if additional_env_vars:
            env.update(additional_env_vars)

        # Clear any mapped env vars that weren't provided in headers to avoid inheritance
        if self._header_mappings:
            provided = additional_env_vars or {}
            for env_var_name in self._header_mappings.values():
                if env_var_name not in provided and env_var_name not in self._SYSTEM_CRITICAL_VARS:
                    # Delete the variable instead of setting to empty string to avoid
                    # breaking subprocess initialization
                    env.pop(env_var_name, None)

        # Only the names are logged; values may contain secrets.
        LOGGER.debug(f"Subprocess environment variables: {list(env.keys())}")
        self._proc = await asyncio.create_subprocess_exec(
            *shlex.split(self._cmd),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=sys.stderr,  # passthrough for visibility
            env=env,  # 🔑 Add environment variable support
            limit=STDIO_BUFFER_LIMIT,  # Increase buffer limit for large tool responses
        )

        # Explicit error checking
        if not self._proc.stdin or not self._proc.stdout:
            raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {self._cmd}")

        LOGGER.debug("Subprocess started successfully")

        self._stdin = self._proc.stdin
        self._pump_task = asyncio.create_task(self._pump_stdout())

    async def stop(self) -> None:
        """Stop the stdio subprocess.

        Asks the subprocess to terminate and waits up to ``_STOP_TIMEOUT``
        seconds. If it has still not exited it is killed, so that a child
        ignoring the termination request is not left running after the handle
        is dropped. Finally the pump task is cancelled and awaited.

        Examples:
            >>> import asyncio
            >>> async def test_stop():
            ...     pubsub = _PubSub()
            ...     stdio = StdIOEndpoint("cat", pubsub)
            ...     await stdio.stop()  # Safe to call even if not started
            ...     return True
            >>> asyncio.run(test_stop())
            True
        """
        if self._proc is None:
            return

        # Check if process is still running
        try:
            if self._proc.returncode is not None:
                # Process already terminated
                LOGGER.info(f"Subprocess (pid={self._proc.pid}) already terminated")
                self._proc = None
                self._stdin = None
                return
        except (ProcessLookupError, AttributeError):
            # Process doesn't exist or is already cleaned up
            LOGGER.info("Subprocess already cleaned up")
            self._proc = None
            self._stdin = None
            return

        LOGGER.info(f"Stopping subprocess (pid={self._proc.pid})")
        try:
            self._proc.terminate()
            try:
                await asyncio.wait_for(self._proc.wait(), timeout=self._STOP_TIMEOUT)
            except asyncio.TimeoutError:
                # The child ignored the polite request; escalate rather than
                # abandoning a live process.
                LOGGER.warning(f"Subprocess (pid={self._proc.pid}) did not exit within {self._STOP_TIMEOUT}s after terminate(); killing")
                self._proc.kill()
                with suppress(asyncio.TimeoutError):
                    await asyncio.wait_for(self._proc.wait(), timeout=self._STOP_TIMEOUT)
        except ProcessLookupError:
            # Process already terminated
            LOGGER.info("Subprocess already terminated")
        finally:
            if self._pump_task:
                self._pump_task.cancel()
                # Wait for pump task to actually finish, suppressing all exceptions
                # since the pump task logs its own errors. Use BaseException to catch
                # CancelledError which inherits from BaseException in Python 3.8+
                with suppress(BaseException):
                    await self._pump_task
            self._proc = None
            self._stdin = None  # Reset stdin too!

    def is_running(self) -> bool:
        """Check if the stdio subprocess is currently running.

        A subprocess handle whose ``returncode`` is already set has exited and
        is reported as not running, so callers can restart it instead of
        writing to a dead pipe.

        Returns:
            True if the subprocess is running, False otherwise.

        Examples:
            >>> import asyncio
            >>> async def test_is_running():
            ...     pubsub = _PubSub()
            ...     stdio = StdIOEndpoint("cat", pubsub)
            ...     return stdio.is_running()
            >>> asyncio.run(test_is_running())
            False
        """
        proc = self._proc
        return proc is not None and proc.returncode is None

    async def send(self, raw: str) -> None:
        """Send data to the subprocess stdin.

        Args:
            raw: The raw data string to send to the subprocess.

        Raises:
            RuntimeError: If the stdio endpoint is not started.

        Examples:
            >>> import asyncio
            >>> async def test_send():
            ...     pubsub = _PubSub()
            ...     stdio = StdIOEndpoint("cat", pubsub)
            ...     try:
            ...         await stdio.send("test")
            ...     except RuntimeError as e:
            ...         return str(e)
            >>> asyncio.run(test_send())
            'stdio endpoint not started'
        """
        if not self._stdin:
            raise RuntimeError("stdio endpoint not started")
        LOGGER.debug(f"→ stdio: {raw.strip()}")
        self._stdin.write(raw.encode())
        # drain() applies back-pressure until the write buffer has been flushed.
        await self._stdin.drain()

    async def _pump_stdout(self) -> None:
        """Pump stdout from subprocess to pubsub.

        Continuously reads lines from the subprocess stdout and publishes them
        to the pubsub system. Runs until EOF or exception.

        Raises:
            RuntimeError: If process or stdout is not properly initialized.
            Exception: For any other error encountered while pumping stdout.
        """
        if not self._proc or not self._proc.stdout:
            raise RuntimeError("Process not properly initialized: missing stdout")

        reader = self._proc.stdout
        try:
            while True:
                line = await reader.readline()
                if not line:  # EOF
                    break
                # Undecodable bytes are replaced rather than aborting the pump.
                text = line.decode(errors="replace")
                LOGGER.debug(f"← stdio: {text.strip()}")
                await self._pubsub.publish(text)
        except ConnectionResetError:  # pragma: no cover --subprocess terminated
            # Subprocess terminated abruptly - this is expected behavior
            LOGGER.debug("stdout pump: subprocess connection closed")
        except Exception:  # pragma: no cover --best-effort logging
            LOGGER.exception("stdout pump crashed - terminating bridge")
            raise
562# ---------------------------------------------------------------------------#
563# SSE Event Parser #
564# ---------------------------------------------------------------------------#
class SSEEvent:
    """Represents a Server-Sent Event with proper field parsing.

    Attributes:
        event: The event type (e.g., 'message', 'keepalive', 'endpoint')
        data: The event data payload
        event_id: Optional event ID
        retry: Optional retry interval in milliseconds
    """

    def __init__(self, event: str = "message", data: str = "", event_id: Optional[str] = None, retry: Optional[int] = None):
        """Initialize an SSE event.

        Args:
            event: Event type, defaults to "message"
            data: Event data payload
            event_id: Optional event ID
            retry: Optional retry interval in milliseconds
        """
        self.event = event
        self.data = data
        self.event_id = event_id
        self.retry = retry

    @classmethod
    def parse_sse_line(cls, line: str, current_event: Optional["SSEEvent"] = None) -> Tuple[Optional["SSEEvent"], bool]:
        """Parse a single SSE line and update or create an event.

        Only one leading space is removed from a field value, as the SSE wire
        format prescribes; any further leading spaces belong to the value.

        Args:
            line: The SSE line to parse
            current_event: The current event being built (if any)

        Returns:
            Tuple of (event, is_complete) where event is the SSEEvent object
            and is_complete indicates if the event is ready to be processed

        Examples:
            >>> ev, done = SSEEvent.parse_sse_line("data: hello")
            >>> (ev.data, done)
            ('hello', False)
            >>> ev, done = SSEEvent.parse_sse_line("data:  indented", None)
            >>> ev.data
            ' indented'
            >>> SSEEvent.parse_sse_line("", ev)[1]
            True
        """
        line = line.rstrip("\n\r")

        # Empty line signals end of event; events without data are not dispatched
        if not line:
            if current_event and current_event.data:
                return current_event, True
            return None, False

        # Comment line
        if line.startswith(":"):
            return current_event, False

        # Parse field: everything before the first colon is the field name.
        # A line without a colon is a field name with an empty value.
        field, separator, value = line.partition(":")
        if separator and value.startswith(" "):
            value = value[1:]  # Remove exactly one leading space if present

        # Create event if needed
        if current_event is None:
            current_event = cls()

        # Update fields; unknown field names are ignored
        if field == "event":
            current_event.event = value
        elif field == "data":
            # Multiple data lines are joined with newlines
            if current_event.data:
                current_event.data += "\n" + value
            else:
                current_event.data = value
        elif field == "id":
            current_event.event_id = value
        elif field == "retry":
            try:
                current_event.retry = int(value)
            except ValueError:
                pass  # Ignore invalid retry values

        return current_event, False
644# ---------------------------------------------------------------------------#
645# FastAPI app exposing /sse & /message #
646# ---------------------------------------------------------------------------#
def _build_fastapi(
    pubsub: _PubSub,
    stdio: StdIOEndpoint,
    keep_alive: float = KEEP_ALIVE_INTERVAL,
    sse_path: str = "/sse",
    message_path: str = "/message",
    cors_origins: Optional[List[str]] = None,
    header_mappings: Optional[NormalizedMappings] = None,
) -> FastAPI:
    """Build FastAPI application with SSE and message endpoints.

    Creates a FastAPI app with SSE streaming endpoint and message posting
    endpoint for bidirectional communication with the stdio subprocess.

    Args:
        pubsub: The publish/subscribe system for message routing.
        stdio: The stdio endpoint for subprocess communication.
        keep_alive: Interval in seconds for keepalive messages. Defaults to KEEP_ALIVE_INTERVAL.
        sse_path: Path for the SSE endpoint. Defaults to "/sse".
        message_path: Path for the message endpoint. Defaults to "/message".
        cors_origins: Optional list of CORS allowed origins.
        header_mappings: Optional mapping of HTTP headers to environment variables.

    Returns:
        FastAPI: The configured FastAPI application.

    Examples:
        >>> pubsub = _PubSub()
        >>> stdio = StdIOEndpoint("cat", pubsub)
        >>> app = _build_fastapi(pubsub, stdio)
        >>> isinstance(app, FastAPI)
        True
        >>> "/sse" in [r.path for r in app.routes]
        True
        >>> "/message" in [r.path for r in app.routes]
        True
        >>> "/healthz" in [r.path for r in app.routes]
        True

        >>> # Test with custom paths
        >>> app2 = _build_fastapi(pubsub, stdio, sse_path="/events", message_path="/send")
        >>> "/events" in [r.path for r in app2.routes]
        True
        >>> "/send" in [r.path for r in app2.routes]
        True

        >>> # Test CORS middleware is added
        >>> app3 = _build_fastapi(pubsub, stdio, cors_origins=["http://example.com"])
        >>> # Check that middleware stack includes CORSMiddleware
        >>> any("CORSMiddleware" in str(m) for m in app3.user_middleware)
        True
    """
    app = FastAPI()

    # The SSE ``retry`` field must consist of digits only, so the reconnection
    # hint is computed once as a whole number of milliseconds and reused for
    # every frame (keep_alive may be a float).
    retry_ms = int(keep_alive * 1000)

    def _env_from_headers(request: Request) -> Dict[str, str]:
        """Return env vars derived from the request headers, or ``{}`` when no mappings are configured.

        Args:
            request: The incoming request whose headers are inspected.

        Returns:
            Dict[str, str]: Environment variables extracted via ``header_mappings``.
        """
        if not header_mappings:
            return {}
        return extract_env_vars_from_headers(dict(request.headers), header_mappings)

    # Add CORS middleware if origins specified
    if cors_origins:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=cors_origins,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    # ----- GET /sse ---------------------------------------------------------#
    @app.get(sse_path)
    async def get_sse(request: Request) -> EventSourceResponse:  # noqa: D401
        """Stream subprocess stdout to any number of SSE clients.

        Args:
            request (Request): The incoming ``GET`` request that will be
                upgraded to a Server-Sent Events (SSE) stream.

        Returns:
            EventSourceResponse: A streaming response that forwards JSON-RPC
            messages from the child process and emits periodic ``keepalive``
            frames so that clients and proxies do not time out.
        """
        # Extract environment variables from headers if dynamic env is enabled
        additional_env_vars = _env_from_headers(request)

        # Restart stdio endpoint with new environment variables
        if additional_env_vars:
            LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
            await stdio.stop()  # Stop existing process
            await stdio.start(additional_env_vars)  # Start with new env vars

        queue = pubsub.subscribe()
        session_id = uuid.uuid4().hex

        async def event_gen() -> AsyncIterator[Dict[str, Any]]:
            """Generate Server-Sent Events for the SSE stream.

            Yields SSE events in the following sequence:
            1. An 'endpoint' event with the message posting URL (required by MCP spec)
            2. An immediate 'keepalive' event to confirm the stream is active
            3. 'message' events containing JSON-RPC responses from the subprocess
            4. Periodic 'keepalive' events to prevent timeouts

            The generator runs until the client disconnects or the server shuts down.
            Automatically unsubscribes from the pubsub system on completion.

            Yields:
                Dict[str, Any]: SSE event dictionaries containing:
                    - event: The event type ('endpoint', 'message', or 'keepalive')
                    - data: The event payload (URL, JSON-RPC message, or empty object)
                    - retry: Retry interval in milliseconds for reconnection

            Examples:
                >>> import asyncio
                >>> async def test_event_gen():
                ...     # This is tested indirectly through the SSE endpoint
                ...     return True
                >>> asyncio.run(test_event_gen())
                True
            """
            # 1️⃣ Mandatory "endpoint" bootstrap required by the MCP spec
            endpoint_url = f"{str(request.base_url).rstrip('/')}{message_path}?session_id={session_id}"
            yield {
                "event": "endpoint",
                "data": endpoint_url,
                "retry": retry_ms,
            }

            # 2️⃣ Immediate keepalive so clients know the stream is alive (if enabled in config)
            if DEFAULT_KEEPALIVE_ENABLED:
                yield {"event": "keepalive", "data": "{}", "retry": retry_ms}

            try:
                while True:
                    if await request.is_disconnected():
                        break

                    try:
                        # Without keepalives there is no reason to wake up periodically.
                        timeout = keep_alive if DEFAULT_KEEPALIVE_ENABLED else None
                        msg = await asyncio.wait_for(queue.get(), timeout)
                        yield {"event": "message", "data": msg.rstrip()}
                    except asyncio.TimeoutError:
                        if DEFAULT_KEEPALIVE_ENABLED:
                            yield {
                                "event": "keepalive",
                                "data": "{}",
                                "retry": retry_ms,
                            }
            finally:
                if pubsub:
                    pubsub.unsubscribe(queue)

        return EventSourceResponse(
            event_gen(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # disable proxy buffering
            },
        )

    # ----- POST /message ----------------------------------------------------#
    @app.post(message_path, status_code=status.HTTP_202_ACCEPTED)
    async def post_message(raw: Request, session_id: str | None = None) -> Response:  # noqa: D401
        """Forward a raw JSON-RPC request to the stdio subprocess.

        Args:
            raw (Request): The incoming ``POST`` request whose body contains
                a single JSON-RPC message.
            session_id (str | None): The SSE session identifier that originated
                this back-channel call (present when the client obtained the
                endpoint URL from an ``endpoint`` bootstrap frame).

        Returns:
            Response: ``202 Accepted`` if the payload is forwarded successfully,
            or ``400 Bad Request`` when the body is not valid JSON.
        """
        _ = session_id  # Unused but required for API compatibility

        # Extract environment variables from headers if dynamic env is enabled
        additional_env_vars = _env_from_headers(raw)

        # Restart stdio endpoint with new environment variables
        if additional_env_vars:
            LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
            await stdio.stop()  # Stop existing process
            await stdio.start(additional_env_vars)  # Start with new env vars
            await asyncio.sleep(0.5)  # Give process time to initialize

        # Ensure stdio endpoint is running
        if not stdio.is_running():
            LOGGER.info("Starting stdio endpoint (was not running)")
            await stdio.start()
            await asyncio.sleep(0.5)  # Give process time to initialize

        payload = await raw.body()
        try:
            orjson.loads(payload)  # validate only; the original bytes are forwarded
        except Exception as exc:  # noqa: BLE001
            return PlainTextResponse(
                f"Invalid JSON payload: {exc}",
                status_code=status.HTTP_400_BAD_REQUEST,
            )
        # The subprocess expects exactly one newline-terminated message per line.
        await stdio.send(payload.decode().rstrip() + "\n")
        return PlainTextResponse("forwarded", status_code=status.HTTP_202_ACCEPTED)

    # ----- Liveness ---------------------------------------------------------#
    @app.get("/healthz")
    async def health() -> Response:  # noqa: D401
        """Health check endpoint.

        Returns:
            Response: A plain text response with "ok" status.
        """
        return PlainTextResponse("ok")

    return app
870# ---------------------------------------------------------------------------#
871# CLI & orchestration #
872# ---------------------------------------------------------------------------#
875def _parse_args(argv: Sequence[str]) -> argparse.Namespace:
876 """Parse command line arguments.
878 Validates mutually exclusive source options and sets defaults for
879 port and logging configuration.
881 Args:
882 argv: Sequence of command line arguments.
884 Returns:
885 argparse.Namespace: Parsed command line arguments.
887 Raises:
888 NotImplementedError: If streamableHttp option is specified.
890 Examples:
891 >>> args = _parse_args(["--stdio", "cat", "--port", "9000"])
892 >>> args.stdio
893 'cat'
894 >>> args.port
895 9000
896 >>> args.logLevel
897 'info'
898 >>> args.host
899 '127.0.0.1'
900 >>> args.cors is None
901 True
902 >>> args.oauth2Bearer is None
903 True
905 >>> # Test default parameters
906 >>> args = _parse_args(["--stdio", "cat"])
907 >>> args.port
908 8000
909 >>> args.host
910 '127.0.0.1'
911 >>> args.logLevel
912 'info'
914 >>> # Test connect-sse mode
915 >>> args = _parse_args(["--connect-sse", "http://example.com/sse"])
916 >>> args.connect_sse
917 'http://example.com/sse'
918 >>> args.stdio is None
919 True
921 >>> # Test CORS configuration
922 >>> args = _parse_args(["--stdio", "cat", "--cors", "https://app.com", "https://web.com"])
923 >>> args.cors
924 ['https://app.com', 'https://web.com']
926 >>> # Test OAuth2 Bearer token
927 >>> args = _parse_args(["--connect-sse", "http://example.com", "--oauth2Bearer", "token123"])
928 >>> args.oauth2Bearer
929 'token123'
931 >>> # Test custom host and log level
932 >>> args = _parse_args(["--stdio", "cat", "--host", "0.0.0.0", "--logLevel", "debug"])
933 >>> args.host
934 '0.0.0.0'
935 >>> args.logLevel
936 'debug'
938 >>> # Test expose protocols
939 >>> args = _parse_args(["--stdio", "uvx mcp-server-git", "--expose-sse", "--expose-streamable-http"])
940 >>> args.stdio
941 'uvx mcp-server-git'
942 >>> args.expose_sse
943 True
944 >>> args.expose_streamable_http
945 True
946 >>> args.stateless
947 False
948 >>> args.jsonResponse
949 False
951 >>> # Test new parameters
952 >>> args = _parse_args(["--stdio", "cat", "--ssePath", "/events", "--messagePath", "/send", "--keepAlive", "60"])
953 >>> args.ssePath
954 '/events'
955 >>> args.messagePath
956 '/send'
957 >>> args.keepAlive
958 60
960 >>> # Test connect-sse with stdio command
961 >>> args = _parse_args(["--connect-sse", "http://example.com/sse", "--stdioCommand", "uvx mcp-server-git"])
962 >>> args.stdioCommand
963 'uvx mcp-server-git'
965 >>> # Test connect-sse without stdio command (allowed)
966 >>> args = _parse_args(["--connect-sse", "http://example.com/sse"])
967 >>> args.stdioCommand is None
968 True
969 """
970 p = argparse.ArgumentParser(
971 prog="mcpgateway.translate",
972 description="Bridges between different MCP transport protocols: stdio, SSE, and streamable HTTP.",
973 )
975 # Source/destination options
976 p.add_argument("--stdio", help='Local command to run, e.g. "uvx mcp-server-git"')
977 p.add_argument("--connect-sse", dest="connect_sse", help="Connect to remote SSE endpoint URL")
978 p.add_argument("--connect-streamable-http", dest="connect_streamable_http", help="Connect to remote streamable HTTP endpoint URL")
979 p.add_argument("--grpc", type=str, help="gRPC server target (host:port) to expose")
980 p.add_argument("--connect-grpc", type=str, help="Remote gRPC endpoint to connect to")
982 # Protocol exposure options (can be combined)
983 p.add_argument("--expose-sse", action="store_true", help="Expose via SSE protocol (endpoints: /sse and /message)")
984 p.add_argument("--expose-streamable-http", action="store_true", help="Expose via streamable HTTP protocol (endpoint: /mcp)")
986 # gRPC configuration options
987 p.add_argument("--grpc-tls", action="store_true", help="Enable TLS for gRPC connection")
988 p.add_argument("--grpc-cert", type=str, help="Path to TLS certificate for gRPC")
989 p.add_argument("--grpc-key", type=str, help="Path to TLS key for gRPC")
990 p.add_argument("--grpc-metadata", action="append", help="gRPC metadata (KEY=VALUE, repeatable)")
992 p.add_argument("--port", type=int, default=8000, help="HTTP port to bind")
993 p.add_argument("--host", default="127.0.0.1", help="Host interface to bind (default: 127.0.0.1)")
994 p.add_argument(
995 "--logLevel",
996 default="info",
997 choices=["debug", "info", "warning", "error", "critical"],
998 help="Log level",
999 )
1000 p.add_argument(
1001 "--cors",
1002 nargs="*",
1003 help="CORS allowed origins (e.g., --cors https://app.example.com)",
1004 )
1005 p.add_argument(
1006 "--oauth2Bearer",
1007 help="OAuth2 Bearer token for authentication",
1008 )
1010 # New configuration options
1011 p.add_argument(
1012 "--ssePath",
1013 default="/sse",
1014 help="SSE endpoint path (default: /sse)",
1015 )
1016 p.add_argument(
1017 "--messagePath",
1018 default="/message",
1019 help="Message endpoint path (default: /message)",
1020 )
1021 p.add_argument(
1022 "--keepAlive",
1023 type=int,
1024 default=KEEP_ALIVE_INTERVAL,
1025 help=f"Keep-alive interval in seconds (default: {KEEP_ALIVE_INTERVAL})",
1026 )
1028 # For SSE to stdio mode
1029 p.add_argument(
1030 "--stdioCommand",
1031 help="Command to run when bridging SSE/streamableHttp to stdio (optional with --sse or --streamableHttp)",
1032 )
1034 # Dynamic environment variable injection
1035 p.add_argument("--enable-dynamic-env", action="store_true", help="Enable dynamic environment variable injection from HTTP headers")
1036 p.add_argument(
1037 "--header-to-env",
1038 action="append",
1039 default=[],
1040 help="Map HTTP header to environment variable (format: HEADER=ENV_VAR, can be used multiple times). Case-insensitive duplicates are rejected (e.g., Authorization and authorization cannot both be mapped).",
1041 )
1043 # For streamable HTTP mode
1044 p.add_argument(
1045 "--stateless",
1046 action="store_true",
1047 help="Use stateless mode for streamable HTTP (default: False)",
1048 )
1049 p.add_argument(
1050 "--jsonResponse",
1051 action="store_true",
1052 help="Return JSON responses instead of SSE streams for streamable HTTP (default: False)",
1053 )
1055 args = p.parse_args(argv)
1056 # streamableHttp is now supported, no need to raise NotImplementedError
1057 return args
async def _run_stdio_to_sse(
    cmd: str,
    port: int,
    log_level: str = "info",
    cors: Optional[List[str]] = None,
    host: str = "127.0.0.1",
    sse_path: str = "/sse",
    message_path: str = "/message",
    keep_alive: float = KEEP_ALIVE_INTERVAL,
    header_mappings: Optional[NormalizedMappings] = None,
) -> None:
    """Run stdio to SSE bridge.

    Starts a subprocess and exposes it via HTTP/SSE endpoints. Handles graceful
    shutdown on SIGINT/SIGTERM. The subprocess is stopped whenever the HTTP
    server stops, including when ``serve()`` raises or this coroutine is
    cancelled.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use. Defaults to "info".
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1" for security.
        sse_path: Path for the SSE endpoint. Defaults to "/sse".
        message_path: Path for the message endpoint. Defaults to "/message".
        keep_alive: Keep-alive interval in seconds. Defaults to KEEP_ALIVE_INTERVAL.
        header_mappings: Optional mapping of HTTP headers to environment variables.

    Examples:
        >>> import asyncio # doctest: +SKIP
        >>> async def test_run(): # doctest: +SKIP
        ...     await _run_stdio_to_sse("cat", 9000) # doctest: +SKIP
        ...     return True
        >>> asyncio.run(test_run()) # doctest: +SKIP
        True
    """
    pubsub = _PubSub()
    stdio = StdIOEndpoint(cmd, pubsub, header_mappings=header_mappings)
    await stdio.start()

    app = _build_fastapi(pubsub, stdio, keep_alive=keep_alive, sse_path=sse_path, message_path=message_path, cors_origins=cors, header_mappings=header_mappings)
    config = uvicorn.Config(
        app,
        host=host,
        port=port,
        log_level=log_level,
        lifespan="off",
    )
    uvicorn_server = uvicorn.Server(config)

    # Guards the shutdown sequence so it runs at most once.
    shutting_down = asyncio.Event()
    # Strong references to signal-triggered shutdown tasks; the event loop
    # itself only keeps weak references to tasks.
    pending_shutdowns: set = set()

    async def _shutdown() -> None:
        """Stop the stdio subprocess and ask the HTTP server to exit.

        Idempotent: only the first call performs the shutdown sequence;
        later calls return immediately.
        """
        if shutting_down.is_set():
            return
        shutting_down.set()
        LOGGER.info("Shutting down ...")
        await stdio.stop()
        # Signal uvicorn's main loop to exit gracefully.
        uvicorn_server.should_exit = True

    def _schedule_shutdown(*_: object) -> None:
        """Signal-handler callback: run ``_shutdown`` as a tracked task."""
        task = asyncio.create_task(_shutdown())
        pending_shutdowns.add(task)
        task.add_done_callback(pending_shutdowns.discard)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        with suppress(NotImplementedError):  # Windows lacks add_signal_handler
            loop.add_signal_handler(sig, _schedule_shutdown)

    LOGGER.info(f"Bridge ready → http://{host}:{port}{sse_path}")
    try:
        await uvicorn_server.serve()
    finally:
        # Final cleanup; must also run when serve() raises or is cancelled,
        # otherwise the subprocess started above would be left running.
        await _shutdown()
async def _run_sse_to_stdio(url: str, oauth2_bearer: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None, max_retries: int = 5, initial_retry_delay: float = 1.0) -> None:
    """Run SSE to stdio bridge.

    Connects to a remote SSE endpoint and bridges it to local stdio.
    Implements bidirectional message flow with error handling and retries.

    Args:
        url: The SSE endpoint URL to connect to.
        oauth2_bearer: Optional OAuth2 bearer token for authentication. Defaults to None.
        timeout: HTTP client timeout in seconds. Defaults to 30.0.
        stdio_command: Optional command to run for local stdio processing.
            If not provided, will simply print SSE messages to stdout.
        max_retries: Maximum number of connection retry attempts. Defaults to 5.
        initial_retry_delay: Initial delay between retries in seconds. Defaults to 1.0.

    Raises:
        ImportError: If httpx package is not available.
        RuntimeError: If the subprocess fails to create stdin/stdout pipes.
        Exception: For any unexpected error in SSE stream processing.

    Examples:
        >>> import asyncio
        >>> async def test_sse():
        ...     try:
        ...         await _run_sse_to_stdio("http://example.com/sse", None) # doctest: +SKIP
        ...     except ImportError as e:
        ...         return "httpx" in str(e)
        >>> asyncio.run(test_sse()) # Would return True if httpx not installed # doctest: +SKIP
    """
    if not httpx:
        raise ImportError("httpx package is required for SSE to stdio bridging")

    # Standard
    from urllib.parse import urljoin  # pylint: disable=import-outside-toplevel

    headers = {}
    if oauth2_bearer:
        headers["Authorization"] = f"Bearer {oauth2_bearer}"

    # If no stdio command provided, use simple mode (just print to stdout)
    if not stdio_command:
        LOGGER.warning("No --stdioCommand provided, running in simple mode (SSE to stdout only)")
        # First-Party
        from mcpgateway.services.http_client_service import get_isolated_http_client  # pylint: disable=import-outside-toplevel

        async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
            await _simple_sse_pump(client, url, max_retries, initial_retry_delay)
        return

    # Start the stdio subprocess
    process = await asyncio.create_subprocess_exec(
        *shlex.split(stdio_command),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=sys.stderr,
    )

    if not process.stdin or not process.stdout:
        raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {stdio_command}")

    # Message endpoint URL; filled in by pump_sse_to_stdio once the remote
    # side sends its "endpoint" event, and read by read_stdout.
    message_endpoint: Optional[str] = None

    async def read_stdout(client: httpx.AsyncClient) -> None:
        """Read lines from subprocess stdout and POST them to the message endpoint.

        Each non-empty line is POSTed as-is with a JSON content type. Before
        each POST the function waits (up to 30 one-second polls) for the
        message endpoint to be announced on the SSE stream; if it never
        arrives, the line is dropped and an error is logged.

        Args:
            client: The HTTP client to use for POSTing messages.

        Raises:
            RuntimeError: If the process stdout stream is not available.
        """
        if not process.stdout:
            raise RuntimeError("Process stdout not available")

        while True:
            if not process.stdout:
                raise RuntimeError("Process stdout not available")
            line = await process.stdout.readline()
            if not line:
                # EOF: the subprocess closed its stdout.
                break

            text = line.decode().strip()
            if not text:
                continue

            LOGGER.debug(f"← stdio: {text}")

            # Wait for endpoint URL if not yet received
            retry_count = 0
            while not message_endpoint and retry_count < 30:  # 30 second timeout
                await asyncio.sleep(1)
                retry_count += 1

            if not message_endpoint:
                LOGGER.error("No message endpoint received from SSE stream")
                continue

            # POST the JSON-RPC request to the message endpoint
            try:
                response = await client.post(message_endpoint, content=text, headers={"Content-Type": "application/json"})
                if response.status_code != 202:
                    LOGGER.warning(f"Message endpoint returned {response.status_code}: {response.text}")
            except Exception as e:
                # Best effort: a failed POST must not stop the reader loop.
                LOGGER.error(f"Failed to POST to message endpoint: {e}")

    async def pump_sse_to_stdio(client: httpx.AsyncClient) -> None:
        """Stream SSE data from the remote endpoint to subprocess stdin.

        Connects to the SSE endpoint with retry and exponential backoff.
        "endpoint" events update the message endpoint used by ``read_stdout``,
        "message" events are written to the subprocess stdin as one line each,
        and "keepalive" events are only logged.

        Args:
            client: The HTTP client to use for SSE streaming.

        Raises:
            HTTPStatusError: If the SSE endpoint returns a non-200 status code.
            Exception: For unexpected errors in SSE stream processing.
        """
        nonlocal message_endpoint
        retry_delay = initial_retry_delay
        retry_count = 0

        while retry_count < max_retries:
            try:
                LOGGER.info(f"Connecting to SSE endpoint: {url}")

                async with client.stream("GET", url) as response:
                    # Check status code if available (real httpx response)
                    if hasattr(response, "status_code") and response.status_code != 200:
                        if httpx:
                            raise httpx.HTTPStatusError(f"SSE endpoint returned {response.status_code}", request=response.request, response=response)
                        raise Exception(f"SSE endpoint returned {response.status_code}")

                    # Reset retry counter on successful connection
                    retry_count = 0
                    retry_delay = initial_retry_delay
                    current_event: Optional[SSEEvent] = None

                    async for line in response.aiter_lines():
                        event, is_complete = SSEEvent.parse_sse_line(line, current_event)
                        current_event = event

                        if is_complete and current_event:
                            LOGGER.debug(f"SSE event: {current_event.event} - {current_event.data[:100]}...")

                            if current_event.event == "endpoint":
                                # The endpoint may be advertised relative to the SSE URL
                                # (e.g. "/message?session_id=..."); resolve it so it can
                                # be POSTed to. Absolute URLs come back unchanged, and an
                                # empty value stays falsy so read_stdout keeps waiting.
                                endpoint_data = current_event.data
                                message_endpoint = urljoin(url, endpoint_data) if endpoint_data else endpoint_data
                                LOGGER.info(f"Received message endpoint: {message_endpoint}")

                            elif current_event.event == "message":
                                # Forward JSON-RPC responses to stdio
                                if process.stdin:
                                    process.stdin.write((current_event.data + "\n").encode())
                                    await process.stdin.drain()
                                    LOGGER.debug(f"→ stdio: {current_event.data}")

                            elif current_event.event == "keepalive":
                                # Log keepalive but don't forward
                                LOGGER.debug("Received keepalive")

                            # Reset for next event
                            current_event = None

            except Exception as e:
                # Only connection-level httpx failures are retried.
                if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)):
                    retry_count += 1
                    if retry_count >= max_retries:
                        LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                        raise

                    LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
                    await asyncio.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2, 30)  # Exponential backoff, max 30s
                else:
                    LOGGER.error(f"Unexpected error in SSE stream: {e}")
                    raise

    # Run both tasks concurrently
    # First-Party
    from mcpgateway.services.http_client_service import get_isolated_http_client  # pylint: disable=import-outside-toplevel

    async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
        try:
            await asyncio.gather(read_stdout(client), pump_sse_to_stdio(client))
        except Exception as e:
            LOGGER.error(f"Bridge error: {e}")
            raise
        finally:
            # Clean up subprocess
            if process.returncode is None:
                process.terminate()
                with suppress(asyncio.TimeoutError):
                    await asyncio.wait_for(process.wait(), timeout=5)
async def _run_stdio_to_streamable_http(
    cmd: str,
    port: int,
    log_level: str = "info",
    cors: Optional[List[str]] = None,
    host: str = "127.0.0.1",
    stateless: bool = False,
    json_response: bool = False,
) -> None:
    """Expose a stdio subprocess through a streamable HTTP endpoint.

    Spawns ``cmd`` with piped stdin/stdout, serves ``/mcp`` (GET/POST) and
    ``/healthz`` (GET) with uvicorn, and tears everything down on
    SIGINT/SIGTERM or when the server stops.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use. Defaults to "info".
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1" for security.
        stateless: Whether to use stateless mode for streamable HTTP. Defaults to False.
        json_response: Whether to return JSON responses instead of SSE streams. Defaults to False.

    Raises:
        ImportError: If MCP server components are not available.
        RuntimeError: If subprocess fails to create stdin/stdout pipes.

    Examples:
        >>> import asyncio
        >>> async def test_streamable_http():
        ...     # Would start a real subprocess and HTTP server
        ...     cmd = "echo hello"
        ...     port = 9000
        ...     # This would normally run the server
        ...     return True
        >>> asyncio.run(test_streamable_http())
        True
    """
    LOGGER.info(f"Starting stdio to streamable HTTP bridge for command: {cmd}")

    # MCP server object handed to the session manager below.
    mcp_server = MCPServer(name="stdio-proxy")

    # Child process whose stdin/stdout carry the stdio side of the bridge.
    argv = shlex.split(cmd)
    process = await asyncio.create_subprocess_exec(
        *argv,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=sys.stderr,
    )

    if not (process.stdin and process.stdout):
        raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {cmd}")

    session_manager = StreamableHTTPSessionManager(
        app=mcp_server,
        stateless=stateless,
        json_response=json_response,
    )

    async def handle_mcp(request: Request) -> None:
        """Delegate an incoming ``/mcp`` request to the session manager.

        Args:
            request: The incoming HTTP request from Starlette.
        """
        # Reaches into the private ``_send`` of the request because the
        # session manager is called with raw ASGI scope/receive/send.
        await session_manager.handle_request(request.scope, request.receive, request._send)  # pylint: disable=W0212

    app = Starlette(
        routes=[
            Route("/mcp", handle_mcp, methods=["GET", "POST"]),
            Route("/healthz", lambda request: PlainTextResponse("ok"), methods=["GET"]),
        ]
    )

    # CORS is only installed when origins were supplied.
    if cors:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=cors,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    uvicorn_server = uvicorn.Server(
        uvicorn.Config(
            app,
            host=host,
            port=port,
            log_level=log_level,
            lifespan="off",
        )
    )

    # Set on the first shutdown so later calls become no-ops.
    shutting_down = asyncio.Event()

    async def _shutdown() -> None:
        """Handle graceful shutdown of the streamable HTTP bridge."""
        if shutting_down.is_set():
            return
        shutting_down.set()
        LOGGER.info("Shutting down streamable HTTP bridge...")
        still_running = process.returncode is None
        if still_running:
            process.terminate()
            # Give the child up to 5 seconds to exit; a timeout is ignored.
            with suppress(asyncio.TimeoutError):
                await asyncio.wait_for(process.wait(), 5)
        # Ask uvicorn's main loop to exit.
        uvicorn_server.should_exit = True

    loop = asyncio.get_running_loop()
    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, lambda *_: asyncio.create_task(_shutdown()))
        except NotImplementedError:  # Windows lacks add_signal_handler
            pass

    async def pump_stdio_to_http() -> None:
        """Drain subprocess stdout, logging each line at debug level.

        Stops at EOF or on the first error. Lines are not yet routed into
        HTTP responses; that integration with the session manager is pending.
        """
        try:
            while True:
                if not process.stdout:
                    raise RuntimeError("Process stdout not available")
                line = await process.stdout.readline()
                if not line:
                    return
                LOGGER.debug(f"Received from subprocess: {line.decode().strip()}")
        except Exception as exc:
            LOGGER.error(f"Error reading from subprocess: {exc}")

    async def pump_http_to_stdio(data: str) -> None:
        """Write one newline-terminated message to subprocess stdin.

        Args:
            data: The data string to send to subprocess stdin.

        Raises:
            RuntimeError: If the process stdin stream is not available.
        """
        if not process.stdin:
            raise RuntimeError("Process stdin not available")
        process.stdin.write(data.encode() + b"\n")
        await process.stdin.drain()

    # Not called yet; referenced so the helper is kept until the
    # stdio-to-HTTP bridge is fully implemented.
    _ = pump_http_to_stdio

    pump_task = asyncio.create_task(pump_stdio_to_http())

    try:
        LOGGER.info(f"Streamable HTTP bridge ready → http://{host}:{port}/mcp")
        await uvicorn_server.serve()
    finally:
        pump_task.cancel()
        await _shutdown()
async def _run_streamable_http_to_stdio(
    url: str,
    oauth2_bearer: Optional[str] = None,
    timeout: float = 30.0,
    stdio_command: Optional[str] = None,
    max_retries: int = 5,
    initial_retry_delay: float = 1.0,
) -> None:
    """Bridge a remote streamable HTTP endpoint to a local stdio subprocess.

    Lines written by the subprocess are POSTed to the endpoint and 200
    responses are fed back to its stdin; in parallel a GET event stream is
    consumed and its ``data: `` lines are forwarded to stdin as well. Without
    ``stdio_command`` the stream is simply printed to stdout.

    Args:
        url: The streamable HTTP endpoint URL to connect to. ``/mcp`` is
            appended when the URL does not already end with it.
        oauth2_bearer: Optional OAuth2 bearer token for authentication. Defaults to None.
        timeout: HTTP client timeout in seconds. Defaults to 30.0.
        stdio_command: Optional command to run for local stdio processing.
            If not provided, will simply print messages to stdout.
        max_retries: Maximum number of connection retry attempts. Defaults to 5.
        initial_retry_delay: Initial delay between retries in seconds. Defaults to 1.0.

    Raises:
        ImportError: If httpx package is not available.
        RuntimeError: If the subprocess fails to create stdin/stdout pipes.
        Exception: For any unexpected error during bridging operations.
    """
    if not httpx:
        raise ImportError("httpx package is required for streamable HTTP to stdio bridging")

    headers = {"Authorization": f"Bearer {oauth2_bearer}"} if oauth2_bearer else {}

    # Normalise the target so it always ends with /mcp.
    if not url.endswith("/mcp"):
        url = url.rstrip("/") + "/mcp"

    # Simple mode: no subprocess, just print the stream.
    if not stdio_command:
        LOGGER.warning("No --stdioCommand provided, running in simple mode (streamable HTTP to stdout only)")
        # First-Party
        from mcpgateway.services.http_client_service import get_isolated_http_client  # pylint: disable=import-outside-toplevel

        async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
            await _simple_streamable_http_pump(client, url, max_retries, initial_retry_delay)
        return

    # Spawn the local stdio process.
    argv = shlex.split(stdio_command)
    process = await asyncio.create_subprocess_exec(
        *argv,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=sys.stderr,
    )

    if not (process.stdin and process.stdout):
        raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {stdio_command}")

    async def read_stdout(client: httpx.AsyncClient) -> None:
        """POST each non-empty subprocess stdout line to the HTTP endpoint.

        A 200 response body is written back to the subprocess stdin; any other
        status is logged as a warning. POST failures are logged and the loop
        continues.

        Args:
            client: The HTTP client to use for POSTing messages.

        Raises:
            RuntimeError: If the process stdout stream is not available.
        """
        if not process.stdout:
            raise RuntimeError("Process stdout not available")

        while True:
            if not process.stdout:
                raise RuntimeError("Process stdout not available")
            raw = await process.stdout.readline()
            if not raw:
                # EOF from the subprocess.
                return

            text = raw.decode().strip()
            if not text:
                continue

            LOGGER.debug(f"← stdio: {text}")

            try:
                body = text
                if CONTENT_TYPE == "application/x-www-form-urlencoded":
                    # JSON payloads are re-encoded as a form; anything that
                    # fails to parse/encode is sent unchanged.
                    try:
                        body = urlencode(orjson.loads(text))
                    except Exception:
                        body = text
                response = await client.post(url, content=body, headers=headers)

                if response.status_code != 200:
                    LOGGER.warning(f"Streamable HTTP endpoint returned {response.status_code}: {response.text}")
                    continue

                response_data = response.text
                if response_data and process.stdin:
                    process.stdin.write((response_data + "\n").encode())
                    await process.stdin.drain()
                    LOGGER.debug(f"→ stdio: {response_data}")
            except Exception as exc:
                LOGGER.error(f"Failed to POST to streamable HTTP endpoint: {exc}")

    async def pump_streamable_http_to_stdio(client: httpx.AsyncClient) -> None:
        """Forward ``data: `` lines from the endpoint's event stream to stdin.

        Retries connection-level httpx errors with exponential backoff
        (capped at 30 seconds) until ``max_retries`` consecutive failures.

        Args:
            client: The HTTP client to use for streamable HTTP streaming.

        Raises:
            httpx.HTTPStatusError: If the streamable HTTP endpoint returns a non-200 status code.
            Exception: For unexpected errors in streamable HTTP stream processing.
        """
        data_prefix = "data: "
        retry_delay = initial_retry_delay
        retry_count = 0

        while retry_count < max_retries:
            try:
                LOGGER.info(f"Connecting to streamable HTTP endpoint: {url}")

                # Request the event-stream representation of the endpoint.
                async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response:
                    if response.status_code != 200:
                        if httpx:
                            raise httpx.HTTPStatusError(f"Streamable HTTP endpoint returned {response.status_code}", request=response.request, response=response)
                        raise Exception(f"Streamable HTTP endpoint returned {response.status_code}")

                    # Connected: start the retry budget over.
                    retry_count = 0
                    retry_delay = initial_retry_delay

                    async for line in response.aiter_lines():
                        if not line.startswith(data_prefix):
                            continue
                        data = line[len(data_prefix):]
                        if data and process.stdin:
                            process.stdin.write((data + "\n").encode())
                            await process.stdin.drain()
                            LOGGER.debug(f"→ stdio: {data}")

            except Exception as exc:
                retryable = httpx and isinstance(exc, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout))
                if not retryable:
                    LOGGER.error(f"Unexpected error in streamable HTTP stream: {exc}")
                    raise

                retry_count += 1
                if retry_count >= max_retries:
                    LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                    raise

                LOGGER.warning(f"Connection error: {exc}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)  # Exponential backoff, max 30s

    # Both directions run concurrently on one shared client.
    # First-Party
    from mcpgateway.services.http_client_service import get_isolated_http_client  # pylint: disable=import-outside-toplevel

    async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
        try:
            await asyncio.gather(read_stdout(client), pump_streamable_http_to_stdio(client))
        except Exception as exc:
            LOGGER.error(f"Bridge error: {exc}")
            raise
        finally:
            # Terminate the subprocess if it is still alive, waiting up to 5 seconds.
            if process.returncode is None:
                process.terminate()
                with suppress(asyncio.TimeoutError):
                    await asyncio.wait_for(process.wait(), timeout=5)
async def _simple_streamable_http_pump(client: "Any", url: str, max_retries: int, initial_retry_delay: float) -> None:
    """Simple streamable HTTP pump that just prints messages to stdout.

    Used when no stdio command is provided to bridge streamable HTTP to stdout directly.
    Each message is flushed as soon as it is printed so that a consumer reading
    this process's stdout through a pipe receives it immediately instead of
    waiting for the output buffer to fill.

    Args:
        client: The HTTP client to use for streamable HTTP streaming.
        url: The streamable HTTP endpoint URL to connect to.
        max_retries: Maximum number of connection retry attempts.
        initial_retry_delay: Initial delay between retries in seconds.

    Raises:
        Exception: For unexpected errors in streamable HTTP stream processing including
            HTTPStatusError if the endpoint returns a non-200 status code.
    """
    retry_delay = initial_retry_delay
    retry_count = 0

    while retry_count < max_retries:
        try:
            LOGGER.info(f"Connecting to streamable HTTP endpoint: {url}")

            # Try to get SSE stream
            async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response:
                if response.status_code != 200:
                    if httpx:
                        raise httpx.HTTPStatusError(f"Streamable HTTP endpoint returned {response.status_code}", request=response.request, response=response)
                    raise Exception(f"Streamable HTTP endpoint returned {response.status_code}")

                # Reset retry counter on successful connection
                retry_count = 0
                retry_delay = initial_retry_delay

                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data = line[6:]  # Remove "data: " prefix
                        if data:
                            # Flush per message: stdout is block-buffered when it
                            # is not a terminal, which would stall a piped reader.
                            print(data, flush=True)
                            LOGGER.debug(f"Received: {data}")

        except Exception as e:
            # Only connection-level httpx failures are retried.
            if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)):
                retry_count += 1
                if retry_count >= max_retries:
                    LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                    raise

                LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)  # Exponential backoff, max 30s
            else:
                LOGGER.error(f"Unexpected error in streamable HTTP stream: {e}")
                raise
async def _run_multi_protocol_server(  # pylint: disable=too-many-positional-arguments
    cmd: str,
    port: int,
    log_level: str = "info",
    cors: Optional[List[str]] = None,
    host: str = "127.0.0.1",
    expose_sse: bool = False,
    expose_streamable_http: bool = False,
    sse_path: str = "/sse",
    message_path: str = "/message",
    keep_alive: float = KEEP_ALIVE_INTERVAL,
    stateless: bool = False,
    json_response: bool = False,
    header_mappings: Optional[NormalizedMappings] = None,
) -> None:
    """Run a stdio server and expose it via multiple protocols simultaneously.

    A single stdio subprocess and a single pubsub are shared by every enabled
    transport. The HTTP server runs until uvicorn exits or SIGINT/SIGTERM is
    received, after which the subprocess and the streamable HTTP context are
    torn down.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use. Defaults to "info".
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1".
        expose_sse: Whether to expose via SSE protocol.
        expose_streamable_http: Whether to expose via streamable HTTP protocol.
        sse_path: Path for SSE endpoint. Defaults to "/sse".
        message_path: Path for message endpoint. Defaults to "/message".
        keep_alive: Keep-alive interval for SSE. Defaults to KEEP_ALIVE_INTERVAL.
        stateless: Whether to use stateless mode for streamable HTTP.
        json_response: Whether to return JSON responses for streamable HTTP.
        header_mappings: Optional mapping of HTTP headers to environment variables.
    """
    LOGGER.info(f"Starting multi-protocol server for command: {cmd}")
    LOGGER.info(f"Protocols: SSE={expose_sse}, StreamableHTTP={expose_streamable_http}")

    # Create a shared pubsub whenever either protocol needs stdout observations
    pubsub = _PubSub() if (expose_sse or expose_streamable_http) else None

    # Create the stdio endpoint
    stdio = StdIOEndpoint(cmd, pubsub, header_mappings=header_mappings) if (expose_sse or expose_streamable_http) and pubsub else None

    # Create fastapi app and middleware
    app = FastAPI()

    # Add CORS middleware if specified
    if cors:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=cors,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    # Start stdio if at least one transport requires it
    if stdio:
        await stdio.start()

    # SSE "retry" must be an integer number of milliseconds; compute it once so
    # the endpoint event and every keepalive event advertise the same value.
    retry_ms = int(keep_alive * 1000)

    # SSE endpoints
    if expose_sse and stdio and pubsub:

        @app.get(sse_path)
        async def get_sse(request: Request) -> EventSourceResponse:
            """SSE endpoint.

            Args:
                request: The incoming HTTP request.

            Returns:
                EventSourceResponse: Server-sent events stream.

            Raises:
                RuntimeError: If the shared pubsub is not available.
            """
            if not pubsub:
                raise RuntimeError("PubSub not available")

            # Extract environment variables from headers if dynamic env is enabled
            additional_env_vars = {}
            if header_mappings and stdio:
                request_headers = dict(request.headers)
                additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)

                # Restart stdio endpoint with new environment variables
                if additional_env_vars:
                    LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
                    await stdio.stop()  # Stop existing process
                    await stdio.start(additional_env_vars)  # Start with new env vars

            queue = pubsub.subscribe()
            session_id = uuid.uuid4().hex

            async def event_gen() -> AsyncIterator[Dict[str, Any]]:
                """Generate SSE events for the client.

                Yields:
                    Dict[str, Any]: SSE event data with event type and payload.
                """
                endpoint_url = f"{str(request.base_url).rstrip('/')}{message_path}?session_id={session_id}"
                yield {
                    "event": "endpoint",
                    "data": endpoint_url,
                    "retry": retry_ms,
                }

                if DEFAULT_KEEPALIVE_ENABLED:
                    yield {"event": "keepalive", "data": "{}", "retry": retry_ms}

                # With keepalive disabled we block on the queue indefinitely.
                wait_timeout = keep_alive if DEFAULT_KEEPALIVE_ENABLED else None

                try:
                    while True:
                        if await request.is_disconnected():
                            break

                        try:
                            msg = await asyncio.wait_for(queue.get(), wait_timeout)
                        except asyncio.TimeoutError:
                            # Nothing arrived within the keepalive window.
                            if DEFAULT_KEEPALIVE_ENABLED:
                                yield {
                                    "event": "keepalive",
                                    "data": "{}",
                                    "retry": retry_ms,
                                }
                        else:
                            yield {"event": "message", "data": msg.rstrip()}
                finally:
                    if pubsub:
                        pubsub.unsubscribe(queue)

            return EventSourceResponse(
                event_gen(),
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "X-Accel-Buffering": "no",
                },
            )

        @app.post(message_path, status_code=status.HTTP_202_ACCEPTED)
        async def post_message(raw: Request, session_id: str | None = None) -> Response:
            """Message endpoint for SSE.

            Args:
                raw: The incoming HTTP request.
                session_id: Optional session ID for correlation.

            Returns:
                Response: Acknowledgement of message receipt, or 400 for invalid JSON.

            Raises:
                RuntimeError: If the stdio endpoint is not available.
            """
            _ = session_id

            # Extract environment variables from headers if dynamic env is enabled
            additional_env_vars = {}
            if header_mappings and stdio:
                request_headers = dict(raw.headers)
                additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)

                # Only restart if we have new environment variables
                if additional_env_vars:
                    LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
                    await stdio.stop()  # Stop existing process
                    await stdio.start(additional_env_vars)  # Start with new env vars
                    await asyncio.sleep(0.5)  # Give process time to initialize

            # Ensure stdio endpoint is running
            if stdio and not stdio.is_running():
                LOGGER.info("Starting stdio endpoint (was not running)")
                await stdio.start()
                await asyncio.sleep(0.5)  # Give process time to initialize

            payload = await raw.body()
            try:
                orjson.loads(payload)
            except Exception as exc:
                return PlainTextResponse(
                    f"Invalid JSON payload: {exc}",
                    status_code=status.HTTP_400_BAD_REQUEST,
                )
            if not stdio:
                raise RuntimeError("Stdio endpoint not available")
            await stdio.send(payload.decode().rstrip() + "\n")
            return PlainTextResponse("forwarded", status_code=status.HTTP_202_ACCEPTED)

    # Add health check
    @app.get("/healthz")
    async def health() -> Response:
        """Health check endpoint.

        Returns:
            Response: Health status response.
        """
        return PlainTextResponse("ok")

    # Streamable HTTP support
    streamable_server = None
    streamable_manager = None
    streamable_context = None

    # Keep a reference to the original FastAPI app so we can wrap it with an ASGI
    # layer that delegates `/mcp` scopes to the StreamableHTTPSessionManager if present.
    original_app = app

    if expose_streamable_http:
        # Create an MCP server instance
        streamable_server = MCPServer("stdio-proxy")

        # Set up the streamable HTTP session manager
        streamable_manager = StreamableHTTPSessionManager(
            app=streamable_server,
            stateless=stateless,
            json_response=json_response,
        )

        # Register POST /mcp on the FastAPI app as the canonical client->server POST
        # path for Streamable HTTP. This forwards JSON-RPC notifications/requests to stdio.
        @original_app.post("/mcp")
        async def mcp_post(request: Request) -> Response:
            """Forward a JSON payload to stdio and optionally wait for its response.

            The request body must be valid JSON. If it is an object with an "id"
            field, the handler waits (up to a fixed timeout) for a stdout line
            from the subprocess whose "id" matches, and returns that object.

            Args:
                request (Request): The incoming FastAPI request containing the JSON payload.

            Returns:
                Response: A FastAPI Response object.
                    - 200 OK with matched JSON response if correlation succeeds.
                    - 202 Accepted if no matching response is received in time or for notifications.
                    - 400 Bad Request if the payload is not valid JSON.

            Raises:
                RuntimeError: If the stdio endpoint is not available.

            Example:
                Illustrative only; requires a running server:

                >>> import httpx  # doctest: +SKIP
                >>> response = httpx.post("http://localhost:8000/mcp", json={"id": 123, "method": "ping"})  # doctest: +SKIP
                >>> response.status_code in (200, 202)  # doctest: +SKIP
                True
            """
            # Read and validate JSON
            body = await request.body()
            try:
                obj = orjson.loads(body)
            except Exception as exc:
                return PlainTextResponse(f"Invalid JSON payload: {exc}", status_code=status.HTTP_400_BAD_REQUEST)

            if not stdio:
                raise RuntimeError("Stdio endpoint not available")

            # Raw newline-delimited JSON line to hand to the subprocess
            line = body.decode().rstrip() + "\n"

            # Only requests (objects carrying an id) have a response to correlate.
            expects_response = isinstance(obj, dict) and "id" in obj
            if not (expects_response and pubsub):
                # Notification (or no pubsub to observe stdout) -> return 202
                await stdio.send(line)
                return PlainTextResponse("accepted", status_code=status.HTTP_202_ACCEPTED)

            # Subscribe BEFORE forwarding: a fast subprocess could otherwise publish
            # its reply before the queue exists and the response would be lost.
            queue = pubsub.subscribe()
            try:
                await stdio.send(line)

                response_timeout = 10.0  # seconds; tuneable
                loop = asyncio.get_running_loop()
                deadline = loop.time() + response_timeout
                while True:
                    remaining = deadline - loop.time()
                    if remaining <= 0:
                        break
                    try:
                        msg = await asyncio.wait_for(queue.get(), timeout=remaining)
                    except asyncio.TimeoutError:
                        break

                    # stdio stdout lines may contain JSON objects or arrays
                    try:
                        parsed = orjson.loads(msg)
                    except (orjson.JSONDecodeError, ValueError):
                        # not JSON -> skip
                        continue

                    candidates = parsed if isinstance(parsed, list) else [parsed]
                    for candidate in candidates:
                        if isinstance(candidate, dict) and candidate.get("id") == obj.get("id"):
                            # return the matched response as JSON
                            return ORJSONResponse(candidate)

                # timeout -> accept and return 202
                return PlainTextResponse("accepted (no response yet)", status_code=status.HTTP_202_ACCEPTED)
            finally:
                if pubsub:
                    pubsub.unsubscribe(queue)

        # ASGI wrapper installed in front of the FastAPI app
        async def mcp_asgi_wrapper(scope: Scope, receive: Receive, send: Send) -> None:
            """ASGI entry point that sits in front of the FastAPI application.

            Every scope, including HTTP requests to `/mcp`, is currently passed
            to the original FastAPI application.

            Args:
                scope (Scope): The ASGI scope dictionary containing request metadata.
                receive (Receive): An awaitable that yields incoming ASGI events.
                send (Send): An awaitable used to send ASGI events.
            """
            # NOTE: delegating `/mcp` HTTP scopes to
            # streamable_manager.handle_request(scope, receive, send) is disabled;
            # the FastAPI POST /mcp route above handles them instead.
            await original_app(scope, receive, send)

        # Replace the app used by uvicorn with the ASGI wrapper
        app = mcp_asgi_wrapper  # type: ignore[assignment]

    # ---------------------- Server lifecycle ----------------------
    config = uvicorn.Config(
        app,
        host=host,
        port=port,
        log_level=log_level,
        lifespan="off",
    )
    server = uvicorn.Server(config)

    shutting_down = asyncio.Event()

    async def _shutdown() -> None:
        """Handle graceful shutdown (idempotent)."""
        if shutting_down.is_set():
            return
        shutting_down.set()
        LOGGER.info("Shutting down multi-protocol server...")
        if stdio:
            await stdio.stop()
        # Streamable HTTP cleanup is handled after server.serve() returns.
        # Ask uvicorn to leave its serve loop.
        server.should_exit = True

    # The event loop only keeps weak references to tasks, so hold a strong
    # reference to signal-triggered shutdown tasks until they finish.
    shutdown_tasks: set = set()

    def _request_shutdown(*_: Any) -> None:
        """Schedule `_shutdown` from a signal handler."""
        task = asyncio.create_task(_shutdown())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        # add_signal_handler is not implemented on every platform/event loop
        with suppress(NotImplementedError):
            loop.add_signal_handler(sig, _request_shutdown)

    # If we have a streamable manager, start its context so it can accept ASGI /mcp
    if streamable_manager:
        streamable_context = streamable_manager.run()
        await streamable_context.__aenter__()  # pylint: disable=unnecessary-dunder-call,no-member

    # Log available endpoints
    endpoints = []
    if expose_sse:
        endpoints.append(f"SSE: http://{host}:{port}{sse_path}")
    if expose_streamable_http:
        endpoints.append(f"StreamableHTTP: http://{host}:{port}/mcp")

    LOGGER.info(f"Multi-protocol server ready → {', '.join(endpoints)}")

    try:
        await server.serve()
    finally:
        await _shutdown()
        # Clean up streamable HTTP context with timeout to prevent spin loop
        # if tasks don't respond to cancellation (anyio _deliver_cancellation issue)
        if streamable_context:
            # Get cleanup timeout from config (with fallback for standalone usage)
            try:
                # First-Party
                from mcpgateway.config import settings as cfg  # pylint: disable=import-outside-toplevel

                cleanup_timeout = cfg.mcp_session_pool_cleanup_timeout
            except Exception:
                cleanup_timeout = 5.0
            # Use anyio.move_on_after instead of asyncio.wait_for to properly propagate
            # cancellation through anyio's cancel scope system (prevents orphaned spinning tasks)
            with anyio.move_on_after(cleanup_timeout) as cleanup_scope:
                try:
                    await streamable_context.__aexit__(None, None, None)  # pylint: disable=unnecessary-dunder-call,no-member
                except Exception as e:
                    LOGGER.debug(f"Error cleaning up streamable HTTP context: {e}")
            if cleanup_scope.cancelled_caught:
                LOGGER.warning("Streamable HTTP context cleanup timed out - proceeding anyway")
async def _simple_sse_pump(client: "Any", url: str, max_retries: int, initial_retry_delay: float) -> None:
    """Stream an SSE endpoint and print each ``message`` event to stdout.

    Used when no stdio command is provided to bridge SSE to stdout directly.
    Connection-level httpx errors are retried with exponential backoff (capped
    at 30 seconds); the retry counter and delay are reset after every
    successful connection.

    Args:
        client: The HTTP client to use for SSE streaming.
        url: The SSE endpoint URL to connect to.
        max_retries: Maximum number of connection retry attempts.
        initial_retry_delay: Initial delay between retries in seconds.

    Raises:
        HTTPStatusError: If the SSE endpoint returns a non-200 status code.
        Exception: For unexpected errors in SSE stream processing.
    """
    attempts = 0
    backoff = initial_retry_delay

    while attempts < max_retries:
        try:
            LOGGER.info(f"Connecting to SSE endpoint: {url}")

            async with client.stream("GET", url) as response:
                # Only responses that expose a status code (real httpx responses) are validated
                if hasattr(response, "status_code") and response.status_code != 200:
                    if httpx:
                        raise httpx.HTTPStatusError(f"SSE endpoint returned {response.status_code}", request=response.request, response=response)
                    raise Exception(f"SSE endpoint returned {response.status_code}")

                # Connected: start the retry budget and backoff over
                attempts = 0
                backoff = initial_retry_delay
                pending: Optional[SSEEvent] = None

                async for line in response.aiter_lines():
                    pending, is_complete = SSEEvent.parse_sse_line(line, pending)
                    if not (is_complete and pending):
                        continue

                    kind = pending.event
                    if kind == "endpoint":
                        LOGGER.info(f"Received message endpoint: {pending.data}")
                    elif kind == "message":
                        # Just print the message to stdout
                        print(pending.data)
                    elif kind == "keepalive":
                        LOGGER.debug("Received keepalive")

                    # Event consumed; begin accumulating the next one
                    pending = None

        except Exception as e:
            # Only the expected httpx connection errors are retried
            retryable = httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout))
            if not retryable:
                LOGGER.error(f"Unexpected error in SSE stream: {e}")
                raise

            attempts += 1
            if attempts >= max_retries:
                LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                raise

            LOGGER.warning(f"Connection error: {e}. Retrying in {backoff}s... (attempt {attempts}/{max_retries})")
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 30)  # Exponential backoff, max 30s
def start_streamable_http_stdio(
    cmd: str,
    port: int,
    log_level: str,
    cors: Optional[List[str]],
    host: str = "127.0.0.1",
    stateless: bool = False,
    json_response: bool = False,
) -> None:
    """Start stdio to streamable HTTP bridge.

    Synchronous entry point: builds the stdio-to-streamable-HTTP bridge
    coroutine and runs it to completion on a new event loop.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use.
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1".
        stateless: Whether to use stateless mode. Defaults to False.
        json_response: Whether to return JSON responses. Defaults to False.

    Returns:
        None: This function does not return a value.
    """
    bridge = _run_stdio_to_streamable_http(cmd, port, log_level, cors, host, stateless, json_response)
    return asyncio.run(bridge)
def start_streamable_http_client(
    url: str,
    bearer_token: Optional[str] = None,
    timeout: float = 30.0,
    stdio_command: Optional[str] = None,
) -> None:
    """Start streamable HTTP to stdio bridge.

    Synchronous entry point: builds the streamable-HTTP-to-stdio bridge
    coroutine and runs it to completion on a new event loop.

    Args:
        url: The streamable HTTP endpoint URL to connect to.
        bearer_token: Optional OAuth2 bearer token for authentication. Defaults to None.
        timeout: HTTP client timeout in seconds. Defaults to 30.0.
        stdio_command: Optional command to run for local stdio processing.

    Returns:
        None: This function does not return a value.
    """
    bridge = _run_streamable_http_to_stdio(url, bearer_token, timeout, stdio_command)
    return asyncio.run(bridge)
def start_stdio(
    cmd: str,
    port: int,
    log_level: str,
    cors: Optional[List[str]],
    host: str = "127.0.0.1",
    sse_path: str = "/sse",
    message_path: str = "/message",
    keep_alive: float = KEEP_ALIVE_INTERVAL,
) -> None:
    """Start stdio bridge.

    Synchronous entry point: builds the stdio-to-SSE bridge coroutine and runs
    it to completion on a new event loop.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use.
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1".
        sse_path: Path for the SSE endpoint. Defaults to "/sse".
        message_path: Path for the message endpoint. Defaults to "/message".
        keep_alive: Keep-alive interval in seconds. Defaults to KEEP_ALIVE_INTERVAL.

    Returns:
        None: This function does not return a value.

    Examples:
        >>> # Test parameter validation
        >>> isinstance(KEEP_ALIVE_INTERVAL, int)
        True
        >>> KEEP_ALIVE_INTERVAL > 0
        True
        >>> start_stdio("uvx mcp-server-git", 9000, "info", None)  # doctest: +SKIP
    """
    bridge = _run_stdio_to_sse(cmd, port, log_level, cors, host, sse_path, message_path, keep_alive)
    return asyncio.run(bridge)
def start_sse(
    url: str,
    bearer_token: Optional[str] = None,
    timeout: float = 30.0,
    stdio_command: Optional[str] = None,
) -> None:
    """Start SSE bridge.

    Synchronous entry point: builds the SSE-to-stdio bridge coroutine and runs
    it to completion on a new event loop.

    Args:
        url: The SSE endpoint URL to connect to.
        bearer_token: Optional OAuth2 bearer token for authentication. Defaults to None.
        timeout: HTTP client timeout in seconds. Defaults to 30.0.
        stdio_command: Optional command to run for local stdio processing.

    Returns:
        None: This function does not return a value.

    Examples:
        >>> # Test parameter defaults
        >>> timeout_default = 30.0
        >>> isinstance(timeout_default, float)
        True
        >>> timeout_default > 0
        True
        >>> start_sse("http://example.com/sse", "token123")  # doctest: +SKIP
    """
    bridge = _run_sse_to_stdio(url, bearer_token, timeout, stdio_command)
    return asyncio.run(bridge)
def main(argv: Optional[Sequence[str]] = None) -> None:
    """Entry point for the translate module.

    Configures logging, parses arguments, and starts the appropriate bridge
    based on command line options. Handles keyboard interrupts gracefully.

    Exits with status 1 when no mode is selected, when ``--connect-grpc`` is
    requested, or when a NotImplementedError/ImportError is raised while
    starting a bridge; exits with status 0 on KeyboardInterrupt.

    Args:
        argv: Optional sequence of command line arguments. If None, uses
            sys.argv[1:]. An explicitly passed empty sequence is respected
            and parsed as "no arguments".

    Examples:
        >>> # Test argument parsing
        >>> try:
        ...     main(["--stdio", "cat", "--port", "9000"])  # doctest: +SKIP
        ... except SystemExit:
        ...     pass  # Would normally start the server
    """
    # Only fall back to the process arguments when the caller passed nothing at
    # all; `argv or sys.argv[1:]` would wrongly discard an explicit empty list.
    args = _parse_args(sys.argv[1:] if argv is None else argv)
    logging.basicConfig(
        level=getattr(logging, args.logLevel.upper(), logging.INFO),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )

    # Parse header mappings if dynamic environment injection is enabled
    # Pre-normalize mappings once at startup for O(1) lookups per request
    header_mappings: NormalizedMappings | None = None
    if getattr(args, "enable_dynamic_env", False):
        try:
            raw_mappings = parse_header_mappings(getattr(args, "header_to_env", []))
            header_mappings = NormalizedMappings(raw_mappings)
            LOGGER.info(f"Dynamic environment injection enabled with {len(header_mappings)} header mappings")
        except Exception as e:
            LOGGER.error(f"Failed to parse header mappings: {e}")
            raise

    try:
        # Handle gRPC server exposure
        if getattr(args, "grpc", None):
            # First-Party
            from mcpgateway.translate_grpc import expose_grpc_via_sse  # pylint: disable=import-outside-toplevel

            # Parse KEY=VALUE metadata entries; only the first "=" separates key from value
            metadata: Dict[str, str] = {}
            for item in getattr(args, "grpc_metadata", None) or []:
                key, sep, value = item.partition("=")
                if not sep:
                    # The entry itself is not logged in case it carries a credential.
                    LOGGER.warning("Ignoring malformed gRPC metadata entry (expected KEY=VALUE)")
                    continue
                metadata[key] = value

            asyncio.run(
                expose_grpc_via_sse(
                    target=args.grpc,
                    port=args.port,
                    tls_enabled=getattr(args, "grpc_tls", False),
                    tls_cert=getattr(args, "grpc_cert", None),
                    tls_key=getattr(args, "grpc_key", None),
                    metadata=metadata,
                )
            )

        # Handle local stdio server exposure
        elif args.stdio:
            # Check which protocols to expose
            expose_sse = getattr(args, "expose_sse", False)
            expose_streamable_http = getattr(args, "expose_streamable_http", False)

            # If no protocol specified, default to SSE for backward compatibility
            if not expose_sse and not expose_streamable_http:
                expose_sse = True

            # Use multi-protocol server
            asyncio.run(
                _run_multi_protocol_server(
                    cmd=args.stdio,
                    port=args.port,
                    log_level=args.logLevel,
                    cors=args.cors,
                    host=args.host,
                    expose_sse=expose_sse,
                    expose_streamable_http=expose_streamable_http,
                    sse_path=getattr(args, "ssePath", "/sse"),
                    message_path=getattr(args, "messagePath", "/message"),
                    keep_alive=getattr(args, "keepAlive", KEEP_ALIVE_INTERVAL),
                    stateless=getattr(args, "stateless", False),
                    json_response=getattr(args, "jsonResponse", False),
                    header_mappings=header_mappings,
                )
            )

        # Handle remote connection modes
        elif getattr(args, "connect_sse", None):
            start_sse(args.connect_sse, args.oauth2Bearer, 30.0, args.stdioCommand)
        elif getattr(args, "connect_streamable_http", None):
            start_streamable_http_client(args.connect_streamable_http, args.oauth2Bearer, 30.0, args.stdioCommand)
        elif getattr(args, "connect_grpc", None):
            print("Error: --connect-grpc mode not yet implemented. Use --grpc to expose a gRPC server.", file=sys.stderr)
            sys.exit(1)
        else:
            print("Error: Must specify either --stdio (to expose local server), --grpc (to expose gRPC server), or --connect-sse/--connect-streamable-http (to connect to remote)", file=sys.stderr)
            sys.exit(1)
    except KeyboardInterrupt:
        print("")  # restore shell prompt
        sys.exit(0)
    except (NotImplementedError, ImportError) as exc:
        print(exc, file=sys.stderr)
        sys.exit(1)
# Script entry point: run the CLI only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":  # python3 -m mcpgateway.translate ...
    main()