Coverage for mcpgateway / translate.py: 99%

806 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2 

3'''Location: ./mcpgateway/translate.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti, Manav Gupta 

7 

8r"""Bridges between different MCP transport protocols. 

9This module provides bidirectional bridging between MCP servers that communicate 

10via different transport protocols: stdio/JSON-RPC, HTTP/SSE, and streamable HTTP. 

11It enables exposing local MCP servers over HTTP or consuming remote endpoints 

12as local stdio servers. 

13 

14The bridge supports multiple modes of operation: 

15- stdio to SSE: Expose a local stdio MCP server over HTTP/SSE 

16- SSE to stdio: Bridge a remote SSE endpoint to local stdio 

17- stdio to streamable HTTP: Expose a local stdio MCP server via streamable HTTP 

18- streamable HTTP to stdio: Bridge a remote streamable HTTP endpoint to local stdio 

19 

20Examples: 

21 Programmatic usage: 

22 

23 >>> import asyncio 

24 >>> from mcpgateway.translate import start_stdio 

25 >>> asyncio.run(start_stdio("uvx mcp-server-git", 9000, "info", None, "127.0.0.1")) # doctest: +SKIP 

26 

27 Test imports and configuration: 

28 

29 >>> from mcpgateway.translate import MCPServer, StreamableHTTPSessionManager 

30 >>> isinstance(MCPServer, type) 

31 True 

32 >>> isinstance(StreamableHTTPSessionManager, type) 

33 True 

34 >>> from mcpgateway.translate import KEEP_ALIVE_INTERVAL 

35 >>> KEEP_ALIVE_INTERVAL > 0 

36 True 

37 >>> from mcpgateway.translate import DEFAULT_KEEPALIVE_ENABLED 

38 >>> isinstance(DEFAULT_KEEPALIVE_ENABLED, bool) 

39 True 

40 

41 Test Starlette imports: 

42 

43 >>> from mcpgateway.translate import Starlette, Route 

44 >>> isinstance(Starlette, type) 

45 True 

46 >>> isinstance(Route, type) 

47 True 

48 

49 Test logging setup: 

50 

51 >>> from mcpgateway.translate import LOGGER, logging_service 

52 >>> LOGGER is not None 

53 True 

54 >>> logging_service is not None 

55 True 

56 >>> hasattr(LOGGER, 'info') 

57 True 

58 >>> hasattr(LOGGER, 'error') 

59 True 

60 >>> hasattr(LOGGER, 'debug') 

61 True 

62 

63 Test utility classes: 

64 

65 >>> from mcpgateway.translate import _PubSub, StdIOEndpoint 

66 >>> pubsub = _PubSub() 

67 >>> hasattr(pubsub, 'publish') 

68 True 

69 >>> hasattr(pubsub, 'subscribe') 

70 True 

71 >>> hasattr(pubsub, 'unsubscribe') 

72 True 

73 

74Usage: 

75 Command line usage:: 

76 

77 # 1. Expose an MCP server that talks JSON-RPC on stdio at :9000/sse 

78 python3 -m mcpgateway.translate --stdio "uvx mcp-server-git" --port 9000 

79 

80 # 2. Bridge a remote SSE endpoint to local stdio 

81 python3 -m mcpgateway.translate --sse "https://example.com/sse" \ 

82 --stdioCommand "uvx mcp-client" 

83 

84 # 3. Expose stdio server via streamable HTTP at :9000/mcp 

85 python3 -m mcpgateway.translate --streamableHttp "uvx mcp-server-git" \ 

86 --port 9000 --stateless --jsonResponse 

87 

88 # 4. Connect to remote streamable HTTP endpoint 

89 python3 -m mcpgateway.translate \ 

90 --streamableHttp "https://example.com/mcp" \ 

91 --oauth2Bearer "your-token" 

92 

93 # 5. Test SSE endpoint 

94 curl -N http://localhost:9000/sse # receive the stream 

95 

96 # 6. Send a test echo request to SSE endpoint 

97 curl -X POST http://localhost:9000/message \ 

98 -H 'Content-Type: application/json' \ 

99 -d '{"jsonrpc":"2.0","id":1,"method":"echo","params":{"value":"hi"}}' 

100 

101 # 7. Test streamable HTTP endpoint 

102 curl -X POST http://localhost:9000/mcp \ 

103 -H 'Content-Type: application/json' \ 

104 -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"demo","version":"0.0.1"}}}' 

105 

106 The SSE stream emits JSON-RPC responses as ``event: message`` frames and sends 

107 regular ``event: keepalive`` frames (default every 30s) to prevent timeouts. 

108 

109 Streamable HTTP supports both stateful (with session management) and stateless 

110 modes, and can return either JSON responses or SSE streams. 

111""" 

112''' 

113 

114# Future 

115from __future__ import annotations 

116 

117# Standard 

118import argparse 

119import asyncio 

120from contextlib import suppress 

121import logging 

122import os 

123import shlex 

124import signal 

125import sys 

126from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Tuple 

127from urllib.parse import urlencode 

128import uuid 

129 

130# Third-Party 

131import anyio 

132from fastapi import FastAPI, Request, Response, status 

133from fastapi.middleware.cors import CORSMiddleware 

134from fastapi.responses import PlainTextResponse 

135from mcp.server import Server as MCPServer 

136from mcp.server.streamable_http_manager import StreamableHTTPSessionManager 

137import orjson 

138from starlette.applications import Starlette 

139from starlette.routing import Route 

140from starlette.types import Receive, Scope, Send 

141import uvicorn 

142 

143try: 

144 # Third-Party 

145 import httpx 

146except ImportError: 

147 httpx = None # type: ignore[assignment] 

148 

149# First-Party 

150from mcpgateway.services.logging_service import LoggingService 

151from mcpgateway.translate_header_utils import extract_env_vars_from_headers, NormalizedMappings, parse_header_mappings 

152 

153# Use patched EventSourceResponse with CPU spin protection (anyio#695 fix) 

154from mcpgateway.transports.sse_transport import EventSourceResponse 

155from mcpgateway.utils.orjson_response import ORJSONResponse 

156 

157# Initialize logging service first 

158logging_service = LoggingService() 

159LOGGER = logging_service.get_logger("mcpgateway.translate") 

160CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json") 

161# headers = {"Content-Type": CONTENT_TYPE} 

162# Import settings for default keepalive interval 

163try: 

164 # First-Party 

165 from mcpgateway.config import settings 

166 

167 DEFAULT_KEEP_ALIVE_INTERVAL = settings.sse_keepalive_interval 

168 DEFAULT_KEEPALIVE_ENABLED = settings.sse_keepalive_enabled 

169 DEFAULT_SSL_VERIFY = not settings.skip_ssl_verify 

170except ImportError: 

171 # Fallback if config not available 

172 DEFAULT_KEEP_ALIVE_INTERVAL = 30 

173 DEFAULT_KEEPALIVE_ENABLED = True 

174 DEFAULT_SSL_VERIFY = True # Verify SSL by default when config unavailable 

175 

176KEEP_ALIVE_INTERVAL = DEFAULT_KEEP_ALIVE_INTERVAL # seconds - from config or fallback to 30 

177 

178# Buffer limit for subprocess stdout (default 64KB is too small for large tool responses) 

179# Set to 16MB to handle tools that return large amounts of data (e.g., search results) 

180STDIO_BUFFER_LIMIT = 16 * 1024 * 1024 # 16MB 

181 

182__all__ = ["main"] # for console-script entry-point 

183 

184 

185# ---------------------------------------------------------------------------# 

186# Helpers - trivial in-process Pub/Sub # 

187# ---------------------------------------------------------------------------# 

188class _PubSub: 

189 """Very small fan-out helper - one async Queue per subscriber. 

190 

191 This class implements a simple publish-subscribe pattern using asyncio queues 

192 for distributing messages from stdio subprocess to multiple SSE clients. 

193 

194 Examples: 

195 >>> import asyncio 

196 >>> async def test_pubsub(): 

197 ... pubsub = _PubSub() 

198 ... q = pubsub.subscribe() 

199 ... await pubsub.publish("hello") 

200 ... result = await q.get() 

201 ... pubsub.unsubscribe(q) 

202 ... return result 

203 >>> asyncio.run(test_pubsub()) 

204 'hello' 

205 """ 

206 

207 def __init__(self) -> None: 

208 """Initialize a new publish-subscribe system. 

209 

210 Creates an empty list of subscriber queues. Each subscriber will 

211 receive their own asyncio.Queue for receiving published messages. 

212 

213 Examples: 

214 >>> pubsub = _PubSub() 

215 >>> isinstance(pubsub._subscribers, list) 

216 True 

217 >>> len(pubsub._subscribers) 

218 0 

219 >>> hasattr(pubsub, '_subscribers') 

220 True 

221 """ 

222 self._subscribers: List[asyncio.Queue[str]] = [] 

223 

224 async def publish(self, data: str) -> None: 

225 """Publish data to all subscribers. 

226 

227 Dead queues (full) are automatically removed from the subscriber list. 

228 

229 Args: 

230 data: The data string to publish to all subscribers. 

231 

232 Examples: 

233 >>> import asyncio 

234 >>> async def test_publish(): 

235 ... pubsub = _PubSub() 

236 ... await pubsub.publish("test") # No subscribers, no error 

237 ... return True 

238 >>> asyncio.run(test_publish()) 

239 True 

240 

241 >>> # Test queue full handling 

242 >>> async def test_full_queue(): 

243 ... pubsub = _PubSub() 

244 ... # Create a queue with size 1 

245 ... q = asyncio.Queue(maxsize=1) 

246 ... pubsub._subscribers = [q] 

247 ... # Fill the queue 

248 ... await q.put("first") 

249 ... # This should remove the full queue 

250 ... await pubsub.publish("second") 

251 ... return len(pubsub._subscribers) 

252 >>> asyncio.run(test_full_queue()) 

253 0 

254 """ 

255 dead: List[asyncio.Queue[str]] = [] 

256 for q in self._subscribers: 

257 try: 

258 q.put_nowait(data) 

259 except asyncio.QueueFull: 

260 dead.append(q) 

261 for q in dead: 

262 with suppress(ValueError): 

263 self._subscribers.remove(q) 

264 

265 def subscribe(self) -> "asyncio.Queue[str]": 

266 """Subscribe to published data. 

267 

268 Creates a new queue for receiving published messages with a maximum 

269 size of 1024 items. 

270 

271 Returns: 

272 asyncio.Queue[str]: A queue that will receive published data. 

273 

274 Examples: 

275 >>> pubsub = _PubSub() 

276 >>> q = pubsub.subscribe() 

277 >>> isinstance(q, asyncio.Queue) 

278 True 

279 >>> q.maxsize 

280 1024 

281 >>> len(pubsub._subscribers) 

282 1 

283 >>> pubsub._subscribers[0] is q 

284 True 

285 """ 

286 q: asyncio.Queue[str] = asyncio.Queue(maxsize=1024) 

287 self._subscribers.append(q) 

288 return q 

289 

290 def unsubscribe(self, q: "asyncio.Queue[str]") -> None: 

291 """Unsubscribe from published data. 

292 

293 Removes the queue from the subscriber list. Safe to call even if 

294 the queue is not in the list. 

295 

296 Args: 

297 q: The queue to unsubscribe from published data. 

298 

299 Examples: 

300 >>> pubsub = _PubSub() 

301 >>> q = pubsub.subscribe() 

302 >>> pubsub.unsubscribe(q) 

303 >>> pubsub.unsubscribe(q) # No error on double unsubscribe 

304 """ 

305 with suppress(ValueError): 

306 self._subscribers.remove(q) 

307 

308 

309# ---------------------------------------------------------------------------# 

310# StdIO endpoint (child process ↔ async queues) # 

311# ---------------------------------------------------------------------------# 

312class StdIOEndpoint: 

313 """Wrap a child process whose stdin/stdout speak line-delimited JSON-RPC. 

314 

315 This class manages a subprocess that communicates via stdio using JSON-RPC 

316 protocol, pumping messages between the subprocess and a pubsub system. 

317 

318 Examples: 

319 >>> import asyncio 

320 >>> async def test_stdio(): 

321 ... pubsub = _PubSub() 

322 ... stdio = StdIOEndpoint("echo hello", pubsub) 

323 ... # Would start a real subprocess 

324 ... return isinstance(stdio, StdIOEndpoint) 

325 >>> asyncio.run(test_stdio()) 

326 True 

327 """ 

328 

329 def __init__(self, cmd: str, pubsub: _PubSub, env_vars: Optional[Dict[str, str]] = None, header_mappings: Optional[NormalizedMappings] = None) -> None: 

330 """Initialize a stdio endpoint for subprocess communication. 

331 

332 Sets up the endpoint with the command to run and the pubsub system 

333 for message distribution. The subprocess is not started until start() 

334 is called. 

335 

336 Args: 

337 cmd: The command string to execute as a subprocess. 

338 pubsub: The publish-subscribe system for distributing subprocess 

339 output to SSE clients. 

340 env_vars: Optional dictionary of environment variables to set 

341 when starting the subprocess. 

342 header_mappings: Optional mapping of HTTP headers to environment variable names 

343 for dynamic environment injection. 

344 

345 Examples: 

346 >>> pubsub = _PubSub() 

347 >>> endpoint = StdIOEndpoint("echo hello", pubsub) 

348 >>> endpoint._cmd 

349 'echo hello' 

350 >>> endpoint._proc is None 

351 True 

352 >>> isinstance(endpoint._pubsub, _PubSub) 

353 True 

354 >>> endpoint._stdin is None 

355 True 

356 >>> endpoint._pump_task is None 

357 True 

358 >>> endpoint._pubsub is pubsub 

359 True 

360 """ 

361 self._cmd = cmd 

362 self._pubsub = pubsub 

363 self._env_vars = env_vars or {} 

364 self._header_mappings = header_mappings 

365 self._proc: Optional[asyncio.subprocess.Process] = None 

366 self._stdin: Optional[asyncio.StreamWriter] = None 

367 self._pump_task: Optional[asyncio.Task[None]] = None 

368 

369 async def start(self, additional_env_vars: Optional[Dict[str, str]] = None) -> None: 

370 """Start the stdio subprocess with custom environment variables. 

371 

372 Creates the subprocess and starts the stdout pump task. The subprocess 

373 is created with stdin/stdout pipes and stderr passed through. 

374 

375 Args: 

376 additional_env_vars: Optional dictionary of additional environment 

377 variables to set when starting the subprocess. These will be 

378 combined with the environment variables set during initialization. 

379 

380 Raises: 

381 RuntimeError: If the subprocess fails to create stdin/stdout pipes. 

382 

383 Examples: 

384 >>> import asyncio # doctest: +SKIP 

385 >>> async def test_start(): # doctest: +SKIP 

386 ... pubsub = _PubSub() 

387 ... stdio = StdIOEndpoint("cat", pubsub) 

388 ... # await stdio.start() # doctest: +SKIP 

389 ... return True 

390 >>> asyncio.run(test_start()) # doctest: +SKIP 

391 True 

392 """ 

393 # Stop existing subprocess before starting a new one 

394 if self._proc is not None: 

395 await self.stop() 

396 

397 LOGGER.info(f"Starting stdio subprocess: {self._cmd}") 

398 

399 # Build environment from base + configured + additional 

400 env = os.environ.copy() 

401 env.update(self._env_vars) 

402 if additional_env_vars: 

403 env.update(additional_env_vars) 

404 

405 # System-critical environment variables that must never be cleared 

406 system_critical_vars = {"PATH", "HOME", "TMPDIR", "TEMP", "TMP", "USER", "LOGNAME", "SHELL", "LANG", "LC_ALL", "LC_CTYPE", "PYTHONHOME", "PYTHONPATH"} 

407 

408 # Clear any mapped env vars that weren't provided in headers to avoid inheritance 

409 if self._header_mappings: 

410 for env_var_name in self._header_mappings.values(): 

411 if env_var_name not in (additional_env_vars or {}) and env_var_name not in system_critical_vars: 

412 # Delete the variable instead of setting to empty string to avoid 

413 # breaking subprocess initialization 

414 env.pop(env_var_name, None) 

415 

416 LOGGER.debug(f"Subprocess environment variables: {list(env.keys())}") 

417 self._proc = await asyncio.create_subprocess_exec( 

418 *shlex.split(self._cmd), 

419 stdin=asyncio.subprocess.PIPE, 

420 stdout=asyncio.subprocess.PIPE, 

421 stderr=sys.stderr, # passthrough for visibility 

422 env=env, # 🔑 Add environment variable support 

423 limit=STDIO_BUFFER_LIMIT, # Increase buffer limit for large tool responses 

424 ) 

425 

426 # Explicit error checking 

427 if not self._proc.stdin or not self._proc.stdout: 

428 raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {self._cmd}") 

429 

430 LOGGER.debug("Subprocess started successfully") 

431 

432 self._stdin = self._proc.stdin 

433 self._pump_task = asyncio.create_task(self._pump_stdout()) 

434 

435 async def stop(self) -> None: 

436 """Stop the stdio subprocess. 

437 

438 Terminates the subprocess gracefully with a 5-second timeout, 

439 then cancels the pump task. 

440 

441 Examples: 

442 >>> import asyncio 

443 >>> async def test_stop(): 

444 ... pubsub = _PubSub() 

445 ... stdio = StdIOEndpoint("cat", pubsub) 

446 ... await stdio.stop() # Safe to call even if not started 

447 ... return True 

448 >>> asyncio.run(test_stop()) 

449 True 

450 """ 

451 if self._proc is None: 

452 return 

453 

454 # Check if process is still running 

455 try: 

456 if self._proc.returncode is not None: 

457 # Process already terminated 

458 LOGGER.info(f"Subprocess (pid={self._proc.pid}) already terminated") 

459 self._proc = None 

460 self._stdin = None 

461 return 

462 except (ProcessLookupError, AttributeError): 

463 # Process doesn't exist or is already cleaned up 

464 LOGGER.info("Subprocess already cleaned up") 

465 self._proc = None 

466 self._stdin = None 

467 return 

468 

469 LOGGER.info(f"Stopping subprocess (pid={self._proc.pid})") 

470 try: 

471 self._proc.terminate() 

472 with suppress(asyncio.TimeoutError): 

473 await asyncio.wait_for(self._proc.wait(), timeout=5) 

474 except ProcessLookupError: 

475 # Process already terminated 

476 LOGGER.info("Subprocess already terminated") 

477 finally: 

478 if self._pump_task: 

479 self._pump_task.cancel() 

480 # Wait for pump task to actually finish, suppressing all exceptions 

481 # since the pump task logs its own errors. Use BaseException to catch 

482 # CancelledError which inherits from BaseException in Python 3.8+ 

483 with suppress(BaseException): 

484 await self._pump_task 

485 self._proc = None 

486 self._stdin = None # Reset stdin too! 

487 

488 def is_running(self) -> bool: 

489 """Check if the stdio subprocess is currently running. 

490 

491 Returns: 

492 True if the subprocess is running, False otherwise. 

493 

494 Examples: 

495 >>> import asyncio 

496 >>> async def test_is_running(): 

497 ... pubsub = _PubSub() 

498 ... stdio = StdIOEndpoint("cat", pubsub) 

499 ... return stdio.is_running() 

500 >>> asyncio.run(test_is_running()) 

501 False 

502 """ 

503 return self._proc is not None 

504 

505 async def send(self, raw: str) -> None: 

506 """Send data to the subprocess stdin. 

507 

508 Args: 

509 raw: The raw data string to send to the subprocess. 

510 

511 Raises: 

512 RuntimeError: If the stdio endpoint is not started. 

513 

514 Examples: 

515 >>> import asyncio 

516 >>> async def test_send(): 

517 ... pubsub = _PubSub() 

518 ... stdio = StdIOEndpoint("cat", pubsub) 

519 ... try: 

520 ... await stdio.send("test") 

521 ... except RuntimeError as e: 

522 ... return str(e) 

523 >>> asyncio.run(test_send()) 

524 'stdio endpoint not started' 

525 """ 

526 if not self._stdin: 

527 raise RuntimeError("stdio endpoint not started") 

528 LOGGER.debug(f"→ stdio: {raw.strip()}") 

529 self._stdin.write(raw.encode()) 

530 await self._stdin.drain() 

531 

532 async def _pump_stdout(self) -> None: 

533 """Pump stdout from subprocess to pubsub. 

534 

535 Continuously reads lines from the subprocess stdout and publishes them 

536 to the pubsub system. Runs until EOF or exception. 

537 

538 Raises: 

539 RuntimeError: If process or stdout is not properly initialized. 

540 Exception: For any other error encountered while pumping stdout. 

541 """ 

542 if not self._proc or not self._proc.stdout: 

543 raise RuntimeError("Process not properly initialized: missing stdout") 

544 

545 reader = self._proc.stdout 

546 try: 

547 while True: 

548 line = await reader.readline() 

549 if not line: # EOF 

550 break 

551 text = line.decode(errors="replace") 

552 LOGGER.debug(f"← stdio: {text.strip()}") 

553 await self._pubsub.publish(text) 

554 except ConnectionResetError: # pragma: no cover --subprocess terminated 

555 # Subprocess terminated abruptly - this is expected behavior 

556 LOGGER.debug("stdout pump: subprocess connection closed") 

557 except Exception: # pragma: no cover --best-effort logging 

558 LOGGER.exception("stdout pump crashed - terminating bridge") 

559 raise 

560 

561 

562# ---------------------------------------------------------------------------# 

563# SSE Event Parser # 

564# ---------------------------------------------------------------------------# 

565class SSEEvent: 

566 """Represents a Server-Sent Event with proper field parsing. 

567 

568 Attributes: 

569 event: The event type (e.g., 'message', 'keepalive', 'endpoint') 

570 data: The event data payload 

571 event_id: Optional event ID 

572 retry: Optional retry interval in milliseconds 

573 """ 

574 

575 def __init__(self, event: str = "message", data: str = "", event_id: Optional[str] = None, retry: Optional[int] = None): 

576 """Initialize an SSE event. 

577 

578 Args: 

579 event: Event type, defaults to "message" 

580 data: Event data payload 

581 event_id: Optional event ID 

582 retry: Optional retry interval in milliseconds 

583 """ 

584 self.event = event 

585 self.data = data 

586 self.event_id = event_id 

587 self.retry = retry 

588 

589 @classmethod 

590 def parse_sse_line(cls, line: str, current_event: Optional["SSEEvent"] = None) -> Tuple[Optional["SSEEvent"], bool]: 

591 """Parse a single SSE line and update or create an event. 

592 

593 Args: 

594 line: The SSE line to parse 

595 current_event: The current event being built (if any) 

596 

597 Returns: 

598 Tuple of (event, is_complete) where event is the SSEEvent object 

599 and is_complete indicates if the event is ready to be processed 

600 """ 

601 line = line.rstrip("\n\r") 

602 

603 # Empty line signals end of event 

604 if not line: 

605 if current_event and current_event.data: 

606 return current_event, True 

607 return None, False 

608 

609 # Comment line 

610 if line.startswith(":"): 

611 return current_event, False 

612 

613 # Parse field 

614 if ":" in line: 

615 field, value = line.split(":", 1) 

616 value = value.lstrip(" ") # Remove leading space if present 

617 else: 

618 field = line 

619 value = "" 

620 

621 # Create event if needed 

622 if current_event is None: 

623 current_event = cls() 

624 

625 # Update fields 

626 if field == "event": 

627 current_event.event = value 

628 elif field == "data": 

629 if current_event.data: 

630 current_event.data += "\n" + value 

631 else: 

632 current_event.data = value 

633 elif field == "id": 

634 current_event.event_id = value 

635 elif field == "retry": 

636 try: 

637 current_event.retry = int(value) 

638 except ValueError: 

639 pass # Ignore invalid retry values 

640 

641 return current_event, False 

642 

643 

644# ---------------------------------------------------------------------------# 

645# FastAPI app exposing /sse & /message # 

646# ---------------------------------------------------------------------------# 

647 

648 

649def _build_fastapi( 

650 pubsub: _PubSub, 

651 stdio: StdIOEndpoint, 

652 keep_alive: float = KEEP_ALIVE_INTERVAL, 

653 sse_path: str = "/sse", 

654 message_path: str = "/message", 

655 cors_origins: Optional[List[str]] = None, 

656 header_mappings: Optional[NormalizedMappings] = None, 

657) -> FastAPI: 

658 """Build FastAPI application with SSE and message endpoints. 

659 

660 Creates a FastAPI app with SSE streaming endpoint and message posting 

661 endpoint for bidirectional communication with the stdio subprocess. 

662 

663 Args: 

664 pubsub: The publish/subscribe system for message routing. 

665 stdio: The stdio endpoint for subprocess communication. 

666 keep_alive: Interval in seconds for keepalive messages. Defaults to KEEP_ALIVE_INTERVAL. 

667 sse_path: Path for the SSE endpoint. Defaults to "/sse". 

668 message_path: Path for the message endpoint. Defaults to "/message". 

669 cors_origins: Optional list of CORS allowed origins. 

670 header_mappings: Optional mapping of HTTP headers to environment variables. 

671 

672 Returns: 

673 FastAPI: The configured FastAPI application. 

674 

675 Examples: 

676 >>> pubsub = _PubSub() 

677 >>> stdio = StdIOEndpoint("cat", pubsub) 

678 >>> app = _build_fastapi(pubsub, stdio) 

679 >>> isinstance(app, FastAPI) 

680 True 

681 >>> "/sse" in [r.path for r in app.routes] 

682 True 

683 >>> "/message" in [r.path for r in app.routes] 

684 True 

685 >>> "/healthz" in [r.path for r in app.routes] 

686 True 

687 

688 >>> # Test with custom paths 

689 >>> app2 = _build_fastapi(pubsub, stdio, sse_path="/events", message_path="/send") 

690 >>> "/events" in [r.path for r in app2.routes] 

691 True 

692 >>> "/send" in [r.path for r in app2.routes] 

693 True 

694 

695 >>> # Test CORS middleware is added 

696 >>> app3 = _build_fastapi(pubsub, stdio, cors_origins=["http://example.com"]) 

697 >>> # Check that middleware stack includes CORSMiddleware 

698 >>> any("CORSMiddleware" in str(m) for m in app3.user_middleware) 

699 True 

700 """ 

701 app = FastAPI() 

702 

703 # Add CORS middleware if origins specified 

704 if cors_origins: 

705 app.add_middleware( 

706 CORSMiddleware, 

707 allow_origins=cors_origins, 

708 allow_credentials=True, 

709 allow_methods=["*"], 

710 allow_headers=["*"], 

711 ) 

712 

713 # ----- GET /sse ---------------------------------------------------------# 

714 @app.get(sse_path) 

715 async def get_sse(request: Request) -> EventSourceResponse: # noqa: D401 

716 """Stream subprocess stdout to any number of SSE clients. 

717 

718 Args: 

719 request (Request): The incoming ``GET`` request that will be 

720 upgraded to a Server-Sent Events (SSE) stream. 

721 

722 Returns: 

723 EventSourceResponse: A streaming response that forwards JSON-RPC 

724 messages from the child process and emits periodic ``keepalive`` 

725 frames so that clients and proxies do not time out. 

726 """ 

727 # Extract environment variables from headers if dynamic env is enabled 

728 additional_env_vars = {} 

729 if header_mappings: 

730 request_headers = dict(request.headers) 

731 additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings) 

732 

733 # Restart stdio endpoint with new environment variables 

734 if additional_env_vars: 

735 LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables") 

736 await stdio.stop() # Stop existing process 

737 await stdio.start(additional_env_vars) # Start with new env vars 

738 

739 queue = pubsub.subscribe() 

740 session_id = uuid.uuid4().hex 

741 

742 async def event_gen() -> AsyncIterator[Dict[str, Any]]: 

743 """Generate Server-Sent Events for the SSE stream. 

744 

745 Yields SSE events in the following sequence: 

746 1. An 'endpoint' event with the message posting URL (required by MCP spec) 

747 2. An immediate 'keepalive' event to confirm the stream is active 

748 3. 'message' events containing JSON-RPC responses from the subprocess 

749 4. Periodic 'keepalive' events to prevent timeouts 

750 

751 The generator runs until the client disconnects or the server shuts down. 

752 Automatically unsubscribes from the pubsub system on completion. 

753 

754 Yields: 

755 Dict[str, Any]: SSE event dictionaries containing: 

756 - event: The event type ('endpoint', 'message', or 'keepalive') 

757 - data: The event payload (URL, JSON-RPC message, or empty object) 

758 - retry: Retry interval in milliseconds for reconnection 

759 

760 Examples: 

761 >>> import asyncio 

762 >>> async def test_event_gen(): 

763 ... # This is tested indirectly through the SSE endpoint 

764 ... return True 

765 >>> asyncio.run(test_event_gen()) 

766 True 

767 """ 

768 # 1️⃣ Mandatory "endpoint" bootstrap required by the MCP spec 

769 endpoint_url = f"{str(request.base_url).rstrip('/')}{message_path}?session_id={session_id}" 

770 yield { 

771 "event": "endpoint", 

772 "data": endpoint_url, 

773 "retry": int(keep_alive * 1000), 

774 } 

775 

776 # 2️⃣ Immediate keepalive so clients know the stream is alive (if enabled in config) 

777 if DEFAULT_KEEPALIVE_ENABLED: 

778 yield {"event": "keepalive", "data": "{}", "retry": keep_alive * 1000} 

779 

780 try: 

781 while True: 

782 if await request.is_disconnected(): 

783 break 

784 

785 try: 

786 timeout = keep_alive if DEFAULT_KEEPALIVE_ENABLED else None 

787 msg = await asyncio.wait_for(queue.get(), timeout) 

788 yield {"event": "message", "data": msg.rstrip()} 

789 except asyncio.TimeoutError: 

790 if DEFAULT_KEEPALIVE_ENABLED: 

791 yield { 

792 "event": "keepalive", 

793 "data": "{}", 

794 "retry": keep_alive * 1000, 

795 } 

796 finally: 

797 if pubsub: 

798 pubsub.unsubscribe(queue) 

799 

800 return EventSourceResponse( 

801 event_gen(), 

802 headers={ 

803 "Cache-Control": "no-cache", 

804 "Connection": "keep-alive", 

805 "X-Accel-Buffering": "no", # disable proxy buffering 

806 }, 

807 ) 

808 

809 # ----- POST /message ----------------------------------------------------# 

810 @app.post(message_path, status_code=status.HTTP_202_ACCEPTED) 

811 async def post_message(raw: Request, session_id: str | None = None) -> Response: # noqa: D401 

812 """Forward a raw JSON-RPC request to the stdio subprocess. 

813 

814 Args: 

815 raw (Request): The incoming ``POST`` request whose body contains 

816 a single JSON-RPC message. 

817 session_id (str | None): The SSE session identifier that originated 

818 this back-channel call (present when the client obtained the 

819 endpoint URL from an ``endpoint`` bootstrap frame). 

820 

821 Returns: 

822 Response: ``202 Accepted`` if the payload is forwarded successfully, 

823 or ``400 Bad Request`` when the body is not valid JSON. 

824 """ 

825 _ = session_id # Unused but required for API compatibility 

826 

827 # Extract environment variables from headers if dynamic env is enabled 

828 additional_env_vars = {} 

829 if header_mappings: 

830 request_headers = dict(raw.headers) 

831 additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings) 

832 

833 # Restart stdio endpoint with new environment variables 

834 if additional_env_vars: 

835 LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables") 

836 await stdio.stop() # Stop existing process 

837 await stdio.start(additional_env_vars) # Start with new env vars 

838 await asyncio.sleep(0.5) # Give process time to initialize 

839 

840 # Ensure stdio endpoint is running 

841 if not stdio.is_running(): 

842 LOGGER.info("Starting stdio endpoint (was not running)") 

843 await stdio.start() 

844 await asyncio.sleep(0.5) # Give process time to initialize 

845 

846 payload = await raw.body() 

847 try: 

848 orjson.loads(payload) # validate 

849 except Exception as exc: # noqa: BLE001 

850 return PlainTextResponse( 

851 f"Invalid JSON payload: {exc}", 

852 status_code=status.HTTP_400_BAD_REQUEST, 

853 ) 

854 await stdio.send(payload.decode().rstrip() + "\n") 

855 return PlainTextResponse("forwarded", status_code=status.HTTP_202_ACCEPTED) 

856 

857 # ----- Liveness ---------------------------------------------------------# 

858 @app.get("/healthz") 

859 async def health() -> Response: # noqa: D401 

860 """Health check endpoint. 

861 

862 Returns: 

863 Response: A plain text response with "ok" status. 

864 """ 

865 return PlainTextResponse("ok") 

866 

867 return app 

868 

869 

870# ---------------------------------------------------------------------------# 

871# CLI & orchestration # 

872# ---------------------------------------------------------------------------# 

873 

874 

875def _parse_args(argv: Sequence[str]) -> argparse.Namespace: 

876 """Parse command line arguments. 

877 

878 Validates mutually exclusive source options and sets defaults for 

879 port and logging configuration. 

880 

881 Args: 

882 argv: Sequence of command line arguments. 

883 

884 Returns: 

885 argparse.Namespace: Parsed command line arguments. 

886 

887 Raises: 

888 NotImplementedError: If streamableHttp option is specified. 

889 

890 Examples: 

891 >>> args = _parse_args(["--stdio", "cat", "--port", "9000"]) 

892 >>> args.stdio 

893 'cat' 

894 >>> args.port 

895 9000 

896 >>> args.logLevel 

897 'info' 

898 >>> args.host 

899 '127.0.0.1' 

900 >>> args.cors is None 

901 True 

902 >>> args.oauth2Bearer is None 

903 True 

904 

905 >>> # Test default parameters 

906 >>> args = _parse_args(["--stdio", "cat"]) 

907 >>> args.port 

908 8000 

909 >>> args.host 

910 '127.0.0.1' 

911 >>> args.logLevel 

912 'info' 

913 

914 >>> # Test connect-sse mode 

915 >>> args = _parse_args(["--connect-sse", "http://example.com/sse"]) 

916 >>> args.connect_sse 

917 'http://example.com/sse' 

918 >>> args.stdio is None 

919 True 

920 

921 >>> # Test CORS configuration 

922 >>> args = _parse_args(["--stdio", "cat", "--cors", "https://app.com", "https://web.com"]) 

923 >>> args.cors 

924 ['https://app.com', 'https://web.com'] 

925 

926 >>> # Test OAuth2 Bearer token 

927 >>> args = _parse_args(["--connect-sse", "http://example.com", "--oauth2Bearer", "token123"]) 

928 >>> args.oauth2Bearer 

929 'token123' 

930 

931 >>> # Test custom host and log level 

932 >>> args = _parse_args(["--stdio", "cat", "--host", "0.0.0.0", "--logLevel", "debug"]) 

933 >>> args.host 

934 '0.0.0.0' 

935 >>> args.logLevel 

936 'debug' 

937 

938 >>> # Test expose protocols 

939 >>> args = _parse_args(["--stdio", "uvx mcp-server-git", "--expose-sse", "--expose-streamable-http"]) 

940 >>> args.stdio 

941 'uvx mcp-server-git' 

942 >>> args.expose_sse 

943 True 

944 >>> args.expose_streamable_http 

945 True 

946 >>> args.stateless 

947 False 

948 >>> args.jsonResponse 

949 False 

950 

951 >>> # Test new parameters 

952 >>> args = _parse_args(["--stdio", "cat", "--ssePath", "/events", "--messagePath", "/send", "--keepAlive", "60"]) 

953 >>> args.ssePath 

954 '/events' 

955 >>> args.messagePath 

956 '/send' 

957 >>> args.keepAlive 

958 60 

959 

960 >>> # Test connect-sse with stdio command 

961 >>> args = _parse_args(["--connect-sse", "http://example.com/sse", "--stdioCommand", "uvx mcp-server-git"]) 

962 >>> args.stdioCommand 

963 'uvx mcp-server-git' 

964 

965 >>> # Test connect-sse without stdio command (allowed) 

966 >>> args = _parse_args(["--connect-sse", "http://example.com/sse"]) 

967 >>> args.stdioCommand is None 

968 True 

969 """ 

970 p = argparse.ArgumentParser( 

971 prog="mcpgateway.translate", 

972 description="Bridges between different MCP transport protocols: stdio, SSE, and streamable HTTP.", 

973 ) 

974 

975 # Source/destination options 

976 p.add_argument("--stdio", help='Local command to run, e.g. "uvx mcp-server-git"') 

977 p.add_argument("--connect-sse", dest="connect_sse", help="Connect to remote SSE endpoint URL") 

978 p.add_argument("--connect-streamable-http", dest="connect_streamable_http", help="Connect to remote streamable HTTP endpoint URL") 

979 p.add_argument("--grpc", type=str, help="gRPC server target (host:port) to expose") 

980 p.add_argument("--connect-grpc", type=str, help="Remote gRPC endpoint to connect to") 

981 

982 # Protocol exposure options (can be combined) 

983 p.add_argument("--expose-sse", action="store_true", help="Expose via SSE protocol (endpoints: /sse and /message)") 

984 p.add_argument("--expose-streamable-http", action="store_true", help="Expose via streamable HTTP protocol (endpoint: /mcp)") 

985 

986 # gRPC configuration options 

987 p.add_argument("--grpc-tls", action="store_true", help="Enable TLS for gRPC connection") 

988 p.add_argument("--grpc-cert", type=str, help="Path to TLS certificate for gRPC") 

989 p.add_argument("--grpc-key", type=str, help="Path to TLS key for gRPC") 

990 p.add_argument("--grpc-metadata", action="append", help="gRPC metadata (KEY=VALUE, repeatable)") 

991 

992 p.add_argument("--port", type=int, default=8000, help="HTTP port to bind") 

993 p.add_argument("--host", default="127.0.0.1", help="Host interface to bind (default: 127.0.0.1)") 

994 p.add_argument( 

995 "--logLevel", 

996 default="info", 

997 choices=["debug", "info", "warning", "error", "critical"], 

998 help="Log level", 

999 ) 

1000 p.add_argument( 

1001 "--cors", 

1002 nargs="*", 

1003 help="CORS allowed origins (e.g., --cors https://app.example.com)", 

1004 ) 

1005 p.add_argument( 

1006 "--oauth2Bearer", 

1007 help="OAuth2 Bearer token for authentication", 

1008 ) 

1009 

1010 # New configuration options 

1011 p.add_argument( 

1012 "--ssePath", 

1013 default="/sse", 

1014 help="SSE endpoint path (default: /sse)", 

1015 ) 

1016 p.add_argument( 

1017 "--messagePath", 

1018 default="/message", 

1019 help="Message endpoint path (default: /message)", 

1020 ) 

1021 p.add_argument( 

1022 "--keepAlive", 

1023 type=int, 

1024 default=KEEP_ALIVE_INTERVAL, 

1025 help=f"Keep-alive interval in seconds (default: {KEEP_ALIVE_INTERVAL})", 

1026 ) 

1027 

1028 # For SSE to stdio mode 

1029 p.add_argument( 

1030 "--stdioCommand", 

1031 help="Command to run when bridging SSE/streamableHttp to stdio (optional with --sse or --streamableHttp)", 

1032 ) 

1033 

1034 # Dynamic environment variable injection 

1035 p.add_argument("--enable-dynamic-env", action="store_true", help="Enable dynamic environment variable injection from HTTP headers") 

1036 p.add_argument( 

1037 "--header-to-env", 

1038 action="append", 

1039 default=[], 

1040 help="Map HTTP header to environment variable (format: HEADER=ENV_VAR, can be used multiple times). Case-insensitive duplicates are rejected (e.g., Authorization and authorization cannot both be mapped).", 

1041 ) 

1042 

1043 # For streamable HTTP mode 

1044 p.add_argument( 

1045 "--stateless", 

1046 action="store_true", 

1047 help="Use stateless mode for streamable HTTP (default: False)", 

1048 ) 

1049 p.add_argument( 

1050 "--jsonResponse", 

1051 action="store_true", 

1052 help="Return JSON responses instead of SSE streams for streamable HTTP (default: False)", 

1053 ) 

1054 

1055 args = p.parse_args(argv) 

1056 # streamableHttp is now supported, no need to raise NotImplementedError 

1057 return args 

1058 

1059 

1060async def _run_stdio_to_sse( 

1061 cmd: str, 

1062 port: int, 

1063 log_level: str = "info", 

1064 cors: Optional[List[str]] = None, 

1065 host: str = "127.0.0.1", 

1066 sse_path: str = "/sse", 

1067 message_path: str = "/message", 

1068 keep_alive: float = KEEP_ALIVE_INTERVAL, 

1069 header_mappings: Optional[NormalizedMappings] = None, 

1070) -> None: 

1071 """Run stdio to SSE bridge. 

1072 

1073 Starts a subprocess and exposes it via HTTP/SSE endpoints. Handles graceful 

1074 shutdown on SIGINT/SIGTERM. 

1075 

1076 Args: 

1077 cmd: The command to run as a stdio subprocess. 

1078 port: The port to bind the HTTP server to. 

1079 log_level: The logging level to use. Defaults to "info". 

1080 cors: Optional list of CORS allowed origins. 

1081 host: The host interface to bind to. Defaults to "127.0.0.1" for security. 

1082 sse_path: Path for the SSE endpoint. Defaults to "/sse". 

1083 message_path: Path for the message endpoint. Defaults to "/message". 

1084 keep_alive: Keep-alive interval in seconds. Defaults to KEEP_ALIVE_INTERVAL. 

1085 header_mappings: Optional mapping of HTTP headers to environment variables. 

1086 

1087 Examples: 

1088 >>> import asyncio # doctest: +SKIP 

1089 >>> async def test_run(): # doctest: +SKIP 

1090 ... await _run_stdio_to_sse("cat", 9000) # doctest: +SKIP 

1091 ... return True 

1092 >>> asyncio.run(test_run()) # doctest: +SKIP 

1093 True 

1094 """ 

1095 pubsub = _PubSub() 

1096 stdio = StdIOEndpoint(cmd, pubsub, header_mappings=header_mappings) 

1097 await stdio.start() 

1098 

1099 app = _build_fastapi(pubsub, stdio, keep_alive=keep_alive, sse_path=sse_path, message_path=message_path, cors_origins=cors, header_mappings=header_mappings) 

1100 config = uvicorn.Config( 

1101 app, 

1102 host=host, # Changed from hardcoded "0.0.0.0" 

1103 port=port, 

1104 log_level=log_level, 

1105 lifespan="off", 

1106 ) 

1107 uvicorn_server = uvicorn.Server(config) 

1108 

1109 shutting_down = asyncio.Event() # 🔄 make shutdown idempotent 

1110 

1111 async def _shutdown() -> None: 

1112 """Handle graceful shutdown of the stdio bridge. 

1113 

1114 Performs shutdown operations in the correct order: 

1115 1. Sets a flag to prevent multiple shutdown attempts 

1116 2. Stops the stdio subprocess 

1117 3. Shuts down the HTTP server 

1118 

1119 This function is idempotent - multiple calls will only execute 

1120 the shutdown sequence once. 

1121 

1122 Examples: 

1123 >>> import asyncio 

1124 >>> async def test_shutdown(): 

1125 ... # Shutdown is tested as part of the main run flow 

1126 ... return True 

1127 >>> asyncio.run(test_shutdown()) 

1128 True 

1129 """ 

1130 if shutting_down.is_set(): 

1131 return 

1132 shutting_down.set() 

1133 LOGGER.info("Shutting down ...") 

1134 await stdio.stop() 

1135 # Graceful shutdown by setting the shutdown event 

1136 # Use getattr to safely access should_exit attribute 

1137 setattr(uvicorn_server, "should_exit", getattr(uvicorn_server, "should_exit", False) or True) 

1138 

1139 loop = asyncio.get_running_loop() 

1140 for sig in (signal.SIGINT, signal.SIGTERM): 

1141 with suppress(NotImplementedError): # Windows lacks add_signal_handler 

1142 loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown())) 

1143 

1144 LOGGER.info(f"Bridge ready → http://{host}:{port}{sse_path}") 

1145 await uvicorn_server.serve() 

1146 await _shutdown() # final cleanup 

1147 

1148 

1149async def _run_sse_to_stdio(url: str, oauth2_bearer: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None, max_retries: int = 5, initial_retry_delay: float = 1.0) -> None: 

1150 """Run SSE to stdio bridge. 

1151 

1152 Connects to a remote SSE endpoint and bridges it to local stdio. 

1153 Implements proper bidirectional message flow with error handling and retries. 

1154 

1155 Args: 

1156 url: The SSE endpoint URL to connect to. 

1157 oauth2_bearer: Optional OAuth2 bearer token for authentication. Defaults to None. 

1158 timeout: HTTP client timeout in seconds. Defaults to 30.0. 

1159 stdio_command: Optional command to run for local stdio processing. 

1160 If not provided, will simply print SSE messages to stdout. 

1161 max_retries: Maximum number of connection retry attempts. Defaults to 5. 

1162 initial_retry_delay: Initial delay between retries in seconds. Defaults to 1.0. 

1163 

1164 Raises: 

1165 ImportError: If httpx package is not available. 

1166 RuntimeError: If the subprocess fails to create stdin/stdout pipes. 

1167 Exception: For any unexpected error in SSE stream processing. 

1168 

1169 Examples: 

1170 >>> import asyncio 

1171 >>> async def test_sse(): 

1172 ... try: 

1173 ... await _run_sse_to_stdio("http://example.com/sse", None) # doctest: +SKIP 

1174 ... except ImportError as e: 

1175 ... return "httpx" in str(e) 

1176 >>> asyncio.run(test_sse()) # Would return True if httpx not installed # doctest: +SKIP 

1177 """ 

1178 if not httpx: 

1179 raise ImportError("httpx package is required for SSE to stdio bridging") 

1180 

1181 headers = {} 

1182 if oauth2_bearer: 

1183 headers["Authorization"] = f"Bearer {oauth2_bearer}" 

1184 

1185 # If no stdio command provided, use simple mode (just print to stdout) 

1186 if not stdio_command: 

1187 LOGGER.warning("No --stdioCommand provided, running in simple mode (SSE to stdout only)") 

1188 # First-Party 

1189 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel 

1190 

1191 async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client: 

1192 await _simple_sse_pump(client, url, max_retries, initial_retry_delay) 

1193 return 

1194 

1195 # Start the stdio subprocess 

1196 process = await asyncio.create_subprocess_exec( 

1197 *shlex.split(stdio_command), 

1198 stdin=asyncio.subprocess.PIPE, 

1199 stdout=asyncio.subprocess.PIPE, 

1200 stderr=sys.stderr, 

1201 ) 

1202 

1203 if not process.stdin or not process.stdout: 

1204 raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {stdio_command}") 

1205 

1206 # Store the message endpoint URL once received 

1207 message_endpoint: Optional[str] = None 

1208 

1209 async def read_stdout(client: httpx.AsyncClient) -> None: 

1210 """Read lines from subprocess stdout and POST to message endpoint. 

1211 

1212 Continuously reads JSON-RPC requests from the subprocess stdout 

1213 and POSTs them to the remote message endpoint obtained from the 

1214 SSE stream's endpoint event. 

1215 

1216 Args: 

1217 client: The HTTP client to use for POSTing messages. 

1218 

1219 Raises: 

1220 RuntimeError: If the process stdout stream is not available. 

1221 

1222 Examples: 

1223 >>> import asyncio 

1224 >>> async def test_read(): 

1225 ... # This is tested as part of the SSE to stdio flow 

1226 ... return True 

1227 >>> asyncio.run(test_read()) 

1228 True 

1229 """ 

1230 if not process.stdout: 

1231 raise RuntimeError("Process stdout not available") 

1232 

1233 while True: 

1234 if not process.stdout: 

1235 raise RuntimeError("Process stdout not available") 

1236 line = await process.stdout.readline() 

1237 if not line: 

1238 break 

1239 

1240 text = line.decode().strip() 

1241 if not text: 

1242 continue 

1243 

1244 LOGGER.debug(f"← stdio: {text}") 

1245 

1246 # Wait for endpoint URL if not yet received 

1247 retry_count = 0 

1248 while not message_endpoint and retry_count < 30: # 30 second timeout 

1249 await asyncio.sleep(1) 

1250 retry_count += 1 

1251 

1252 if not message_endpoint: 

1253 LOGGER.error("No message endpoint received from SSE stream") 

1254 continue 

1255 

1256 # POST the JSON-RPC request to the message endpoint 

1257 try: 

1258 response = await client.post(message_endpoint, content=text, headers={"Content-Type": "application/json"}) 

1259 if response.status_code != 202: 

1260 LOGGER.warning(f"Message endpoint returned {response.status_code}: {response.text}") 

1261 except Exception as e: 

1262 LOGGER.error(f"Failed to POST to message endpoint: {e}") 

1263 

1264 async def pump_sse_to_stdio(client: httpx.AsyncClient) -> None: 

1265 """Stream SSE data from remote endpoint to subprocess stdin. 

1266 

1267 Connects to the remote SSE endpoint with retry logic and forwards 

1268 message events to the subprocess stdin. Properly parses SSE events 

1269 and handles endpoint, message, and keepalive event types. 

1270 

1271 Args: 

1272 client: The HTTP client to use for SSE streaming. 

1273 

1274 Raises: 

1275 HTTPStatusError: If the SSE endpoint returns a non-200 status code. 

1276 Exception: For unexpected errors in SSE stream processing. 

1277 

1278 Examples: 

1279 >>> import asyncio 

1280 >>> async def test_pump(): 

1281 ... # This is tested as part of the SSE to stdio flow 

1282 ... return True 

1283 >>> asyncio.run(test_pump()) 

1284 True 

1285 """ 

1286 nonlocal message_endpoint 

1287 retry_delay = initial_retry_delay 

1288 retry_count = 0 

1289 

1290 while retry_count < max_retries: 

1291 try: 

1292 LOGGER.info(f"Connecting to SSE endpoint: {url}") 

1293 

1294 async with client.stream("GET", url) as response: 

1295 # Check status code if available (real httpx response) 

1296 if hasattr(response, "status_code") and response.status_code != 200: 

1297 if httpx: 

1298 raise httpx.HTTPStatusError(f"SSE endpoint returned {response.status_code}", request=response.request, response=response) 

1299 raise Exception(f"SSE endpoint returned {response.status_code}") 

1300 

1301 # Reset retry counter on successful connection 

1302 retry_count = 0 

1303 retry_delay = initial_retry_delay 

1304 current_event: Optional[SSEEvent] = None 

1305 

1306 async for line in response.aiter_lines(): 

1307 event, is_complete = SSEEvent.parse_sse_line(line, current_event) 

1308 current_event = event 

1309 

1310 if is_complete and current_event: 

1311 LOGGER.debug(f"SSE event: {current_event.event} - {current_event.data[:100]}...") 

1312 

1313 if current_event.event == "endpoint": 

1314 # Store the message endpoint URL 

1315 message_endpoint = current_event.data 

1316 LOGGER.info(f"Received message endpoint: {message_endpoint}") 

1317 

1318 elif current_event.event == "message": 

1319 # Forward JSON-RPC responses to stdio 

1320 if process.stdin: 

1321 process.stdin.write((current_event.data + "\n").encode()) 

1322 await process.stdin.drain() 

1323 LOGGER.debug(f"→ stdio: {current_event.data}") 

1324 

1325 elif current_event.event == "keepalive": 

1326 # Log keepalive but don't forward 

1327 LOGGER.debug("Received keepalive") 

1328 

1329 # Reset for next event 

1330 current_event = None 

1331 

1332 except Exception as e: 

1333 # Check if it's one of the expected httpx exceptions 

1334 if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)): 

1335 retry_count += 1 

1336 if retry_count >= max_retries: 

1337 LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.") 

1338 raise 

1339 

1340 LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})") 

1341 await asyncio.sleep(retry_delay) 

1342 retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s 

1343 else: 

1344 LOGGER.error(f"Unexpected error in SSE stream: {e}") 

1345 raise 

1346 

1347 # Run both tasks concurrently 

1348 # First-Party 

1349 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel 

1350 

1351 async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client: 

1352 try: 

1353 await asyncio.gather(read_stdout(client), pump_sse_to_stdio(client)) 

1354 except Exception as e: 

1355 LOGGER.error(f"Bridge error: {e}") 

1356 raise 

1357 finally: 

1358 # Clean up subprocess 

1359 if process.returncode is None: 

1360 process.terminate() 

1361 with suppress(asyncio.TimeoutError): 

1362 await asyncio.wait_for(process.wait(), timeout=5) 

1363 

1364 

1365async def _run_stdio_to_streamable_http( 

1366 cmd: str, 

1367 port: int, 

1368 log_level: str = "info", 

1369 cors: Optional[List[str]] = None, 

1370 host: str = "127.0.0.1", 

1371 stateless: bool = False, 

1372 json_response: bool = False, 

1373) -> None: 

1374 """Run stdio to streamable HTTP bridge. 

1375 

1376 Starts a subprocess and exposes it via streamable HTTP endpoint. Handles graceful 

1377 shutdown on SIGINT/SIGTERM. 

1378 

1379 Args: 

1380 cmd: The command to run as a stdio subprocess. 

1381 port: The port to bind the HTTP server to. 

1382 log_level: The logging level to use. Defaults to "info". 

1383 cors: Optional list of CORS allowed origins. 

1384 host: The host interface to bind to. Defaults to "127.0.0.1" for security. 

1385 stateless: Whether to use stateless mode for streamable HTTP. Defaults to False. 

1386 json_response: Whether to return JSON responses instead of SSE streams. Defaults to False. 

1387 

1388 Raises: 

1389 ImportError: If MCP server components are not available. 

1390 RuntimeError: If subprocess fails to create stdin/stdout pipes. 

1391 

1392 Examples: 

1393 >>> import asyncio 

1394 >>> async def test_streamable_http(): 

1395 ... # Would start a real subprocess and HTTP server 

1396 ... cmd = "echo hello" 

1397 ... port = 9000 

1398 ... # This would normally run the server 

1399 ... return True 

1400 >>> asyncio.run(test_streamable_http()) 

1401 True 

1402 """ 

1403 # MCP components are available, proceed with setup 

1404 

1405 LOGGER.info(f"Starting stdio to streamable HTTP bridge for command: {cmd}") 

1406 

1407 # Create a simple MCP server that will proxy to stdio subprocess 

1408 mcp_server = MCPServer(name="stdio-proxy") 

1409 

1410 # Create subprocess for stdio communication 

1411 process = await asyncio.create_subprocess_exec( 

1412 *shlex.split(cmd), 

1413 stdin=asyncio.subprocess.PIPE, 

1414 stdout=asyncio.subprocess.PIPE, 

1415 stderr=sys.stderr, 

1416 ) 

1417 

1418 if not process.stdin or not process.stdout: 

1419 raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {cmd}") 

1420 

1421 # Set up the streamable HTTP session manager with the server 

1422 session_manager = StreamableHTTPSessionManager( 

1423 app=mcp_server, 

1424 stateless=stateless, 

1425 json_response=json_response, 

1426 ) 

1427 

1428 # Create Starlette app to host the streamable HTTP endpoint 

1429 async def handle_mcp(request: Request) -> None: 

1430 """Handle MCP requests via streamable HTTP. 

1431 

1432 Args: 

1433 request: The incoming HTTP request from Starlette. 

1434 

1435 Examples: 

1436 >>> async def test_handle(): 

1437 ... # Mock request handling 

1438 ... class MockRequest: 

1439 ... scope = {"type": "http"} 

1440 ... async def receive(self): return {} 

1441 ... async def send(self, msg): return None 

1442 ... req = MockRequest() 

1443 ... # Would handle the request via session manager 

1444 ... return req is not None 

1445 >>> import asyncio 

1446 >>> asyncio.run(test_handle()) 

1447 True 

1448 """ 

1449 # The session manager handles all the protocol details - Note: I don't like accessing _send directly -JPS 

1450 await session_manager.handle_request(request.scope, request.receive, request._send) # pylint: disable=W0212 

1451 

1452 routes = [ 

1453 Route("/mcp", handle_mcp, methods=["GET", "POST"]), 

1454 Route("/healthz", lambda request: PlainTextResponse("ok"), methods=["GET"]), 

1455 ] 

1456 

1457 app = Starlette(routes=routes) 

1458 

1459 # Add CORS middleware if specified 

1460 if cors: 

1461 app.add_middleware( 

1462 CORSMiddleware, 

1463 allow_origins=cors, 

1464 allow_credentials=True, 

1465 allow_methods=["*"], 

1466 allow_headers=["*"], 

1467 ) 

1468 

1469 # Run the server with Uvicorn 

1470 config = uvicorn.Config( 

1471 app, 

1472 host=host, 

1473 port=port, 

1474 log_level=log_level, 

1475 lifespan="off", 

1476 ) 

1477 uvicorn_server = uvicorn.Server(config) 

1478 

1479 shutting_down = asyncio.Event() 

1480 

1481 async def _shutdown() -> None: 

1482 """Handle graceful shutdown of the streamable HTTP bridge.""" 

1483 if shutting_down.is_set(): 

1484 return 

1485 shutting_down.set() 

1486 LOGGER.info("Shutting down streamable HTTP bridge...") 

1487 if process.returncode is None: 

1488 process.terminate() 

1489 with suppress(asyncio.TimeoutError): 

1490 await asyncio.wait_for(process.wait(), 5) 

1491 # Graceful shutdown by setting the shutdown event 

1492 # Use getattr to safely access should_exit attribute 

1493 setattr(uvicorn_server, "should_exit", getattr(uvicorn_server, "should_exit", False) or True) 

1494 

1495 loop = asyncio.get_running_loop() 

1496 for sig in (signal.SIGINT, signal.SIGTERM): 

1497 with suppress(NotImplementedError): # Windows lacks add_signal_handler 

1498 loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown())) 

1499 

1500 # Pump messages between stdio and HTTP 

1501 async def pump_stdio_to_http() -> None: 

1502 """Forward messages from subprocess stdout to HTTP responses. 

1503 

1504 Examples: 

1505 >>> async def test(): 

1506 ... # This would pump messages in real usage 

1507 ... return True 

1508 >>> import asyncio 

1509 >>> asyncio.run(test()) 

1510 True 

1511 """ 

1512 while True: 

1513 try: 

1514 if not process.stdout: 

1515 raise RuntimeError("Process stdout not available") 

1516 line = await process.stdout.readline() 

1517 if not line: 

1518 break 

1519 # The session manager will handle routing to appropriate HTTP responses 

1520 # This would need proper integration with session_manager's internal queue 

1521 LOGGER.debug(f"Received from subprocess: {line.decode().strip()}") 

1522 except Exception as e: 

1523 LOGGER.error(f"Error reading from subprocess: {e}") 

1524 break 

1525 

1526 async def pump_http_to_stdio(data: str) -> None: 

1527 """Forward HTTP requests to subprocess stdin. 

1528 

1529 Args: 

1530 data: The data string to send to subprocess stdin. 

1531 

1532 Examples: 

1533 >>> async def test_pump(): 

1534 ... # Would pump data to subprocess 

1535 ... data = '{"method": "test"}' 

1536 ... # In real use, would write to process.stdin 

1537 ... return len(data) > 0 

1538 >>> import asyncio 

1539 >>> asyncio.run(test_pump()) 

1540 True 

1541 """ 

1542 if not process.stdin: 

1543 raise RuntimeError("Process stdin not available") 

1544 process.stdin.write(data.encode() + b"\n") 

1545 await process.stdin.drain() 

1546 

1547 # Note: pump_http_to_stdio will be used when stdio-to-HTTP bridge is fully implemented 

1548 _ = pump_http_to_stdio 

1549 

1550 # Start the pump task 

1551 pump_task = asyncio.create_task(pump_stdio_to_http()) 

1552 

1553 try: 

1554 LOGGER.info(f"Streamable HTTP bridge ready → http://{host}:{port}/mcp") 

1555 await uvicorn_server.serve() 

1556 finally: 

1557 pump_task.cancel() 

1558 await _shutdown() 

1559 

1560 

1561async def _run_streamable_http_to_stdio( 

1562 url: str, 

1563 oauth2_bearer: Optional[str] = None, 

1564 timeout: float = 30.0, 

1565 stdio_command: Optional[str] = None, 

1566 max_retries: int = 5, 

1567 initial_retry_delay: float = 1.0, 

1568) -> None: 

1569 """Run streamable HTTP to stdio bridge. 

1570 

1571 Connects to a remote streamable HTTP endpoint and bridges it to local stdio. 

1572 Implements proper bidirectional message flow with error handling and retries. 

1573 

1574 Args: 

1575 url: The streamable HTTP endpoint URL to connect to. 

1576 oauth2_bearer: Optional OAuth2 bearer token for authentication. Defaults to None. 

1577 timeout: HTTP client timeout in seconds. Defaults to 30.0. 

1578 stdio_command: Optional command to run for local stdio processing. 

1579 If not provided, will simply print messages to stdout. 

1580 max_retries: Maximum number of connection retry attempts. Defaults to 5. 

1581 initial_retry_delay: Initial delay between retries in seconds. Defaults to 1.0. 

1582 

1583 Raises: 

1584 ImportError: If httpx package is not available. 

1585 RuntimeError: If the subprocess fails to create stdin/stdout pipes. 

1586 Exception: For any unexpected error during bridging operations. 

1587 """ 

1588 if not httpx: 

1589 raise ImportError("httpx package is required for streamable HTTP to stdio bridging") 

1590 

1591 headers = {} 

1592 if oauth2_bearer: 

1593 headers["Authorization"] = f"Bearer {oauth2_bearer}" 

1594 

1595 # Ensure URL ends with /mcp if not already 

1596 if not url.endswith("/mcp"): 

1597 url = url.rstrip("/") + "/mcp" 

1598 

1599 # If no stdio command provided, use simple mode (just print to stdout) 

1600 if not stdio_command: 

1601 LOGGER.warning("No --stdioCommand provided, running in simple mode (streamable HTTP to stdout only)") 

1602 # First-Party 

1603 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel 

1604 

1605 async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client: 

1606 await _simple_streamable_http_pump(client, url, max_retries, initial_retry_delay) 

1607 return 

1608 

1609 # Start the stdio subprocess 

1610 process = await asyncio.create_subprocess_exec( 

1611 *shlex.split(stdio_command), 

1612 stdin=asyncio.subprocess.PIPE, 

1613 stdout=asyncio.subprocess.PIPE, 

1614 stderr=sys.stderr, 

1615 ) 

1616 

1617 if not process.stdin or not process.stdout: 

1618 raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {stdio_command}") 

1619 

1620 async def read_stdout(client: httpx.AsyncClient) -> None: 

1621 """Read lines from subprocess stdout and POST to streamable HTTP endpoint. 

1622 

1623 Args: 

1624 client: The HTTP client to use for POSTing messages. 

1625 

1626 Raises: 

1627 RuntimeError: If the process stdout stream is not available. 

1628 """ 

1629 if not process.stdout: 

1630 raise RuntimeError("Process stdout not available") 

1631 

1632 while True: 

1633 if not process.stdout: 

1634 raise RuntimeError("Process stdout not available") 

1635 line = await process.stdout.readline() 

1636 if not line: 

1637 break 

1638 

1639 text = line.decode().strip() 

1640 if not text: 

1641 continue 

1642 

1643 LOGGER.debug(f"← stdio: {text}") 

1644 

1645 # POST the JSON-RPC request to the streamable HTTP endpoint 

1646 try: 

1647 if CONTENT_TYPE == "application/x-www-form-urlencoded": 

1648 # If text is JSON, parse and encode as form 

1649 try: 

1650 payload = orjson.loads(text) 

1651 body = urlencode(payload) 

1652 except Exception: 

1653 body = text 

1654 response = await client.post(url, content=body, headers=headers) 

1655 else: 

1656 response = await client.post(url, content=text, headers=headers) 

1657 if response.status_code == 200: 

1658 # Handle JSON response 

1659 response_data = response.text 

1660 if response_data and process.stdin: 

1661 process.stdin.write((response_data + "\n").encode()) 

1662 await process.stdin.drain() 

1663 LOGGER.debug(f"→ stdio: {response_data}") 

1664 else: 

1665 LOGGER.warning(f"Streamable HTTP endpoint returned {response.status_code}: {response.text}") 

1666 except Exception as e: 

1667 LOGGER.error(f"Failed to POST to streamable HTTP endpoint: {e}") 

1668 

1669 async def pump_streamable_http_to_stdio(client: httpx.AsyncClient) -> None: 

1670 """Stream data from remote streamable HTTP endpoint to subprocess stdin. 

1671 

1672 Args: 

1673 client: The HTTP client to use for streamable HTTP streaming. 

1674 

1675 Raises: 

1676 httpx.HTTPStatusError: If the streamable HTTP endpoint returns a non-200 status code. 

1677 Exception: For unexpected errors in streamable HTTP stream processing. 

1678 """ 

1679 retry_delay = initial_retry_delay 

1680 retry_count = 0 

1681 

1682 while retry_count < max_retries: 

1683 try: 

1684 LOGGER.info(f"Connecting to streamable HTTP endpoint: {url}") 

1685 

1686 # For streamable HTTP, we need to handle both SSE streams and JSON responses 

1687 # Try SSE first (for stateful sessions or when SSE is preferred) 

1688 async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response: 

1689 if response.status_code != 200: 

1690 if httpx: 

1691 raise httpx.HTTPStatusError(f"Streamable HTTP endpoint returned {response.status_code}", request=response.request, response=response) 

1692 raise Exception(f"Streamable HTTP endpoint returned {response.status_code}") 

1693 

1694 # Reset retry counter on successful connection 

1695 retry_count = 0 

1696 retry_delay = initial_retry_delay 

1697 

1698 async for line in response.aiter_lines(): 

1699 if line.startswith("data: "): 

1700 data = line[6:] # Remove "data: " prefix 

1701 if data and process.stdin: 

1702 process.stdin.write((data + "\n").encode()) 

1703 await process.stdin.drain() 

1704 LOGGER.debug(f"→ stdio: {data}") 

1705 

1706 except Exception as e: 

1707 if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)): 

1708 retry_count += 1 

1709 if retry_count >= max_retries: 

1710 LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.") 

1711 raise 

1712 

1713 LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})") 

1714 await asyncio.sleep(retry_delay) 

1715 retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s 

1716 else: 

1717 LOGGER.error(f"Unexpected error in streamable HTTP stream: {e}") 

1718 raise 

1719 

1720 # Run both tasks concurrently 

1721 # First-Party 

1722 from mcpgateway.services.http_client_service import get_isolated_http_client # pylint: disable=import-outside-toplevel 

1723 

1724 async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client: 

1725 try: 

1726 await asyncio.gather(read_stdout(client), pump_streamable_http_to_stdio(client)) 

1727 except Exception as e: 

1728 LOGGER.error(f"Bridge error: {e}") 

1729 raise 

1730 finally: 

1731 # Clean up subprocess 

1732 if process.returncode is None: 

1733 process.terminate() 

1734 with suppress(asyncio.TimeoutError): 

1735 await asyncio.wait_for(process.wait(), timeout=5) 

1736 

1737 

1738async def _simple_streamable_http_pump(client: "Any", url: str, max_retries: int, initial_retry_delay: float) -> None: 

1739 """Simple streamable HTTP pump that just prints messages to stdout. 

1740 

1741 Used when no stdio command is provided to bridge streamable HTTP to stdout directly. 

1742 

1743 Args: 

1744 client: The HTTP client to use for streamable HTTP streaming. 

1745 url: The streamable HTTP endpoint URL to connect to. 

1746 max_retries: Maximum number of connection retry attempts. 

1747 initial_retry_delay: Initial delay between retries in seconds. 

1748 

1749 Raises: 

1750 Exception: For unexpected errors in streamable HTTP stream processing including 

1751 HTTPStatusError if the endpoint returns a non-200 status code. 

1752 """ 

1753 retry_delay = initial_retry_delay 

1754 retry_count = 0 

1755 

1756 while retry_count < max_retries: 

1757 try: 

1758 LOGGER.info(f"Connecting to streamable HTTP endpoint: {url}") 

1759 

1760 # Try to get SSE stream 

1761 async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response: 

1762 if response.status_code != 200: 

1763 if httpx: 

1764 raise httpx.HTTPStatusError(f"Streamable HTTP endpoint returned {response.status_code}", request=response.request, response=response) 

1765 raise Exception(f"Streamable HTTP endpoint returned {response.status_code}") 

1766 

1767 # Reset retry counter on successful connection 

1768 retry_count = 0 

1769 retry_delay = initial_retry_delay 

1770 

1771 async for line in response.aiter_lines(): 

1772 if line.startswith("data: "): 

1773 data = line[6:] # Remove "data: " prefix 

1774 if data: 

1775 print(data) 

1776 LOGGER.debug(f"Received: {data}") 

1777 

1778 except Exception as e: 

1779 if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)): 

1780 retry_count += 1 

1781 if retry_count >= max_retries: 

1782 LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.") 

1783 raise 

1784 

1785 LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})") 

1786 await asyncio.sleep(retry_delay) 

1787 retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s 

1788 else: 

1789 LOGGER.error(f"Unexpected error in streamable HTTP stream: {e}") 

1790 raise 

1791 

1792 

1793async def _run_multi_protocol_server( # pylint: disable=too-many-positional-arguments 

1794 cmd: str, 

1795 port: int, 

1796 log_level: str = "info", 

1797 cors: Optional[List[str]] = None, 

1798 host: str = "127.0.0.1", 

1799 expose_sse: bool = False, 

1800 expose_streamable_http: bool = False, 

1801 sse_path: str = "/sse", 

1802 message_path: str = "/message", 

1803 keep_alive: float = KEEP_ALIVE_INTERVAL, 

1804 stateless: bool = False, 

1805 json_response: bool = False, 

1806 header_mappings: Optional[NormalizedMappings] = None, 

1807) -> None: 

1808 """Run a stdio server and expose it via multiple protocols simultaneously. 

1809 

1810 Args: 

1811 cmd: The command to run as a stdio subprocess. 

1812 port: The port to bind the HTTP server to. 

1813 log_level: The logging level to use. Defaults to "info". 

1814 cors: Optional list of CORS allowed origins. 

1815 host: The host interface to bind to. Defaults to "127.0.0.1". 

1816 expose_sse: Whether to expose via SSE protocol. 

1817 expose_streamable_http: Whether to expose via streamable HTTP protocol. 

1818 sse_path: Path for SSE endpoint. Defaults to "/sse". 

1819 message_path: Path for message endpoint. Defaults to "/message". 

1820 keep_alive: Keep-alive interval for SSE. Defaults to KEEP_ALIVE_INTERVAL. 

1821 stateless: Whether to use stateless mode for streamable HTTP. 

1822 json_response: Whether to return JSON responses for streamable HTTP. 

1823 header_mappings: Optional mapping of HTTP headers to environment variables. 

1824 """ 

1825 LOGGER.info(f"Starting multi-protocol server for command: {cmd}") 

1826 LOGGER.info(f"Protocols: SSE={expose_sse}, StreamableHTTP={expose_streamable_http}") 

1827 

1828 # Create a shared pubsub whenever either protocol needs stdout observations 

1829 pubsub = _PubSub() if (expose_sse or expose_streamable_http) else None 

1830 

1831 # Create the stdio endpoint 

1832 stdio = StdIOEndpoint(cmd, pubsub, header_mappings=header_mappings) if (expose_sse or expose_streamable_http) and pubsub else None 

1833 

1834 # Create fastapi app and middleware 

1835 app = FastAPI() 

1836 

1837 # Add CORS middleware if specified 

1838 if cors: 

1839 app.add_middleware( 

1840 CORSMiddleware, 

1841 allow_origins=cors, 

1842 allow_credentials=True, 

1843 allow_methods=["*"], 

1844 allow_headers=["*"], 

1845 ) 

1846 

1847 # Start stdio if at least one transport requires it 

1848 if stdio: 

1849 await stdio.start() 

1850 

1851 # SSE endpoints 

1852 if expose_sse and stdio and pubsub: 

1853 

1854 @app.get(sse_path) 

1855 async def get_sse(request: Request) -> EventSourceResponse: 

1856 """SSE endpoint. 

1857 

1858 Args: 

1859 request: The incoming HTTP request. 

1860 

1861 Returns: 

1862 EventSourceResponse: Server-sent events stream. 

1863 """ 

1864 if not pubsub: 

1865 raise RuntimeError("PubSub not available") 

1866 

1867 # Extract environment variables from headers if dynamic env is enabled 

1868 additional_env_vars = {} 

1869 if header_mappings and stdio: 

1870 request_headers = dict(request.headers) 

1871 additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings) 

1872 

1873 # Restart stdio endpoint with new environment variables 

1874 if additional_env_vars: 

1875 LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables") 

1876 await stdio.stop() # Stop existing process 

1877 await stdio.start(additional_env_vars) # Start with new env vars 

1878 

1879 queue = pubsub.subscribe() 

1880 session_id = uuid.uuid4().hex 

1881 

1882 async def event_gen() -> AsyncIterator[Dict[str, Any]]: 

1883 """Generate SSE events for the client. 

1884 

1885 Yields: 

1886 Dict[str, Any]: SSE event data with event type and payload. 

1887 """ 

1888 endpoint_url = f"{str(request.base_url).rstrip('/')}{message_path}?session_id={session_id}" 

1889 yield { 

1890 "event": "endpoint", 

1891 "data": endpoint_url, 

1892 "retry": int(keep_alive * 1000), 

1893 } 

1894 

1895 if DEFAULT_KEEPALIVE_ENABLED: 

1896 yield {"event": "keepalive", "data": "{}", "retry": keep_alive * 1000} 

1897 

1898 try: 

1899 while True: 

1900 if await request.is_disconnected(): 

1901 break 

1902 

1903 try: 

1904 timeout = keep_alive if DEFAULT_KEEPALIVE_ENABLED else None 

1905 msg = await asyncio.wait_for(queue.get(), timeout) 

1906 yield {"event": "message", "data": msg.rstrip()} 

1907 except asyncio.TimeoutError: 

1908 if DEFAULT_KEEPALIVE_ENABLED: 

1909 yield { 

1910 "event": "keepalive", 

1911 "data": "{}", 

1912 "retry": keep_alive * 1000, 

1913 } 

1914 finally: 

1915 if pubsub: 

1916 pubsub.unsubscribe(queue) 

1917 

1918 return EventSourceResponse( 

1919 event_gen(), 

1920 headers={ 

1921 "Cache-Control": "no-cache", 

1922 "Connection": "keep-alive", 

1923 "X-Accel-Buffering": "no", 

1924 }, 

1925 ) 

1926 

1927 @app.post(message_path, status_code=status.HTTP_202_ACCEPTED) 

1928 async def post_message(raw: Request, session_id: str | None = None) -> Response: 

1929 """Message endpoint for SSE. 

1930 

1931 Args: 

1932 raw: The incoming HTTP request. 

1933 session_id: Optional session ID for correlation. 

1934 

1935 Returns: 

1936 Response: Acknowledgement of message receipt. 

1937 """ 

1938 _ = session_id 

1939 

1940 # Extract environment variables from headers if dynamic env is enabled 

1941 additional_env_vars = {} 

1942 if header_mappings and stdio: 

1943 request_headers = dict(raw.headers) 

1944 additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings) 

1945 

1946 # Only restart if we have new environment variables 

1947 if additional_env_vars: 

1948 LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables") 

1949 await stdio.stop() # Stop existing process 

1950 await stdio.start(additional_env_vars) # Start with new env vars 

1951 await asyncio.sleep(0.5) # Give process time to initialize 

1952 

1953 # Ensure stdio endpoint is running 

1954 if stdio and not stdio.is_running(): 

1955 LOGGER.info("Starting stdio endpoint (was not running)") 

1956 await stdio.start() 

1957 await asyncio.sleep(0.5) # Give process time to initialize 

1958 

1959 payload = await raw.body() 

1960 try: 

1961 orjson.loads(payload) 

1962 except Exception as exc: 

1963 return PlainTextResponse( 

1964 f"Invalid JSON payload: {exc}", 

1965 status_code=status.HTTP_400_BAD_REQUEST, 

1966 ) 

1967 if not stdio: 

1968 raise RuntimeError("Stdio endpoint not available") 

1969 await stdio.send(payload.decode().rstrip() + "\n") 

1970 return PlainTextResponse("forwarded", status_code=status.HTTP_202_ACCEPTED) 

1971 

1972 # Add health check 

1973 @app.get("/healthz") 

1974 async def health() -> Response: 

1975 """Health check endpoint. 

1976 

1977 Returns: 

1978 Response: Health status response. 

1979 """ 

1980 return PlainTextResponse("ok") 

1981 

1982 # Streamable HTTP support 

1983 streamable_server = None 

1984 streamable_manager = None 

1985 streamable_context = None 

1986 

1987 # Keep a reference to the original FastAPI app so we can wrap it with an ASGI 

1988 # layer that delegates `/mcp` scopes to the StreamableHTTPSessionManager if present. 

1989 original_app = app 

1990 

1991 if expose_streamable_http: 

1992 # Create an MCP server instance 

1993 streamable_server = MCPServer("stdio-proxy") 

1994 

1995 # Set up the streamable HTTP session manager 

1996 streamable_manager = StreamableHTTPSessionManager( 

1997 app=streamable_server, 

1998 stateless=stateless, 

1999 json_response=json_response, 

2000 ) 

2001 

2002 # Register POST /mcp on the FastAPI app as the canonical client->server POST 

2003 # path for Streamable HTTP. This forwards JSON-RPC notifications/requests to stdio. 

2004 @original_app.post("/mcp") 

2005 async def mcp_post(request: Request) -> Response: 

2006 """ 

2007 Handles POST requests to the `/mcp` endpoint, forwarding JSON payloads to stdio 

2008 and optionally waiting for a correlated response. 

2009 

2010 The request body is expected to be a JSON object or newline-delimited JSON. 

2011 If the JSON includes an "id" field, the function attempts to match it with 

2012 a response from stdio using a pubsub queue, within a timeout period. 

2013 

2014 Args: 

2015 request (Request): The incoming FastAPI request containing the JSON payload. 

2016 

2017 Returns: 

2018 Response: A FastAPI Response object. 

2019 - 200 OK with matched JSON response if correlation succeeds. 

2020 - 202 Accepted if no matching response is received in time or for notifications. 

2021 - 400 Bad Request if the payload is not valid JSON. 

2022 

2023 Example: 

2024 >>> import httpx 

2025 >>> response = httpx.post("http://localhost:8000/mcp", json={"id": 123, "method": "ping"}) 

2026 >>> response.status_code in (200, 202) 

2027 True 

2028 >>> response.text # May be the matched JSON or "accepted" 

2029 '{"id": 123, "result": "pong"}' # or "accepted" 

2030 """ 

2031 # Read and validate JSON 

2032 body = await request.body() 

2033 try: 

2034 obj = orjson.loads(body) 

2035 except Exception as exc: 

2036 return PlainTextResponse(f"Invalid JSON payload: {exc}", status_code=status.HTTP_400_BAD_REQUEST) 

2037 

2038 # Forward raw newline-delimited JSON to stdio 

2039 if not stdio: 

2040 raise RuntimeError("Stdio endpoint not available") 

2041 await stdio.send(body.decode().rstrip() + "\n") 

2042 

2043 # If it's a request (has an id) -> attempt to correlate response from stdio 

2044 if isinstance(obj, dict) and "id" in obj: 

2045 if not pubsub: 

2046 return PlainTextResponse("accepted", status_code=status.HTTP_202_ACCEPTED) 

2047 

2048 queue = pubsub.subscribe() 

2049 try: 

2050 timeout = 10.0 # seconds; tuneable 

2051 deadline = asyncio.get_event_loop().time() + timeout 

2052 while True: 

2053 remaining = max(0.0, deadline - asyncio.get_event_loop().time()) 

2054 if remaining == 0: 

2055 break 

2056 try: 

2057 msg = await asyncio.wait_for(queue.get(), timeout=remaining) 

2058 except asyncio.TimeoutError: 

2059 break 

2060 

2061 # stdio stdout lines may contain JSON objects or arrays 

2062 try: 

2063 parsed = orjson.loads(msg) 

2064 except (orjson.JSONDecodeError, ValueError): 

2065 # not JSON -> skip 

2066 continue 

2067 

2068 candidates = parsed if isinstance(parsed, list) else [parsed] 

2069 for candidate in candidates: 

2070 if isinstance(candidate, dict) and candidate.get("id") == obj.get("id"): 

2071 # return the matched response as JSON 

2072 return ORJSONResponse(candidate) 

2073 

2074 # timeout -> accept and return 202 

2075 return PlainTextResponse("accepted (no response yet)", status_code=status.HTTP_202_ACCEPTED) 

2076 finally: 

2077 if pubsub: 

2078 pubsub.unsubscribe(queue) 

2079 

2080 # Notification -> return 202 

2081 return PlainTextResponse("accepted", status_code=status.HTTP_202_ACCEPTED) 

2082 

2083 # ASGI wrapper to route GET/other /mcp scopes to streamable_manager.handle_request 

2084 async def mcp_asgi_wrapper(scope: Scope, receive: Receive, send: Send) -> None: 

2085 """ 

2086 ASGI middleware that intercepts HTTP requests to the `/mcp` endpoint. 

2087 

2088 If the request is an HTTP call to `/mcp` and a `streamable_manager` is available, 

2089 it can handle the request (currently commented out). All other requests are 

2090 passed to the original FastAPI application. 

2091 

2092 Args: 

2093 scope (Scope): The ASGI scope dictionary containing request metadata. 

2094 receive (Receive): An awaitable that yields incoming ASGI events. 

2095 send (Send): An awaitable used to send ASGI events. 

2096 """ 

2097 if scope.get("type") == "http" and scope.get("path") == "/mcp" and streamable_manager: 

2098 # Let StreamableHTTPSessionManager handle session-oriented streaming 

2099 # await streamable_manager.handle_request(scope, receive, send) 

2100 await original_app(scope, receive, send) 

2101 else: 

2102 # Delegate everything else to the original FastAPI app 

2103 await original_app(scope, receive, send) 

2104 

2105 # Replace the app used by uvicorn with the ASGI wrapper 

2106 app = mcp_asgi_wrapper # type: ignore[assignment] 

2107 

2108 # ---------------------- Server lifecycle ---------------------- 

2109 config = uvicorn.Config( 

2110 app, 

2111 host=host, 

2112 port=port, 

2113 log_level=log_level, 

2114 lifespan="off", 

2115 ) 

2116 server = uvicorn.Server(config) 

2117 

2118 shutting_down = asyncio.Event() 

2119 

2120 async def _shutdown() -> None: 

2121 """Handle graceful shutdown.""" 

2122 if shutting_down.is_set(): 

2123 return 

2124 shutting_down.set() 

2125 LOGGER.info("Shutting down multi-protocol server...") 

2126 if stdio: 

2127 await stdio.stop() 

2128 # Streamable HTTP cleanup handled by server shutdown 

2129 # Graceful shutdown by setting the shutdown event 

2130 # Use getattr to safely access should_exit attribute 

2131 setattr(server, "should_exit", getattr(server, "should_exit", False) or True) 

2132 

2133 loop = asyncio.get_running_loop() 

2134 for sig in (signal.SIGINT, signal.SIGTERM): 

2135 with suppress(NotImplementedError): 

2136 loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown())) 

2137 

2138 # If we have a streamable manager, start its context so it can accept ASGI /mcp 

2139 if streamable_manager: 

2140 streamable_context = streamable_manager.run() 

2141 await streamable_context.__aenter__() # pylint: disable=unnecessary-dunder-call,no-member 

2142 

2143 # Log available endpoints 

2144 endpoints = [] 

2145 if expose_sse: 

2146 endpoints.append(f"SSE: http://{host}:{port}{sse_path}") 

2147 if expose_streamable_http: 

2148 endpoints.append(f"StreamableHTTP: http://{host}:{port}/mcp") 

2149 

2150 LOGGER.info(f"Multi-protocol server ready → {', '.join(endpoints)}") 

2151 

2152 try: 

2153 await server.serve() 

2154 finally: 

2155 await _shutdown() 

2156 # Clean up streamable HTTP context with timeout to prevent spin loop 

2157 # if tasks don't respond to cancellation (anyio _deliver_cancellation issue) 

2158 if streamable_context: 

2159 # Get cleanup timeout from config (with fallback for standalone usage) 

2160 try: 

2161 # First-Party 

2162 from mcpgateway.config import settings as cfg # pylint: disable=import-outside-toplevel 

2163 

2164 cleanup_timeout = cfg.mcp_session_pool_cleanup_timeout 

2165 except Exception: 

2166 cleanup_timeout = 5.0 

2167 # Use anyio.move_on_after instead of asyncio.wait_for to properly propagate 

2168 # cancellation through anyio's cancel scope system (prevents orphaned spinning tasks) 

2169 with anyio.move_on_after(cleanup_timeout) as cleanup_scope: 

2170 try: 

2171 await streamable_context.__aexit__(None, None, None) # pylint: disable=unnecessary-dunder-call,no-member 

2172 except Exception as e: 

2173 LOGGER.debug(f"Error cleaning up streamable HTTP context: {e}") 

2174 if cleanup_scope.cancelled_caught: 

2175 LOGGER.warning("Streamable HTTP context cleanup timed out - proceeding anyway") 

2176 

2177 

2178async def _simple_sse_pump(client: "Any", url: str, max_retries: int, initial_retry_delay: float) -> None: 

2179 """Simple SSE pump that just prints messages to stdout. 

2180 

2181 Used when no stdio command is provided to bridge SSE to stdout directly. 

2182 

2183 Args: 

2184 client: The HTTP client to use for SSE streaming. 

2185 url: The SSE endpoint URL to connect to. 

2186 max_retries: Maximum number of connection retry attempts. 

2187 initial_retry_delay: Initial delay between retries in seconds. 

2188 

2189 Raises: 

2190 HTTPStatusError: If the SSE endpoint returns a non-200 status code. 

2191 Exception: For unexpected errors in SSE stream processing. 

2192 """ 

2193 retry_delay = initial_retry_delay 

2194 retry_count = 0 

2195 

2196 while retry_count < max_retries: 

2197 try: 

2198 LOGGER.info(f"Connecting to SSE endpoint: {url}") 

2199 

2200 async with client.stream("GET", url) as response: 

2201 # Check status code if available (real httpx response) 

2202 if hasattr(response, "status_code") and response.status_code != 200: 

2203 if httpx: 

2204 raise httpx.HTTPStatusError(f"SSE endpoint returned {response.status_code}", request=response.request, response=response) 

2205 raise Exception(f"SSE endpoint returned {response.status_code}") 

2206 

2207 # Reset retry counter on successful connection 

2208 retry_count = 0 

2209 retry_delay = initial_retry_delay 

2210 current_event: Optional[SSEEvent] = None 

2211 

2212 async for line in response.aiter_lines(): 

2213 event, is_complete = SSEEvent.parse_sse_line(line, current_event) 

2214 current_event = event 

2215 

2216 if is_complete and current_event: 

2217 if current_event.event == "endpoint": 

2218 LOGGER.info(f"Received message endpoint: {current_event.data}") 

2219 elif current_event.event == "message": 

2220 # Just print the message to stdout 

2221 print(current_event.data) 

2222 elif current_event.event == "keepalive": 

2223 LOGGER.debug("Received keepalive") 

2224 

2225 # Reset for next event 

2226 current_event = None 

2227 

2228 except Exception as e: 

2229 # Check if it's one of the expected httpx exceptions 

2230 if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)): 

2231 retry_count += 1 

2232 if retry_count >= max_retries: 

2233 LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.") 

2234 raise 

2235 

2236 LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})") 

2237 await asyncio.sleep(retry_delay) 

2238 retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s 

2239 else: 

2240 LOGGER.error(f"Unexpected error in SSE stream: {e}") 

2241 raise 

2242 

2243 

2244def start_streamable_http_stdio( 

2245 cmd: str, 

2246 port: int, 

2247 log_level: str, 

2248 cors: Optional[List[str]], 

2249 host: str = "127.0.0.1", 

2250 stateless: bool = False, 

2251 json_response: bool = False, 

2252) -> None: 

2253 """Start stdio to streamable HTTP bridge. 

2254 

2255 Entry point for starting a stdio to streamable HTTP bridge server. 

2256 

2257 Args: 

2258 cmd: The command to run as a stdio subprocess. 

2259 port: The port to bind the HTTP server to. 

2260 log_level: The logging level to use. 

2261 cors: Optional list of CORS allowed origins. 

2262 host: The host interface to bind to. Defaults to "127.0.0.1". 

2263 stateless: Whether to use stateless mode. Defaults to False. 

2264 json_response: Whether to return JSON responses. Defaults to False. 

2265 

2266 Returns: 

2267 None: This function does not return a value. 

2268 """ 

2269 return asyncio.run(_run_stdio_to_streamable_http(cmd, port, log_level, cors, host, stateless, json_response)) 

2270 

2271 

2272def start_streamable_http_client(url: str, bearer_token: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None) -> None: 

2273 """Start streamable HTTP to stdio bridge. 

2274 

2275 Entry point for starting a streamable HTTP to stdio bridge client. 

2276 

2277 Args: 

2278 url: The streamable HTTP endpoint URL to connect to. 

2279 bearer_token: Optional OAuth2 bearer token for authentication. Defaults to None. 

2280 timeout: HTTP client timeout in seconds. Defaults to 30.0. 

2281 stdio_command: Optional command to run for local stdio processing. 

2282 

2283 Returns: 

2284 None: This function does not return a value. 

2285 """ 

2286 return asyncio.run(_run_streamable_http_to_stdio(url, bearer_token, timeout, stdio_command)) 

2287 

2288 

2289def start_stdio( 

2290 cmd: str, port: int, log_level: str, cors: Optional[List[str]], host: str = "127.0.0.1", sse_path: str = "/sse", message_path: str = "/message", keep_alive: float = KEEP_ALIVE_INTERVAL 

2291) -> None: 

2292 """Start stdio bridge. 

2293 

2294 Entry point for starting a stdio to SSE bridge server. 

2295 

2296 Args: 

2297 cmd: The command to run as a stdio subprocess. 

2298 port: The port to bind the HTTP server to. 

2299 log_level: The logging level to use. 

2300 cors: Optional list of CORS allowed origins. 

2301 host: The host interface to bind to. Defaults to "127.0.0.1". 

2302 sse_path: Path for the SSE endpoint. Defaults to "/sse". 

2303 message_path: Path for the message endpoint. Defaults to "/message". 

2304 keep_alive: Keep-alive interval in seconds. Defaults to KEEP_ALIVE_INTERVAL. 

2305 

2306 Returns: 

2307 None: This function does not return a value. 

2308 

2309 Examples: 

2310 >>> # Test parameter validation 

2311 >>> isinstance(KEEP_ALIVE_INTERVAL, int) 

2312 True 

2313 >>> KEEP_ALIVE_INTERVAL > 0 

2314 True 

2315 >>> start_stdio("uvx mcp-server-git", 9000, "info", None) # doctest: +SKIP 

2316 """ 

2317 return asyncio.run(_run_stdio_to_sse(cmd, port, log_level, cors, host, sse_path, message_path, keep_alive)) 

2318 

2319 

2320def start_sse(url: str, bearer_token: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None) -> None: 

2321 """Start SSE bridge. 

2322 

2323 Entry point for starting an SSE to stdio bridge client. 

2324 

2325 Examples: 

2326 >>> # Test parameter defaults 

2327 >>> timeout_default = 30.0 

2328 >>> isinstance(timeout_default, float) 

2329 True 

2330 >>> timeout_default > 0 

2331 True 

2332 

2333 Args: 

2334 url: The SSE endpoint URL to connect to. 

2335 bearer_token: Optional OAuth2 bearer token for authentication. Defaults to None. 

2336 timeout: HTTP client timeout in seconds. Defaults to 30.0. 

2337 stdio_command: Optional command to run for local stdio processing. 

2338 

2339 Returns: 

2340 None: This function does not return a value. 

2341 

2342 Examples: 

2343 >>> start_sse("http://example.com/sse", "token123") # doctest: +SKIP 

2344 """ 

2345 return asyncio.run(_run_sse_to_stdio(url, bearer_token, timeout, stdio_command)) 

2346 

2347 

2348def main(argv: Optional[Sequence[str]] | None = None) -> None: 

2349 """Entry point for the translate module. 

2350 

2351 Configures logging, parses arguments, and starts the appropriate bridge 

2352 based on command line options. Handles keyboard interrupts gracefully. 

2353 

2354 Args: 

2355 argv: Optional sequence of command line arguments. If None, uses sys.argv[1:]. 

2356 

2357 Examples: 

2358 >>> # Test argument parsing 

2359 >>> try: 

2360 ... main(["--stdio", "cat", "--port", "9000"]) # doctest: +SKIP 

2361 ... except SystemExit: 

2362 ... pass # Would normally start the server 

2363 """ 

2364 args = _parse_args(argv or sys.argv[1:]) 

2365 logging.basicConfig( 

2366 level=getattr(logging, args.logLevel.upper(), logging.INFO), 

2367 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 

2368 datefmt="%Y-%m-%dT%H:%M:%S", 

2369 ) 

2370 

2371 # Parse header mappings if dynamic environment injection is enabled 

2372 # Pre-normalize mappings once at startup for O(1) lookups per request 

2373 header_mappings: NormalizedMappings | None = None 

2374 if getattr(args, "enable_dynamic_env", False): 

2375 try: 

2376 raw_mappings = parse_header_mappings(getattr(args, "header_to_env", [])) 

2377 header_mappings = NormalizedMappings(raw_mappings) 

2378 LOGGER.info(f"Dynamic environment injection enabled with {len(header_mappings)} header mappings") 

2379 except Exception as e: 

2380 LOGGER.error(f"Failed to parse header mappings: {e}") 

2381 raise 

2382 

2383 try: 

2384 # Handle gRPC server exposure 

2385 if getattr(args, "grpc", None): 

2386 # First-Party 

2387 from mcpgateway.translate_grpc import expose_grpc_via_sse # pylint: disable=import-outside-toplevel 

2388 

2389 # Parse metadata 

2390 metadata = {} 

2391 if getattr(args, "grpc_metadata", None): 

2392 for item in args.grpc_metadata: 

2393 if "=" in item: 

2394 key, value = item.split("=", 1) 

2395 metadata[key] = value 

2396 

2397 asyncio.run( 

2398 expose_grpc_via_sse( 

2399 target=args.grpc, 

2400 port=args.port, 

2401 tls_enabled=getattr(args, "grpc_tls", False), 

2402 tls_cert=getattr(args, "grpc_cert", None), 

2403 tls_key=getattr(args, "grpc_key", None), 

2404 metadata=metadata, 

2405 ) 

2406 ) 

2407 

2408 # Handle local stdio server exposure 

2409 elif args.stdio: 

2410 # Check which protocols to expose 

2411 expose_sse = getattr(args, "expose_sse", False) 

2412 expose_streamable_http = getattr(args, "expose_streamable_http", False) 

2413 

2414 # If no protocol specified, default to SSE for backward compatibility 

2415 if not expose_sse and not expose_streamable_http: 

2416 expose_sse = True 

2417 

2418 # Use multi-protocol server 

2419 asyncio.run( 

2420 _run_multi_protocol_server( 

2421 cmd=args.stdio, 

2422 port=args.port, 

2423 log_level=args.logLevel, 

2424 cors=args.cors, 

2425 host=args.host, 

2426 expose_sse=expose_sse, 

2427 expose_streamable_http=expose_streamable_http, 

2428 sse_path=getattr(args, "ssePath", "/sse"), 

2429 message_path=getattr(args, "messagePath", "/message"), 

2430 keep_alive=getattr(args, "keepAlive", KEEP_ALIVE_INTERVAL), 

2431 stateless=getattr(args, "stateless", False), 

2432 json_response=getattr(args, "jsonResponse", False), 

2433 header_mappings=header_mappings, 

2434 ) 

2435 ) 

2436 

2437 # Handle remote connection modes 

2438 elif getattr(args, "connect_sse", None): 

2439 start_sse(args.connect_sse, args.oauth2Bearer, 30.0, args.stdioCommand) 

2440 elif getattr(args, "connect_streamable_http", None): 

2441 start_streamable_http_client(args.connect_streamable_http, args.oauth2Bearer, 30.0, args.stdioCommand) 

2442 elif getattr(args, "connect_grpc", None): 

2443 print("Error: --connect-grpc mode not yet implemented. Use --grpc to expose a gRPC server.", file=sys.stderr) 

2444 sys.exit(1) 

2445 else: 

2446 print("Error: Must specify either --stdio (to expose local server), --grpc (to expose gRPC server), or --connect-sse/--connect-streamable-http (to connect to remote)", file=sys.stderr) 

2447 sys.exit(1) 

2448 except KeyboardInterrupt: 

2449 print("") # restore shell prompt 

2450 sys.exit(0) 

2451 except (NotImplementedError, ImportError) as exc: 

2452 print(exc, file=sys.stderr) 

2453 sys.exit(1) 

2454 

2455 

2456if __name__ == "__main__": # python3 -m mcpgateway.translate ... 

2457 main()