Coverage for mcpgateway / translate.py: 99%

817 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2 

3'''Location: ./mcpgateway/translate.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti, Manav Gupta 

7 

8r"""Bridges between different MCP transport protocols. 

9This module provides bidirectional bridging between MCP servers that communicate 

10via different transport protocols: stdio/JSON-RPC, HTTP/SSE, and streamable HTTP. 

11It enables exposing local MCP servers over HTTP or consuming remote endpoints 

12as local stdio servers. 

13 

14The bridge supports multiple modes of operation: 

15- stdio to SSE: Expose a local stdio MCP server over HTTP/SSE 

16- SSE to stdio: Bridge a remote SSE endpoint to local stdio 

17- stdio to streamable HTTP: Expose a local stdio MCP server via streamable HTTP 

18- streamable HTTP to stdio: Bridge a remote streamable HTTP endpoint to local stdio 

19 

20Examples: 

21 Programmatic usage: 

22 

23 >>> import asyncio 

24 >>> from mcpgateway.translate import start_stdio 

25 >>> asyncio.run(start_stdio("uvx mcp-server-git", 9000, "info", None, "127.0.0.1")) # doctest: +SKIP 

26 

27 Test imports and configuration: 

28 

29 >>> from mcpgateway.translate import MCPServer, StreamableHTTPSessionManager 

30 >>> isinstance(MCPServer, type) 

31 True 

32 >>> isinstance(StreamableHTTPSessionManager, type) 

33 True 

34 >>> from mcpgateway.translate import KEEP_ALIVE_INTERVAL 

35 >>> KEEP_ALIVE_INTERVAL > 0 

36 True 

37 >>> from mcpgateway.translate import DEFAULT_KEEPALIVE_ENABLED 

38 >>> isinstance(DEFAULT_KEEPALIVE_ENABLED, bool) 

39 True 

40 

41 Test Starlette imports: 

42 

43 >>> from mcpgateway.translate import Starlette, Route 

44 >>> isinstance(Starlette, type) 

45 True 

46 >>> isinstance(Route, type) 

47 True 

48 

49 Test logging setup: 

50 

51 >>> from mcpgateway.translate import LOGGER, logging_service 

52 >>> LOGGER is not None 

53 True 

54 >>> logging_service is not None 

55 True 

56 >>> hasattr(LOGGER, 'info') 

57 True 

58 >>> hasattr(LOGGER, 'error') 

59 True 

60 >>> hasattr(LOGGER, 'debug') 

61 True 

62 

63 Test utility classes: 

64 

65 >>> from mcpgateway.translate import _PubSub, StdIOEndpoint 

66 >>> pubsub = _PubSub() 

67 >>> hasattr(pubsub, 'publish') 

68 True 

69 >>> hasattr(pubsub, 'subscribe') 

70 True 

71 >>> hasattr(pubsub, 'unsubscribe') 

72 True 

73 

74Usage: 

75 Command line usage:: 

76 

77 # 1. Expose an MCP server that talks JSON-RPC on stdio at :9000/sse 

78 python3 -m mcpgateway.translate --stdio "uvx mcp-server-git" --port 9000 

79 

80 # 2. Bridge a remote SSE endpoint to local stdio 

81 python3 -m mcpgateway.translate --sse "https://example.com/sse" \ 

82 --stdioCommand "uvx mcp-client" 

83 

84 # 3. Expose stdio server via streamable HTTP at :9000/mcp 

85 python3 -m mcpgateway.translate --streamableHttp "uvx mcp-server-git" \ 

86 --port 9000 --stateless --jsonResponse 

87 

88 # 4. Connect to remote streamable HTTP endpoint 

89 python3 -m mcpgateway.translate \ 

90 --streamableHttp "https://example.com/mcp" \ 

91 --oauth2Bearer "your-token" 

92 

93 # 5. Test SSE endpoint 

94 curl -N http://localhost:9000/sse # receive the stream 

95 

96 # 6. Send a test echo request to SSE endpoint 

97 curl -X POST http://localhost:9000/message \ 

98 -H 'Content-Type: application/json' \ 

99 -d '{"jsonrpc":"2.0","id":1,"method":"echo","params":{"value":"hi"}}' 

100 

101 # 7. Test streamable HTTP endpoint 

102 curl -X POST http://localhost:9000/mcp \ 

103 -H 'Content-Type: application/json' \ 

104 -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"demo","version":"0.0.1"}}}' 

105 

106 The SSE stream emits JSON-RPC responses as ``event: message`` frames and sends 

107 regular ``event: keepalive`` frames (default every 30s) to prevent timeouts. 

108 

109 Streamable HTTP supports both stateful (with session management) and stateless 

110 modes, and can return either JSON responses or SSE streams. 

111""" 

112''' 

113 

114# Future 

115from __future__ import annotations 

116 

117# Standard 

118import argparse 

119import asyncio 

120from contextlib import suppress 

121import logging 

122import os 

123import shlex 

124import signal 

125import sys 

126from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Tuple 

127from urllib.parse import urlencode 

128import uuid 

129 

130# Third-Party 

131import anyio 

132from fastapi import FastAPI, Request, Response, status 

133from fastapi.middleware.cors import CORSMiddleware 

134from fastapi.responses import PlainTextResponse 

135from mcp.server import Server as MCPServer 

136from mcp.server.streamable_http_manager import StreamableHTTPSessionManager 

137import orjson 

138from starlette.applications import Starlette 

139from starlette.routing import Route 

140from starlette.types import Receive, Scope, Send 

141import uvicorn 

142 

143try: 

144 # Third-Party 

145 import httpx 

146except ImportError: 

147 httpx = None # type: ignore[assignment] 

148 

149# First-Party 

150from mcpgateway.services.logging_service import LoggingService 

151from mcpgateway.translate_header_utils import extract_env_vars_from_headers, NormalizedMappings, parse_header_mappings 

152 

153# Use patched EventSourceResponse with CPU spin protection (anyio#695 fix) 

154from mcpgateway.transports.sse_transport import EventSourceResponse 

155from mcpgateway.utils.orjson_response import ORJSONResponse 

156 

# Initialize logging service first so every later component can obtain a logger.
logging_service = LoggingService()
LOGGER = logging_service.get_logger("mcpgateway.translate")
# Content type used for outbound HTTP requests; overridable via environment.
CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")
# Import settings for the default keepalive interval. This module must remain
# usable standalone, so a missing/partial config falls back to safe defaults.
try:
    # First-Party
    from mcpgateway.config import settings

    DEFAULT_KEEP_ALIVE_INTERVAL = settings.sse_keepalive_interval
    DEFAULT_KEEPALIVE_ENABLED = settings.sse_keepalive_enabled
    DEFAULT_SSL_VERIFY = not settings.skip_ssl_verify
except ImportError:
    # Fallback if config not available
    DEFAULT_KEEP_ALIVE_INTERVAL = 30
    DEFAULT_KEEPALIVE_ENABLED = True
    DEFAULT_SSL_VERIFY = True  # Verify SSL by default when config unavailable

KEEP_ALIVE_INTERVAL = DEFAULT_KEEP_ALIVE_INTERVAL  # seconds - from config or fallback to 30

# Buffer limit for subprocess stdout (default 64KB is too small for large tool responses)
# Set to 16MB to handle tools that return large amounts of data (e.g., search results)
STDIO_BUFFER_LIMIT = 16 * 1024 * 1024  # 16MB

__all__ = ["main"]  # for console-script entry-point

183 

184 

185# ---------------------------------------------------------------------------# 

186# Helpers - trivial in-process Pub/Sub # 

187# ---------------------------------------------------------------------------# 

188class _PubSub: 

189 """Very small fan-out helper - one async Queue per subscriber. 

190 

191 This class implements a simple publish-subscribe pattern using asyncio queues 

192 for distributing messages from stdio subprocess to multiple SSE clients. 

193 

194 Examples: 

195 >>> import asyncio 

196 >>> async def test_pubsub(): 

197 ... pubsub = _PubSub() 

198 ... q = pubsub.subscribe() 

199 ... await pubsub.publish("hello") 

200 ... result = await q.get() 

201 ... pubsub.unsubscribe(q) 

202 ... return result 

203 >>> asyncio.run(test_pubsub()) 

204 'hello' 

205 """ 

206 

207 def __init__(self) -> None: 

208 """Initialize a new publish-subscribe system. 

209 

210 Creates an empty list of subscriber queues. Each subscriber will 

211 receive their own asyncio.Queue for receiving published messages. 

212 

213 Examples: 

214 >>> pubsub = _PubSub() 

215 >>> isinstance(pubsub._subscribers, list) 

216 True 

217 >>> len(pubsub._subscribers) 

218 0 

219 >>> hasattr(pubsub, '_subscribers') 

220 True 

221 """ 

222 self._subscribers: List[asyncio.Queue[str]] = [] 

223 

224 async def publish(self, data: str) -> None: 

225 """Publish data to all subscribers. 

226 

227 Dead queues (full) are automatically removed from the subscriber list. 

228 

229 Args: 

230 data: The data string to publish to all subscribers. 

231 

232 Examples: 

233 >>> import asyncio 

234 >>> async def test_publish(): 

235 ... pubsub = _PubSub() 

236 ... await pubsub.publish("test") # No subscribers, no error 

237 ... return True 

238 >>> asyncio.run(test_publish()) 

239 True 

240 

241 >>> # Test queue full handling 

242 >>> async def test_full_queue(): 

243 ... pubsub = _PubSub() 

244 ... # Create a queue with size 1 

245 ... q = asyncio.Queue(maxsize=1) 

246 ... pubsub._subscribers = [q] 

247 ... # Fill the queue 

248 ... await q.put("first") 

249 ... # This should remove the full queue 

250 ... await pubsub.publish("second") 

251 ... return len(pubsub._subscribers) 

252 >>> asyncio.run(test_full_queue()) 

253 0 

254 """ 

255 dead: List[asyncio.Queue[str]] = [] 

256 for q in self._subscribers: 

257 try: 

258 q.put_nowait(data) 

259 except asyncio.QueueFull: 

260 dead.append(q) 

261 for q in dead: 

262 with suppress(ValueError): 

263 self._subscribers.remove(q) 

264 

265 def subscribe(self) -> "asyncio.Queue[str]": 

266 """Subscribe to published data. 

267 

268 Creates a new queue for receiving published messages with a maximum 

269 size of 1024 items. 

270 

271 Returns: 

272 asyncio.Queue[str]: A queue that will receive published data. 

273 

274 Examples: 

275 >>> pubsub = _PubSub() 

276 >>> q = pubsub.subscribe() 

277 >>> isinstance(q, asyncio.Queue) 

278 True 

279 >>> q.maxsize 

280 1024 

281 >>> len(pubsub._subscribers) 

282 1 

283 >>> pubsub._subscribers[0] is q 

284 True 

285 """ 

286 q: asyncio.Queue[str] = asyncio.Queue(maxsize=1024) 

287 self._subscribers.append(q) 

288 return q 

289 

290 def unsubscribe(self, q: "asyncio.Queue[str]") -> None: 

291 """Unsubscribe from published data. 

292 

293 Removes the queue from the subscriber list. Safe to call even if 

294 the queue is not in the list. 

295 

296 Args: 

297 q: The queue to unsubscribe from published data. 

298 

299 Examples: 

300 >>> pubsub = _PubSub() 

301 >>> q = pubsub.subscribe() 

302 >>> pubsub.unsubscribe(q) 

303 >>> pubsub.unsubscribe(q) # No error on double unsubscribe 

304 """ 

305 with suppress(ValueError): 

306 self._subscribers.remove(q) 

307 

308 

309# ---------------------------------------------------------------------------# 

310# StdIO endpoint (child process ↔ async queues) # 

311# ---------------------------------------------------------------------------# 

class StdIOEndpoint:
    """Wrap a child process whose stdin/stdout speak line-delimited JSON-RPC.

    This class manages a subprocess that communicates via stdio using JSON-RPC
    protocol, pumping messages between the subprocess and a pubsub system.

    Examples:
        >>> import asyncio
        >>> async def test_stdio():
        ...     pubsub = _PubSub()
        ...     stdio = StdIOEndpoint("echo hello", pubsub)
        ...     # Would start a real subprocess
        ...     return isinstance(stdio, StdIOEndpoint)
        >>> asyncio.run(test_stdio())
        True
    """

    def __init__(self, cmd: str, pubsub: _PubSub, env_vars: Optional[Dict[str, str]] = None, header_mappings: Optional[NormalizedMappings] = None) -> None:
        """Initialize a stdio endpoint for subprocess communication.

        Sets up the endpoint with the command to run and the pubsub system
        for message distribution. The subprocess is not started until start()
        is called.

        Args:
            cmd: The command string to execute as a subprocess.
            pubsub: The publish-subscribe system for distributing subprocess
                output to SSE clients.
            env_vars: Optional dictionary of environment variables to set
                when starting the subprocess.
            header_mappings: Optional mapping of HTTP headers to environment variable names
                for dynamic environment injection.

        Examples:
            >>> pubsub = _PubSub()
            >>> endpoint = StdIOEndpoint("echo hello", pubsub)
            >>> endpoint._cmd
            'echo hello'
            >>> endpoint._proc is None
            True
            >>> endpoint._stdin is None
            True
            >>> endpoint._pump_task is None
            True
            >>> endpoint._pubsub is pubsub
            True
        """
        self._cmd = cmd
        self._pubsub = pubsub
        self._env_vars = env_vars or {}
        self._header_mappings = header_mappings
        self._proc: Optional[asyncio.subprocess.Process] = None
        self._stdin: Optional[asyncio.StreamWriter] = None
        self._pump_task: Optional[asyncio.Task[None]] = None

    async def start(self, additional_env_vars: Optional[Dict[str, str]] = None) -> None:
        """Start the stdio subprocess with custom environment variables.

        Creates the subprocess and starts the stdout pump task. The subprocess
        is created with stdin/stdout pipes and stderr passed through. Any
        already-running subprocess is stopped first.

        Args:
            additional_env_vars: Optional dictionary of additional environment
                variables to set when starting the subprocess. These will be
                combined with (and override) the environment variables set
                during initialization.

        Raises:
            RuntimeError: If the subprocess fails to create stdin/stdout pipes.
        """
        # Stop existing subprocess before starting a new one
        if self._proc is not None:
            await self.stop()

        LOGGER.info(f"Starting stdio subprocess: {self._cmd}")

        # Build environment from base + configured + additional (later wins)
        env = os.environ.copy()
        env.update(self._env_vars)
        if additional_env_vars:
            env.update(additional_env_vars)

        # System-critical environment variables that must never be cleared
        system_critical_vars = {"PATH", "HOME", "TMPDIR", "TEMP", "TMP", "USER", "LOGNAME", "SHELL", "LANG", "LC_ALL", "LC_CTYPE", "PYTHONHOME", "PYTHONPATH"}

        # Clear any mapped env vars that weren't provided in headers to avoid
        # leaking stale values from a previous request into the new process.
        if self._header_mappings:
            for env_var_name in self._header_mappings.values():
                if env_var_name not in (additional_env_vars or {}) and env_var_name not in system_critical_vars:
                    # Delete the variable instead of setting to empty string to avoid
                    # breaking subprocess initialization
                    env.pop(env_var_name, None)

        LOGGER.debug(f"Subprocess environment variables: {list(env.keys())}")
        self._proc = await asyncio.create_subprocess_exec(
            *shlex.split(self._cmd),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=sys.stderr,  # passthrough for visibility
            env=env,  # environment variable support
            limit=STDIO_BUFFER_LIMIT,  # Increase buffer limit for large tool responses
        )

        # Explicit error checking
        if not self._proc.stdin or not self._proc.stdout:
            raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {self._cmd}")

        LOGGER.debug("Subprocess started successfully")

        self._stdin = self._proc.stdin
        self._pump_task = asyncio.create_task(self._pump_stdout())

    async def stop(self) -> None:
        """Stop the stdio subprocess.

        Terminates the subprocess gracefully with a 5-second timeout,
        then cancels the pump task. Safe to call when not started or
        after the process has already exited.

        Examples:
            >>> import asyncio
            >>> async def test_stop():
            ...     pubsub = _PubSub()
            ...     stdio = StdIOEndpoint("cat", pubsub)
            ...     await stdio.stop()  # Safe to call even if not started
            ...     return True
            >>> asyncio.run(test_stop())
            True
        """
        if self._proc is None:
            return

        # Check if process is still running
        try:
            if self._proc.returncode is not None:
                # Process already terminated
                LOGGER.info(f"Subprocess (pid={self._proc.pid}) already terminated")
                self._proc = None
                self._stdin = None
                return
        except (ProcessLookupError, AttributeError):
            # Process doesn't exist or is already cleaned up
            LOGGER.info("Subprocess already cleaned up")
            self._proc = None
            self._stdin = None
            return

        LOGGER.info(f"Stopping subprocess (pid={self._proc.pid})")
        try:
            self._proc.terminate()
            with suppress(asyncio.TimeoutError):
                await asyncio.wait_for(self._proc.wait(), timeout=5)
        except ProcessLookupError:
            # Process already terminated
            LOGGER.info("Subprocess already terminated")
        finally:
            if self._pump_task:
                self._pump_task.cancel()
                # Wait for pump task to actually finish, suppressing all exceptions
                # since the pump task logs its own errors. Use BaseException to catch
                # CancelledError which inherits from BaseException in Python 3.8+
                with suppress(BaseException):
                    await self._pump_task
            self._proc = None
            self._stdin = None  # Reset stdin too!

    def is_running(self) -> bool:
        """Check if the stdio subprocess is currently running.

        A subprocess that was started but has since exited is *not* considered
        running (``returncode`` is set once the child terminates), so callers
        such as the ``/message`` handler will restart it instead of writing
        into a dead pipe.

        Returns:
            True if the subprocess exists and has not terminated, False otherwise.

        Examples:
            >>> import asyncio
            >>> async def test_is_running():
            ...     pubsub = _PubSub()
            ...     stdio = StdIOEndpoint("cat", pubsub)
            ...     return stdio.is_running()
            >>> asyncio.run(test_is_running())
            False
        """
        # Bug fix: previously this only checked `self._proc is not None`,
        # which reported an exited child as still running.
        return self._proc is not None and self._proc.returncode is None

    async def send(self, raw: str) -> None:
        """Send data to the subprocess stdin.

        Args:
            raw: The raw data string to send to the subprocess.

        Raises:
            RuntimeError: If the stdio endpoint is not started.

        Examples:
            >>> import asyncio
            >>> async def test_send():
            ...     pubsub = _PubSub()
            ...     stdio = StdIOEndpoint("cat", pubsub)
            ...     try:
            ...         await stdio.send("test")
            ...     except RuntimeError as e:
            ...         return str(e)
            >>> asyncio.run(test_send())
            'stdio endpoint not started'
        """
        if not self._stdin:
            raise RuntimeError("stdio endpoint not started")
        LOGGER.debug(f"→ stdio: {raw.strip()}")
        self._stdin.write(raw.encode())
        await self._stdin.drain()

    async def _pump_stdout(self) -> None:
        """Pump stdout from subprocess to pubsub.

        Continuously reads lines from the subprocess stdout and publishes them
        to the pubsub system. Runs until EOF or exception.

        Raises:
            RuntimeError: If process or stdout is not properly initialized.
            Exception: For any other error encountered while pumping stdout.
        """
        if not self._proc or not self._proc.stdout:
            raise RuntimeError("Process not properly initialized: missing stdout")

        reader = self._proc.stdout
        try:
            while True:
                line = await reader.readline()
                if not line:  # EOF
                    break
                text = line.decode(errors="replace")
                LOGGER.debug(f"← stdio: {text.strip()}")
                await self._pubsub.publish(text)
        except ConnectionResetError:  # pragma: no cover --subprocess terminated
            # Subprocess terminated abruptly - this is expected behavior
            LOGGER.debug("stdout pump: subprocess connection closed")
        except Exception:  # pragma: no cover --best-effort logging
            LOGGER.exception("stdout pump crashed - terminating bridge")
            raise

560 

561 

562# ---------------------------------------------------------------------------# 

563# SSE Event Parser # 

564# ---------------------------------------------------------------------------# 

class SSEEvent:
    """One Server-Sent Event, with incremental line-by-line parsing.

    Attributes:
        event: The event type (e.g. 'message', 'keepalive', 'endpoint').
        data: The event data payload (multi-line data is newline-joined).
        event_id: Optional event ID.
        retry: Optional retry interval in milliseconds.
    """

    def __init__(self, event: str = "message", data: str = "", event_id: Optional[str] = None, retry: Optional[int] = None):
        """Create an SSE event record.

        Args:
            event: Event type, defaults to "message".
            data: Event data payload.
            event_id: Optional event ID.
            retry: Optional retry interval in milliseconds.
        """
        self.event = event
        self.data = data
        self.event_id = event_id
        self.retry = retry

    @classmethod
    def parse_sse_line(cls, line: str, current_event: Optional["SSEEvent"] = None) -> Tuple[Optional["SSEEvent"], bool]:
        """Consume one raw SSE line, building up *current_event*.

        Follows the SSE framing rules: a blank line terminates the event,
        lines starting with ``:`` are comments, and ``field: value`` pairs
        update the event being assembled.

        Args:
            line: The raw SSE line (trailing CR/LF is tolerated).
            current_event: The partially built event, if any.

        Returns:
            Tuple of (event, is_complete): the (possibly new) event object
            and whether it is now complete and ready for dispatch.
        """
        stripped = line.rstrip("\n\r")

        # A blank line marks the end of an event; only emit it if it
        # actually accumulated some data.
        if not stripped:
            if current_event is not None and current_event.data:
                return current_event, True
            return None, False

        # Lines beginning with ':' are SSE comments - ignore them.
        if stripped.startswith(":"):
            return current_event, False

        # Split "field: value"; a field with no colon yields an empty value.
        field, sep, value = stripped.partition(":")
        if sep:
            value = value.lstrip(" ")  # a single leading space is stripped per spec

        target = current_event if current_event is not None else cls()

        if field == "event":
            target.event = value
        elif field == "data":
            # Successive data lines are joined with newlines.
            target.data = f"{target.data}\n{value}" if target.data else value
        elif field == "id":
            target.event_id = value
        elif field == "retry":
            # Non-integer retry values are silently ignored.
            with suppress(ValueError):
                target.retry = int(value)

        return target, False

642 

643 

644# ---------------------------------------------------------------------------# 

645# FastAPI app exposing /sse & /message # 

646# ---------------------------------------------------------------------------# 

647 

648 

def _build_fastapi(
    pubsub: _PubSub,
    stdio: StdIOEndpoint,
    keep_alive: float = KEEP_ALIVE_INTERVAL,
    sse_path: str = "/sse",
    message_path: str = "/message",
    cors_origins: Optional[List[str]] = None,
    header_mappings: Optional[NormalizedMappings] = None,
) -> FastAPI:
    """Build FastAPI application with SSE and message endpoints.

    Creates a FastAPI app with SSE streaming endpoint and message posting
    endpoint for bidirectional communication with the stdio subprocess.

    Args:
        pubsub: The publish/subscribe system for message routing.
        stdio: The stdio endpoint for subprocess communication.
        keep_alive: Interval in seconds for keepalive messages. Defaults to KEEP_ALIVE_INTERVAL.
        sse_path: Path for the SSE endpoint. Defaults to "/sse".
        message_path: Path for the message endpoint. Defaults to "/message".
        cors_origins: Optional list of CORS allowed origins.
        header_mappings: Optional mapping of HTTP headers to environment variables.

    Returns:
        FastAPI: The configured FastAPI application.

    Examples:
        >>> pubsub = _PubSub()
        >>> stdio = StdIOEndpoint("cat", pubsub)
        >>> app = _build_fastapi(pubsub, stdio)
        >>> isinstance(app, FastAPI)
        True
        >>> "/sse" in [r.path for r in app.routes]
        True
        >>> "/message" in [r.path for r in app.routes]
        True
        >>> "/healthz" in [r.path for r in app.routes]
        True

        >>> # Test with custom paths
        >>> app2 = _build_fastapi(pubsub, stdio, sse_path="/events", message_path="/send")
        >>> "/events" in [r.path for r in app2.routes]
        True
        >>> "/send" in [r.path for r in app2.routes]
        True

        >>> # Test CORS middleware is added
        >>> app3 = _build_fastapi(pubsub, stdio, cors_origins=["http://example.com"])
        >>> # Check that middleware stack includes CORSMiddleware
        >>> any("CORSMiddleware" in str(m) for m in app3.user_middleware)
        True
    """
    # Standard
    import time as _time  # pylint: disable=import-outside-toplevel

    app = FastAPI()

    # Rate limiter: minimum interval between subprocess restarts (seconds).
    # Prevents header-driven env changes from thrashing the child process.
    restart_min_interval_secs = 30.0
    # Mutable single-slot dict so the nested closures can update the timestamp.
    last_restart_ts: dict[str, float] = {"t": 0.0}

    def _restart_allowed() -> bool:
        """Return whether enough time elapsed to allow a subprocess restart.

        Side effect: when allowed, records "now" as the last restart time.

        Returns:
            bool: ``True`` when restart is allowed, otherwise ``False``.
        """
        now = _time.monotonic()
        if now - last_restart_ts["t"] < restart_min_interval_secs:
            return False
        last_restart_ts["t"] = now
        return True

    # Add CORS middleware if origins specified
    if cors_origins:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=cors_origins,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    # ----- GET /sse ---------------------------------------------------------#
    @app.get(sse_path)
    async def get_sse(request: Request) -> EventSourceResponse:  # noqa: D401
        """Stream subprocess stdout to any number of SSE clients.

        Args:
            request (Request): The incoming ``GET`` request that will be
                upgraded to a Server-Sent Events (SSE) stream.

        Returns:
            EventSourceResponse: A streaming response that forwards JSON-RPC
                messages from the child process and emits periodic ``keepalive``
                frames so that clients and proxies do not time out.
        """
        # Extract environment variables from headers if dynamic env is enabled
        additional_env_vars = {}
        if header_mappings:
            request_headers = dict(request.headers)
            additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)

        # Restart stdio endpoint with new environment variables (rate-limited).
        # NOTE: the restart affects ALL connected clients, since the subprocess
        # is shared; the rate limiter keeps this from happening too often.
        if additional_env_vars and _restart_allowed():
            LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
            await stdio.stop()
            await stdio.start(additional_env_vars)

        # Subscribe BEFORE the response starts streaming so no message
        # published in between is missed.
        queue = pubsub.subscribe()
        session_id = uuid.uuid4().hex

        async def event_gen() -> AsyncIterator[Dict[str, Any]]:
            """Generate Server-Sent Events for the SSE stream.

            Yields SSE events in the following sequence:
            1. An 'endpoint' event with the message posting URL (required by MCP spec)
            2. An immediate 'keepalive' event to confirm the stream is active
            3. 'message' events containing JSON-RPC responses from the subprocess
            4. Periodic 'keepalive' events to prevent timeouts

            The generator runs until the client disconnects or the server shuts down.
            Automatically unsubscribes from the pubsub system on completion.

            Yields:
                Dict[str, Any]: SSE event dictionaries containing:
                    - event: The event type ('endpoint', 'message', or 'keepalive')
                    - data: The event payload (URL, JSON-RPC message, or empty object)
                    - retry: Retry interval in milliseconds for reconnection
            """
            # 1. Mandatory "endpoint" bootstrap required by the MCP spec
            endpoint_url = f"{str(request.base_url).rstrip('/')}{message_path}?session_id={session_id}"
            yield {
                "event": "endpoint",
                "data": endpoint_url,
                "retry": int(keep_alive * 1000),
            }

            # 2. Immediate keepalive so clients know the stream is alive (if enabled in config)
            # NOTE(review): retry here is a float (keep_alive * 1000) while the
            # bootstrap frame above uses int(...) - harmless but inconsistent.
            if DEFAULT_KEEPALIVE_ENABLED:
                yield {"event": "keepalive", "data": "{}", "retry": keep_alive * 1000}

            try:
                while True:
                    if await request.is_disconnected():
                        break

                    try:
                        # With keepalives disabled, wait indefinitely for the
                        # next subprocess message (timeout=None).
                        timeout = keep_alive if DEFAULT_KEEPALIVE_ENABLED else None
                        msg = await asyncio.wait_for(queue.get(), timeout)
                        yield {"event": "message", "data": msg.rstrip()}
                    except asyncio.TimeoutError:
                        # No message within keep_alive seconds - emit a keepalive
                        # frame so intermediaries don't drop the connection.
                        if DEFAULT_KEEPALIVE_ENABLED:
                            yield {
                                "event": "keepalive",
                                "data": "{}",
                                "retry": keep_alive * 1000,
                            }
            finally:
                # Always release the subscription, whatever ended the stream.
                if pubsub:
                    pubsub.unsubscribe(queue)

        return EventSourceResponse(
            event_gen(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # disable proxy buffering
            },
        )

    # ----- POST /message ----------------------------------------------------#
    @app.post(message_path, status_code=status.HTTP_202_ACCEPTED)
    async def post_message(raw: Request, session_id: str | None = None) -> Response:  # noqa: D401
        """Forward a raw JSON-RPC request to the stdio subprocess.

        Args:
            raw (Request): The incoming ``POST`` request whose body contains
                a single JSON-RPC message.
            session_id (str | None): The SSE session identifier that originated
                this back-channel call (present when the client obtained the
                endpoint URL from an ``endpoint`` bootstrap frame).

        Returns:
            Response: ``202 Accepted`` if the payload is forwarded successfully,
                or ``400 Bad Request`` when the body is not valid JSON.
        """
        _ = session_id  # Unused but required for API compatibility

        # Extract environment variables from headers if dynamic env is enabled
        additional_env_vars = {}
        if header_mappings:
            request_headers = dict(raw.headers)
            additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)

        # Restart stdio endpoint with new environment variables (rate-limited)
        if additional_env_vars and _restart_allowed():
            LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
            await stdio.stop()
            await stdio.start(additional_env_vars)
            await asyncio.sleep(0.5)  # Give process time to initialize

        # Ensure stdio endpoint is running
        if not stdio.is_running():
            LOGGER.info("Starting stdio endpoint (was not running)")
            await stdio.start()
            await asyncio.sleep(0.5)  # Give process time to initialize

        payload = await raw.body()
        try:
            # Validate only - the original bytes are forwarded untouched so the
            # subprocess sees exactly what the client sent.
            orjson.loads(payload)  # validate
        except Exception as exc:  # noqa: BLE001
            return PlainTextResponse(
                f"Invalid JSON payload: {exc}",
                status_code=status.HTTP_400_BAD_REQUEST,
            )
        # Normalize to a single newline-terminated line (line-delimited JSON-RPC).
        await stdio.send(payload.decode().rstrip() + "\n")
        return PlainTextResponse("forwarded", status_code=status.HTTP_202_ACCEPTED)

    # ----- Liveness ---------------------------------------------------------#
    @app.get("/healthz")
    async def health() -> Response:  # noqa: D401
        """Health check endpoint.

        Returns:
            Response: A plain text response with "ok" status.
        """
        return PlainTextResponse("ok")

    return app

887 

888 

889# ---------------------------------------------------------------------------# 

890# CLI & orchestration # 

891# ---------------------------------------------------------------------------# 

892 

893 

def _parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """Parse command line arguments.

    Validates mutually exclusive source options and sets defaults for
    port and logging configuration.

    Args:
        argv: Sequence of command line arguments.

    Returns:
        argparse.Namespace: Parsed command line arguments.

    Examples:
        >>> args = _parse_args(["--stdio", "cat", "--port", "9000"])
        >>> args.stdio
        'cat'
        >>> args.port
        9000
        >>> args.logLevel
        'info'
        >>> args.host
        '127.0.0.1'
        >>> args.cors is None
        True
        >>> args.oauth2Bearer is None
        True

        >>> # Test default parameters
        >>> args = _parse_args(["--stdio", "cat"])
        >>> args.port
        8000
        >>> args.host
        '127.0.0.1'
        >>> args.logLevel
        'info'

        >>> # Test connect-sse mode
        >>> args = _parse_args(["--connect-sse", "http://example.com/sse"])
        >>> args.connect_sse
        'http://example.com/sse'
        >>> args.stdio is None
        True

        >>> # Test CORS configuration
        >>> args = _parse_args(["--stdio", "cat", "--cors", "https://app.com", "https://web.com"])
        >>> args.cors
        ['https://app.com', 'https://web.com']

        >>> # Test OAuth2 Bearer token
        >>> args = _parse_args(["--connect-sse", "http://example.com", "--oauth2Bearer", "token123"])
        >>> args.oauth2Bearer
        'token123'

        >>> # Test custom host and log level
        >>> args = _parse_args(["--stdio", "cat", "--host", "0.0.0.0", "--logLevel", "debug"])
        >>> args.host
        '0.0.0.0'
        >>> args.logLevel
        'debug'

        >>> # Test expose protocols
        >>> args = _parse_args(["--stdio", "uvx mcp-server-git", "--expose-sse", "--expose-streamable-http"])
        >>> args.stdio
        'uvx mcp-server-git'
        >>> args.expose_sse
        True
        >>> args.expose_streamable_http
        True
        >>> args.stateless
        False
        >>> args.jsonResponse
        False

        >>> # Test new parameters
        >>> args = _parse_args(["--stdio", "cat", "--ssePath", "/events", "--messagePath", "/send", "--keepAlive", "60"])
        >>> args.ssePath
        '/events'
        >>> args.messagePath
        '/send'
        >>> args.keepAlive
        60

        >>> # Test connect-sse with stdio command
        >>> args = _parse_args(["--connect-sse", "http://example.com/sse", "--stdioCommand", "uvx mcp-server-git"])
        >>> args.stdioCommand
        'uvx mcp-server-git'

        >>> # Test connect-sse without stdio command (allowed)
        >>> args = _parse_args(["--connect-sse", "http://example.com/sse"])
        >>> args.stdioCommand is None
        True
    """
    p = argparse.ArgumentParser(
        prog="mcpgateway.translate",
        description="Bridges between different MCP transport protocols: stdio, SSE, and streamable HTTP.",
    )

    # Source/destination options
    p.add_argument("--stdio", help='Local command to run, e.g. "uvx mcp-server-git"')
    p.add_argument("--connect-sse", dest="connect_sse", help="Connect to remote SSE endpoint URL")
    p.add_argument("--connect-streamable-http", dest="connect_streamable_http", help="Connect to remote streamable HTTP endpoint URL")
    p.add_argument("--grpc", type=str, help="gRPC server target (host:port) to expose")
    p.add_argument("--connect-grpc", type=str, help="Remote gRPC endpoint to connect to")

    # Protocol exposure options (can be combined)
    p.add_argument("--expose-sse", action="store_true", help="Expose via SSE protocol (endpoints: /sse and /message)")
    p.add_argument("--expose-streamable-http", action="store_true", help="Expose via streamable HTTP protocol (endpoint: /mcp)")

    # gRPC configuration options
    p.add_argument("--grpc-tls", action="store_true", help="Enable TLS for gRPC connection")
    p.add_argument("--grpc-cert", type=str, help="Path to TLS certificate for gRPC")
    p.add_argument("--grpc-key", type=str, help="Path to TLS key for gRPC")
    p.add_argument("--grpc-metadata", action="append", help="gRPC metadata (KEY=VALUE, repeatable)")

    p.add_argument("--port", type=int, default=8000, help="HTTP port to bind")
    p.add_argument("--host", default="127.0.0.1", help="Host interface to bind (default: 127.0.0.1)")
    p.add_argument(
        "--logLevel",
        default="info",
        choices=["debug", "info", "warning", "error", "critical"],
        help="Log level",
    )
    p.add_argument(
        "--cors",
        nargs="*",
        help="CORS allowed origins (e.g., --cors https://app.example.com)",
    )
    p.add_argument(
        "--oauth2Bearer",
        help="OAuth2 Bearer token for authentication",
    )

    # New configuration options
    p.add_argument(
        "--ssePath",
        default="/sse",
        help="SSE endpoint path (default: /sse)",
    )
    p.add_argument(
        "--messagePath",
        default="/message",
        help="Message endpoint path (default: /message)",
    )
    p.add_argument(
        "--keepAlive",
        type=int,
        default=KEEP_ALIVE_INTERVAL,
        help=f"Keep-alive interval in seconds (default: {KEEP_ALIVE_INTERVAL})",
    )

    # For SSE/streamable-HTTP to stdio mode.
    # Help text fixed: the old wording referenced removed flags --sse/--streamableHttp.
    p.add_argument(
        "--stdioCommand",
        help="Command to run when bridging SSE/streamableHttp to stdio (optional with --connect-sse or --connect-streamable-http)",
    )

    # Dynamic environment variable injection
    p.add_argument("--enable-dynamic-env", action="store_true", help="Enable dynamic environment variable injection from HTTP headers")
    p.add_argument(
        "--header-to-env",
        action="append",
        default=[],
        help="Map HTTP header to environment variable (format: HEADER=ENV_VAR, can be used multiple times). Case-insensitive duplicates are rejected (e.g., Authorization and authorization cannot both be mapped).",
    )

    # For streamable HTTP mode
    p.add_argument(
        "--stateless",
        action="store_true",
        help="Use stateless mode for streamable HTTP (default: False)",
    )
    p.add_argument(
        "--jsonResponse",
        action="store_true",
        help="Return JSON responses instead of SSE streams for streamable HTTP (default: False)",
    )

    return p.parse_args(argv)

1077 

1078 

async def _run_stdio_to_sse(
    cmd: str,
    port: int,
    log_level: str = "info",
    cors: Optional[List[str]] = None,
    host: str = "127.0.0.1",
    sse_path: str = "/sse",
    message_path: str = "/message",
    keep_alive: float = KEEP_ALIVE_INTERVAL,
    header_mappings: Optional[NormalizedMappings] = None,
) -> None:
    """Bridge a local stdio MCP server to HTTP/SSE clients.

    Launches *cmd* as a subprocess, fans its stdout out through a pub/sub hub,
    and serves the SSE and message endpoints with uvicorn until interrupted
    (SIGINT/SIGTERM) or the server exits.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use. Defaults to "info".
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1" for security.
        sse_path: Path for the SSE endpoint. Defaults to "/sse".
        message_path: Path for the message endpoint. Defaults to "/message".
        keep_alive: Keep-alive interval in seconds. Defaults to KEEP_ALIVE_INTERVAL.
        header_mappings: Optional mapping of HTTP headers to environment variables.

    Examples:
        >>> import asyncio  # doctest: +SKIP
        >>> asyncio.run(_run_stdio_to_sse("cat", 9000))  # doctest: +SKIP
    """
    bus = _PubSub()
    endpoint = StdIOEndpoint(cmd, bus, header_mappings=header_mappings)
    await endpoint.start()

    app = _build_fastapi(
        bus,
        endpoint,
        keep_alive=keep_alive,
        sse_path=sse_path,
        message_path=message_path,
        cors_origins=cors,
        header_mappings=header_mappings,
    )
    server = uvicorn.Server(
        uvicorn.Config(
            app,
            host=host,  # bind only the requested interface, never a hardcoded 0.0.0.0
            port=port,
            log_level=log_level,
            lifespan="off",
        )
    )

    stopped = asyncio.Event()  # guards against running the teardown twice

    async def _shutdown() -> None:
        """Stop the subprocess and flag uvicorn for exit (idempotent).

        Runs the teardown at most once: signal handlers and the final
        cleanup after ``serve()`` may both invoke it.
        """
        if stopped.is_set():
            return
        stopped.set()
        LOGGER.info("Shutting down ...")
        await endpoint.stop()
        # Request a graceful uvicorn exit; tolerate server objects that
        # lack the should_exit attribute.
        setattr(server, "should_exit", getattr(server, "should_exit", False) or True)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        with suppress(NotImplementedError):  # Windows lacks add_signal_handler
            loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown()))

    LOGGER.info(f"Bridge ready → http://{host}:{port}{sse_path}")
    await server.serve()
    await _shutdown()  # final cleanup

1166 

1167 

async def _run_sse_to_stdio(url: str, oauth2_bearer: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None, max_retries: int = 5, initial_retry_delay: float = 1.0) -> None:
    """Run SSE to stdio bridge.

    Connects to a remote SSE endpoint and bridges it to local stdio.
    Implements proper bidirectional message flow with error handling and retries.

    Args:
        url: The SSE endpoint URL to connect to.
        oauth2_bearer: Optional OAuth2 bearer token for authentication. Defaults to None.
        timeout: HTTP client timeout in seconds. Defaults to 30.0.
        stdio_command: Optional command to run for local stdio processing.
            If not provided, will simply print SSE messages to stdout.
        max_retries: Maximum number of connection retry attempts. Defaults to 5.
        initial_retry_delay: Initial delay between retries in seconds. Defaults to 1.0.

    Raises:
        ImportError: If httpx package is not available.
        RuntimeError: If the subprocess fails to create stdin/stdout pipes.
        Exception: For any unexpected error in SSE stream processing.

    Examples:
        >>> import asyncio
        >>> async def test_sse():
        ...     try:
        ...         await _run_sse_to_stdio("http://example.com/sse", None)  # doctest: +SKIP
        ...     except ImportError as e:
        ...         return "httpx" in str(e)
        >>> asyncio.run(test_sse())  # Would return True if httpx not installed  # doctest: +SKIP
    """
    # httpx is imported lazily/optionally at module level; fail fast if absent.
    if not httpx:
        raise ImportError("httpx package is required for SSE to stdio bridging")

    headers = {}
    if oauth2_bearer:
        headers["Authorization"] = f"Bearer {oauth2_bearer}"

    # If no stdio command provided, use simple mode (just print to stdout)
    if not stdio_command:
        LOGGER.warning("No --stdioCommand provided, running in simple mode (SSE to stdout only)")
        # First-Party
        from mcpgateway.services.http_client_service import get_isolated_http_client  # pylint: disable=import-outside-toplevel

        async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
            await _simple_sse_pump(client, url, max_retries, initial_retry_delay)
        return

    # Start the stdio subprocess.  stderr is inherited so the child's
    # diagnostics appear on this process's stderr.
    process = await asyncio.create_subprocess_exec(
        *shlex.split(stdio_command),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=sys.stderr,
    )

    if not process.stdin or not process.stdout:
        raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {stdio_command}")

    # Store the message endpoint URL once received.
    # Written by pump_sse_to_stdio (via ``nonlocal``), read by read_stdout.
    message_endpoint: Optional[str] = None

    async def read_stdout(client: httpx.AsyncClient) -> None:
        """Read lines from subprocess stdout and POST to message endpoint.

        Continuously reads JSON-RPC requests from the subprocess stdout
        and POSTs them to the remote message endpoint obtained from the
        SSE stream's endpoint event.

        Args:
            client: The HTTP client to use for POSTing messages.

        Raises:
            RuntimeError: If the process stdout stream is not available.

        Examples:
            >>> import asyncio
            >>> async def test_read():
            ...     # This is tested as part of the SSE to stdio flow
            ...     return True
            >>> asyncio.run(test_read())
            True
        """
        if not process.stdout:
            raise RuntimeError("Process stdout not available")

        while True:
            if not process.stdout:
                raise RuntimeError("Process stdout not available")
            line = await process.stdout.readline()
            if not line:
                # EOF: the subprocess closed its stdout (likely exited).
                break

            text = line.decode().strip()
            if not text:
                continue

            LOGGER.debug(f"← stdio: {text}")

            # Wait for endpoint URL if not yet received.
            # NOTE(review): this is a 1 Hz poll of the closure variable set by
            # pump_sse_to_stdio; an asyncio.Event would avoid the busy-wait.
            retry_count = 0
            while not message_endpoint and retry_count < 30:  # 30 second timeout
                await asyncio.sleep(1)
                retry_count += 1

            if not message_endpoint:
                # Drop this request rather than aborting the whole bridge.
                LOGGER.error("No message endpoint received from SSE stream")
                continue

            # POST the JSON-RPC request to the message endpoint
            try:
                response = await client.post(message_endpoint, content=text, headers={"Content-Type": "application/json"})
                if response.status_code != 202:
                    LOGGER.warning(f"Message endpoint returned {response.status_code}: {response.text}")
            except Exception as e:
                # Best-effort: log and keep pumping subsequent lines.
                LOGGER.error(f"Failed to POST to message endpoint: {e}")

    async def pump_sse_to_stdio(client: httpx.AsyncClient) -> None:
        """Stream SSE data from remote endpoint to subprocess stdin.

        Connects to the remote SSE endpoint with retry logic and forwards
        message events to the subprocess stdin. Properly parses SSE events
        and handles endpoint, message, and keepalive event types.

        Args:
            client: The HTTP client to use for SSE streaming.

        Raises:
            HTTPStatusError: If the SSE endpoint returns a non-200 status code.
            Exception: For unexpected errors in SSE stream processing.

        Examples:
            >>> import asyncio
            >>> async def test_pump():
            ...     # This is tested as part of the SSE to stdio flow
            ...     return True
            >>> asyncio.run(test_pump())
            True
        """
        nonlocal message_endpoint
        retry_delay = initial_retry_delay
        retry_count = 0

        # NOTE(review): a stream that closes cleanly (no exception) loops and
        # reconnects immediately without counting as a retry — presumably
        # intentional, so only connection *errors* consume retry budget.
        while retry_count < max_retries:
            try:
                LOGGER.info(f"Connecting to SSE endpoint: {url}")

                async with client.stream("GET", url) as response:
                    # Check status code if available (real httpx response)
                    if hasattr(response, "status_code") and response.status_code != 200:
                        if httpx:
                            raise httpx.HTTPStatusError(f"SSE endpoint returned {response.status_code}", request=response.request, response=response)
                        raise Exception(f"SSE endpoint returned {response.status_code}")

                    # Reset retry counter on successful connection
                    retry_count = 0
                    retry_delay = initial_retry_delay
                    current_event: Optional[SSEEvent] = None

                    async for line in response.aiter_lines():
                        # Accumulate multi-line SSE frames until a blank line
                        # completes the event.
                        event, is_complete = SSEEvent.parse_sse_line(line, current_event)
                        current_event = event

                        if is_complete and current_event:
                            LOGGER.debug(f"SSE event: {current_event.event} - {current_event.data[:100]}...")

                            if current_event.event == "endpoint":
                                # Store the message endpoint URL
                                message_endpoint = current_event.data
                                LOGGER.info(f"Received message endpoint: {message_endpoint}")

                            elif current_event.event == "message":
                                # Forward JSON-RPC responses to stdio
                                if process.stdin:
                                    process.stdin.write((current_event.data + "\n").encode())
                                    await process.stdin.drain()
                                    LOGGER.debug(f"→ stdio: {current_event.data}")

                            elif current_event.event == "keepalive":
                                # Log keepalive but don't forward
                                LOGGER.debug("Received keepalive")

                            # Reset for next event
                            current_event = None

            except Exception as e:
                # Check if it's one of the expected httpx exceptions
                if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)):
                    retry_count += 1
                    if retry_count >= max_retries:
                        LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                        raise

                    LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
                    await asyncio.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2, 30)  # Exponential backoff, max 30s
                else:
                    LOGGER.error(f"Unexpected error in SSE stream: {e}")
                    raise

    # Run both tasks concurrently
    # First-Party
    from mcpgateway.services.http_client_service import get_isolated_http_client  # pylint: disable=import-outside-toplevel

    async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
        try:
            await asyncio.gather(read_stdout(client), pump_sse_to_stdio(client))
        except Exception as e:
            LOGGER.error(f"Bridge error: {e}")
            raise
        finally:
            # Clean up subprocess: terminate if still alive and give it up to
            # 5 seconds to exit; a stuck process is left to OS cleanup.
            if process.returncode is None:
                process.terminate()
                with suppress(asyncio.TimeoutError):
                    await asyncio.wait_for(process.wait(), timeout=5)

1383 

async def _run_stdio_to_streamable_http(
    cmd: str,
    port: int,
    log_level: str = "info",
    cors: Optional[List[str]] = None,
    host: str = "127.0.0.1",
    stateless: bool = False,
    json_response: bool = False,
) -> None:
    """Run stdio to streamable HTTP bridge.

    Starts a subprocess and exposes it via streamable HTTP endpoint. Handles graceful
    shutdown on SIGINT/SIGTERM.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use. Defaults to "info".
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1" for security.
        stateless: Whether to use stateless mode for streamable HTTP. Defaults to False.
        json_response: Whether to return JSON responses instead of SSE streams. Defaults to False.

    Raises:
        ImportError: If MCP server components are not available.
        RuntimeError: If subprocess fails to create stdin/stdout pipes.

    Examples:
        >>> import asyncio
        >>> async def test_streamable_http():
        ...     # Would start a real subprocess and HTTP server
        ...     cmd = "echo hello"
        ...     port = 9000
        ...     # This would normally run the server
        ...     return True
        >>> asyncio.run(test_streamable_http())
        True
    """
    # MCP components are available, proceed with setup

    LOGGER.info(f"Starting stdio to streamable HTTP bridge for command: {cmd}")

    # Create a simple MCP server that will proxy to stdio subprocess
    mcp_server = MCPServer(name="stdio-proxy")

    # Create subprocess for stdio communication (stderr inherited for diagnostics)
    process = await asyncio.create_subprocess_exec(
        *shlex.split(cmd),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=sys.stderr,
    )

    if not process.stdin or not process.stdout:
        raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {cmd}")

    # Set up the streamable HTTP session manager with the server
    session_manager = StreamableHTTPSessionManager(
        app=mcp_server,
        stateless=stateless,
        json_response=json_response,
    )

    # Create Starlette app to host the streamable HTTP endpoint
    async def handle_mcp(request: Request) -> None:
        """Handle MCP requests via streamable HTTP.

        Args:
            request: The incoming HTTP request from Starlette.

        Examples:
            >>> async def test_handle():
            ...     # Mock request handling
            ...     class MockRequest:
            ...         scope = {"type": "http"}
            ...         async def receive(self): return {}
            ...         async def send(self, msg): return None
            ...     req = MockRequest()
            ...     # Would handle the request via session manager
            ...     return req is not None
            >>> import asyncio
            >>> asyncio.run(test_handle())
            True
        """
        # The session manager handles all the protocol details - Note: I don't like accessing _send directly -JPS
        await session_manager.handle_request(request.scope, request.receive, request._send)  # pylint: disable=W0212

    routes = [
        Route("/mcp", handle_mcp, methods=["GET", "POST"]),
        Route("/healthz", lambda request: PlainTextResponse("ok"), methods=["GET"]),
    ]

    app = Starlette(routes=routes)

    # Add CORS middleware if specified
    if cors:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=cors,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    # Run the server with Uvicorn
    config = uvicorn.Config(
        app,
        host=host,
        port=port,
        log_level=log_level,
        lifespan="off",
    )
    uvicorn_server = uvicorn.Server(config)

    # Makes _shutdown idempotent across the signal handlers and the finally path.
    shutting_down = asyncio.Event()

    async def _shutdown() -> None:
        """Handle graceful shutdown of the streamable HTTP bridge.

        Terminates the subprocess (waiting up to 5 s) and flags uvicorn to
        exit. Safe to call multiple times; only the first call acts.
        """
        if shutting_down.is_set():
            return
        shutting_down.set()
        LOGGER.info("Shutting down streamable HTTP bridge...")
        if process.returncode is None:
            process.terminate()
            with suppress(asyncio.TimeoutError):
                await asyncio.wait_for(process.wait(), 5)
        # Graceful shutdown by setting the shutdown event
        # Use getattr to safely access should_exit attribute
        setattr(uvicorn_server, "should_exit", getattr(uvicorn_server, "should_exit", False) or True)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        with suppress(NotImplementedError):  # Windows lacks add_signal_handler
            loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown()))

    # Pump messages between stdio and HTTP
    async def pump_stdio_to_http() -> None:
        """Forward messages from subprocess stdout to HTTP responses.

        NOTE(review): currently this only drains and logs subprocess output;
        routing into the session manager's response streams is not yet wired
        up (see inline comments below).

        Examples:
            >>> async def test():
            ...     # This would pump messages in real usage
            ...     return True
            >>> import asyncio
            >>> asyncio.run(test())
            True
        """
        while True:
            try:
                if not process.stdout:
                    raise RuntimeError("Process stdout not available")
                line = await process.stdout.readline()
                if not line:
                    break
                # The session manager will handle routing to appropriate HTTP responses
                # This would need proper integration with session_manager's internal queue
                LOGGER.debug(f"Received from subprocess: {line.decode().strip()}")
            except Exception as e:
                LOGGER.error(f"Error reading from subprocess: {e}")
                break

    async def pump_http_to_stdio(data: str) -> None:
        """Forward HTTP requests to subprocess stdin.

        Args:
            data: The data string to send to subprocess stdin.

        Raises:
            RuntimeError: If the process stdin stream is not available.

        Examples:
            >>> async def test_pump():
            ...     # Would pump data to subprocess
            ...     data = '{"method": "test"}'
            ...     # In real use, would write to process.stdin
            ...     return len(data) > 0
            >>> import asyncio
            >>> asyncio.run(test_pump())
            True
        """
        if not process.stdin:
            raise RuntimeError("Process stdin not available")
        process.stdin.write(data.encode() + b"\n")
        await process.stdin.drain()

    # Note: pump_http_to_stdio will be used when stdio-to-HTTP bridge is fully implemented
    _ = pump_http_to_stdio

    # Start the pump task
    pump_task = asyncio.create_task(pump_stdio_to_http())

    try:
        LOGGER.info(f"Streamable HTTP bridge ready → http://{host}:{port}/mcp")
        await uvicorn_server.serve()
    finally:
        # Cancel the pump first so it doesn't read from a terminating child.
        pump_task.cancel()
        await _shutdown()

1578 

1579 

async def _run_streamable_http_to_stdio(
    url: str,
    oauth2_bearer: Optional[str] = None,
    timeout: float = 30.0,
    stdio_command: Optional[str] = None,
    max_retries: int = 5,
    initial_retry_delay: float = 1.0,
) -> None:
    """Run streamable HTTP to stdio bridge.

    Connects to a remote streamable HTTP endpoint and bridges it to local stdio.
    Implements proper bidirectional message flow with error handling and retries.

    Args:
        url: The streamable HTTP endpoint URL to connect to.
        oauth2_bearer: Optional OAuth2 bearer token for authentication. Defaults to None.
        timeout: HTTP client timeout in seconds. Defaults to 30.0.
        stdio_command: Optional command to run for local stdio processing.
            If not provided, will simply print messages to stdout.
        max_retries: Maximum number of connection retry attempts. Defaults to 5.
        initial_retry_delay: Initial delay between retries in seconds. Defaults to 1.0.

    Raises:
        ImportError: If httpx package is not available.
        RuntimeError: If the subprocess fails to create stdin/stdout pipes.
        Exception: For any unexpected error during bridging operations.
    """
    if not httpx:
        raise ImportError("httpx package is required for streamable HTTP to stdio bridging")

    headers = {}
    if oauth2_bearer:
        headers["Authorization"] = f"Bearer {oauth2_bearer}"

    # Ensure URL ends with /mcp if not already
    if not url.endswith("/mcp"):
        url = url.rstrip("/") + "/mcp"

    # If no stdio command provided, use simple mode (just print to stdout)
    if not stdio_command:
        LOGGER.warning("No --stdioCommand provided, running in simple mode (streamable HTTP to stdout only)")
        # First-Party
        from mcpgateway.services.http_client_service import get_isolated_http_client  # pylint: disable=import-outside-toplevel

        async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
            await _simple_streamable_http_pump(client, url, max_retries, initial_retry_delay)
        return

    # Start the stdio subprocess (stderr inherited for diagnostics)
    process = await asyncio.create_subprocess_exec(
        *shlex.split(stdio_command),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=sys.stderr,
    )

    if not process.stdin or not process.stdout:
        raise RuntimeError(f"Failed to create subprocess with stdin/stdout pipes for command: {stdio_command}")

    async def read_stdout(client: httpx.AsyncClient) -> None:
        """Read lines from subprocess stdout and POST to streamable HTTP endpoint.

        Args:
            client: The HTTP client to use for POSTing messages.

        Raises:
            RuntimeError: If the process stdout stream is not available.
        """
        if not process.stdout:
            raise RuntimeError("Process stdout not available")

        while True:
            if not process.stdout:
                raise RuntimeError("Process stdout not available")
            line = await process.stdout.readline()
            if not line:
                # EOF: subprocess closed stdout (likely exited).
                break

            text = line.decode().strip()
            if not text:
                continue

            LOGGER.debug(f"← stdio: {text}")

            # POST the JSON-RPC request to the streamable HTTP endpoint.
            # NOTE(review): CONTENT_TYPE is a module-level setting; the
            # form-encoded branch assumes the payload is a flat JSON object —
            # nested values would not urlencode meaningfully. Confirm intent.
            try:
                if CONTENT_TYPE == "application/x-www-form-urlencoded":
                    # If text is JSON, parse and encode as form
                    try:
                        payload = orjson.loads(text)
                        body = urlencode(payload)
                    except Exception:
                        body = text
                    response = await client.post(url, content=body, headers=headers)
                else:
                    response = await client.post(url, content=text, headers=headers)
                if response.status_code == 200:
                    # Handle JSON response: relay the endpoint's reply back to
                    # the subprocess on stdin.
                    response_data = response.text
                    if response_data and process.stdin:
                        process.stdin.write((response_data + "\n").encode())
                        await process.stdin.drain()
                        LOGGER.debug(f"→ stdio: {response_data}")
                else:
                    LOGGER.warning(f"Streamable HTTP endpoint returned {response.status_code}: {response.text}")
            except Exception as e:
                # Best-effort: log and keep pumping subsequent lines.
                LOGGER.error(f"Failed to POST to streamable HTTP endpoint: {e}")

    async def pump_streamable_http_to_stdio(client: httpx.AsyncClient) -> None:
        """Stream data from remote streamable HTTP endpoint to subprocess stdin.

        Args:
            client: The HTTP client to use for streamable HTTP streaming.

        Raises:
            httpx.HTTPStatusError: If the streamable HTTP endpoint returns a non-200 status code.
            Exception: For unexpected errors in streamable HTTP stream processing.
        """
        retry_delay = initial_retry_delay
        retry_count = 0

        # NOTE(review): a cleanly closed stream loops and reconnects without
        # consuming retry budget; only connection errors increment retry_count.
        while retry_count < max_retries:
            try:
                LOGGER.info(f"Connecting to streamable HTTP endpoint: {url}")

                # For streamable HTTP, we need to handle both SSE streams and JSON responses
                # Try SSE first (for stateful sessions or when SSE is preferred)
                async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response:
                    if response.status_code != 200:
                        if httpx:
                            raise httpx.HTTPStatusError(f"Streamable HTTP endpoint returned {response.status_code}", request=response.request, response=response)
                        raise Exception(f"Streamable HTTP endpoint returned {response.status_code}")

                    # Reset retry counter on successful connection
                    retry_count = 0
                    retry_delay = initial_retry_delay

                    async for line in response.aiter_lines():
                        if line.startswith("data:"):
                            # Strip the "data:" prefix and the single optional
                            # space the SSE spec allows after it.
                            data = line[5:]
                            if data.startswith(" "):
                                data = data[1:]
                            if data and process.stdin:
                                process.stdin.write((data + "\n").encode())
                                await process.stdin.drain()
                                LOGGER.debug(f"→ stdio: {data}")

            except Exception as e:
                if httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout)):
                    retry_count += 1
                    if retry_count >= max_retries:
                        LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                        raise

                    LOGGER.warning(f"Connection error: {e}. Retrying in {retry_delay}s... (attempt {retry_count}/{max_retries})")
                    await asyncio.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2, 30)  # Exponential backoff, max 30s
                else:
                    LOGGER.error(f"Unexpected error in streamable HTTP stream: {e}")
                    raise

    # Run both tasks concurrently
    # First-Party
    from mcpgateway.services.http_client_service import get_isolated_http_client  # pylint: disable=import-outside-toplevel

    async with get_isolated_http_client(timeout=timeout, headers=headers, verify=DEFAULT_SSL_VERIFY, connect_timeout=10.0, write_timeout=timeout, pool_timeout=timeout) as client:
        try:
            await asyncio.gather(read_stdout(client), pump_streamable_http_to_stdio(client))
        except Exception as e:
            LOGGER.error(f"Bridge error: {e}")
            raise
        finally:
            # Clean up subprocess: terminate if alive, wait up to 5 s.
            if process.returncode is None:
                process.terminate()
                with suppress(asyncio.TimeoutError):
                    await asyncio.wait_for(process.wait(), timeout=5)

1757 

1758 

async def _simple_streamable_http_pump(client: "Any", url: str, max_retries: int, initial_retry_delay: float) -> None:
    """Relay messages from a streamable HTTP endpoint straight to stdout.

    Used when no stdio command is provided to bridge streamable HTTP to stdout directly.

    Args:
        client: The HTTP client to use for streamable HTTP streaming.
        url: The streamable HTTP endpoint URL to connect to.
        max_retries: Maximum number of connection retry attempts.
        initial_retry_delay: Initial delay between retries in seconds.

    Raises:
        Exception: For unexpected errors in streamable HTTP stream processing including
            HTTPStatusError if the endpoint returns a non-200 status code.
    """
    attempts = 0
    delay = initial_retry_delay

    while attempts < max_retries:
        try:
            LOGGER.info(f"Connecting to streamable HTTP endpoint: {url}")

            # Request an SSE stream from the endpoint.
            async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response:
                if response.status_code != 200:
                    if httpx:
                        raise httpx.HTTPStatusError(f"Streamable HTTP endpoint returned {response.status_code}", request=response.request, response=response)
                    raise Exception(f"Streamable HTTP endpoint returned {response.status_code}")

                # A successful connection resets the backoff state.
                attempts = 0
                delay = initial_retry_delay

                async for line in response.aiter_lines():
                    # Only "data:" lines carry payload; everything else is ignored.
                    if not line.startswith("data:"):
                        continue
                    payload = line[5:]
                    # A single leading space after "data:" is an SSE separator, not payload.
                    if payload.startswith(" "):
                        payload = payload[1:]
                    if payload:
                        print(payload)
                        LOGGER.debug(f"Received: {payload}")

        except Exception as e:
            # Non-connection errors are fatal; bail out immediately.
            if not (httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout))):
                LOGGER.error(f"Unexpected error in streamable HTTP stream: {e}")
                raise

            attempts += 1
            if attempts >= max_retries:
                LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                raise

            LOGGER.warning(f"Connection error: {e}. Retrying in {delay}s... (attempt {attempts}/{max_retries})")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 30)  # Exponential backoff, max 30s

1814 

1815 

async def _run_multi_protocol_server(  # pylint: disable=too-many-positional-arguments
    cmd: str,
    port: int,
    log_level: str = "info",
    cors: Optional[List[str]] = None,
    host: str = "127.0.0.1",
    expose_sse: bool = False,
    expose_streamable_http: bool = False,
    sse_path: str = "/sse",
    message_path: str = "/message",
    keep_alive: float = KEEP_ALIVE_INTERVAL,
    stateless: bool = False,
    json_response: bool = False,
    header_mappings: Optional[NormalizedMappings] = None,
) -> None:
    """Run a stdio server and expose it via multiple protocols simultaneously.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use. Defaults to "info".
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1".
        expose_sse: Whether to expose via SSE protocol.
        expose_streamable_http: Whether to expose via streamable HTTP protocol.
        sse_path: Path for SSE endpoint. Defaults to "/sse".
        message_path: Path for message endpoint. Defaults to "/message".
        keep_alive: Keep-alive interval for SSE. Defaults to KEEP_ALIVE_INTERVAL.
        stateless: Whether to use stateless mode for streamable HTTP.
        json_response: Whether to return JSON responses for streamable HTTP.
        header_mappings: Optional mapping of HTTP headers to environment variables.
    """
    LOGGER.info(f"Starting multi-protocol server for command: {cmd}")
    LOGGER.info(f"Protocols: SSE={expose_sse}, StreamableHTTP={expose_streamable_http}")

    # Create a shared pubsub whenever either protocol needs stdout observations
    pubsub = _PubSub() if (expose_sse or expose_streamable_http) else None

    # Create the stdio endpoint
    stdio = StdIOEndpoint(cmd, pubsub, header_mappings=header_mappings) if (expose_sse or expose_streamable_http) and pubsub else None

    # Create fastapi app and middleware
    app = FastAPI()

    # Add CORS middleware if specified
    if cors:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=cors,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    # Start stdio if at least one transport requires it
    if stdio:
        await stdio.start()

    # SSE endpoints
    if expose_sse and stdio and pubsub:

        @app.get(sse_path)
        async def get_sse(request: Request) -> EventSourceResponse:
            """SSE endpoint.

            Subscribes the client to the shared pubsub and streams subprocess
            stdout as SSE "message" events, preceded by an "endpoint" event
            carrying the per-session message-POST URL.

            Args:
                request: The incoming HTTP request.

            Returns:
                EventSourceResponse: Server-sent events stream.
            """
            if not pubsub:
                raise RuntimeError("PubSub not available")

            # Extract environment variables from headers if dynamic env is enabled
            additional_env_vars = {}
            if header_mappings and stdio:
                request_headers = dict(request.headers)
                additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)

                # Restart stdio endpoint with new environment variables
                if additional_env_vars:
                    LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
                    await stdio.stop()  # Stop existing process
                    await stdio.start(additional_env_vars)  # Start with new env vars

            queue = pubsub.subscribe()
            session_id = uuid.uuid4().hex

            async def event_gen() -> AsyncIterator[Dict[str, Any]]:
                """Generate SSE events for the client.

                Yields:
                    Dict[str, Any]: SSE event data with event type and payload.
                """
                # Advertise where this session should POST its JSON-RPC messages.
                endpoint_url = f"{str(request.base_url).rstrip('/')}{message_path}?session_id={session_id}"
                yield {
                    "event": "endpoint",
                    "data": endpoint_url,
                    "retry": int(keep_alive * 1000),
                }

                if DEFAULT_KEEPALIVE_ENABLED:
                    yield {"event": "keepalive", "data": "{}", "retry": keep_alive * 1000}

                try:
                    while True:
                        if await request.is_disconnected():
                            break

                        try:
                            # With keepalives enabled, wake up every keep_alive
                            # seconds to emit a heartbeat even when stdio is quiet.
                            timeout = keep_alive if DEFAULT_KEEPALIVE_ENABLED else None
                            msg = await asyncio.wait_for(queue.get(), timeout)
                            yield {"event": "message", "data": msg.rstrip()}
                        except asyncio.TimeoutError:
                            if DEFAULT_KEEPALIVE_ENABLED:
                                yield {
                                    "event": "keepalive",
                                    "data": "{}",
                                    "retry": keep_alive * 1000,
                                }
                finally:
                    # Always drop the subscription so the pubsub does not leak queues.
                    if pubsub:
                        pubsub.unsubscribe(queue)

            return EventSourceResponse(
                event_gen(),
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "X-Accel-Buffering": "no",
                },
            )

        @app.post(message_path, status_code=status.HTTP_202_ACCEPTED)
        async def post_message(raw: Request, session_id: str | None = None) -> Response:
            """Message endpoint for SSE.

            Validates the body is JSON and forwards it, newline-terminated,
            to the stdio subprocess. Responses come back over the SSE stream.

            Args:
                raw: The incoming HTTP request.
                session_id: Optional session ID for correlation.

            Returns:
                Response: Acknowledgement of message receipt.
            """
            _ = session_id

            # Extract environment variables from headers if dynamic env is enabled
            additional_env_vars = {}
            if header_mappings and stdio:
                request_headers = dict(raw.headers)
                additional_env_vars = extract_env_vars_from_headers(request_headers, header_mappings)

                # Only restart if we have new environment variables
                if additional_env_vars:
                    LOGGER.info(f"Restarting stdio endpoint with {len(additional_env_vars)} environment variables")
                    await stdio.stop()  # Stop existing process
                    await stdio.start(additional_env_vars)  # Start with new env vars
                    await asyncio.sleep(0.5)  # Give process time to initialize

            # Ensure stdio endpoint is running
            if stdio and not stdio.is_running():
                LOGGER.info("Starting stdio endpoint (was not running)")
                await stdio.start()
                await asyncio.sleep(0.5)  # Give process time to initialize

            payload = await raw.body()
            try:
                orjson.loads(payload)
            except Exception as exc:
                return PlainTextResponse(
                    f"Invalid JSON payload: {exc}",
                    status_code=status.HTTP_400_BAD_REQUEST,
                )
            if not stdio:
                raise RuntimeError("Stdio endpoint not available")
            await stdio.send(payload.decode().rstrip() + "\n")
            return PlainTextResponse("forwarded", status_code=status.HTTP_202_ACCEPTED)

    # Add health check
    @app.get("/healthz")
    async def health() -> Response:
        """Health check endpooint.

        Returns:
            Response: Health status response.
        """
        return PlainTextResponse("ok")

    # Streamable HTTP support
    streamable_server = None
    streamable_manager = None
    streamable_context = None

    # Keep a reference to the original FastAPI app so we can wrap it with an ASGI
    # layer that delegates `/mcp` scopes to the StreamableHTTPSessionManager if present.
    original_app = app

    if expose_streamable_http:
        # Create an MCP server instance
        streamable_server = MCPServer("stdio-proxy")

        # Set up the streamable HTTP session manager
        streamable_manager = StreamableHTTPSessionManager(
            app=streamable_server,
            stateless=stateless,
            json_response=json_response,
        )

        # Register POST /mcp on the FastAPI app as the canonical client->server POST
        # path for Streamable HTTP. This forwards JSON-RPC notifications/requests to stdio.
        @original_app.post("/mcp")
        async def mcp_post(request: Request) -> Response:
            """
            Handles POST requests to the `/mcp` endpoint, forwarding JSON payloads to stdio
            and optionally waiting for a correlated response.

            The request body is expected to be a JSON object or newline-delimited JSON.
            If the JSON includes an "id" field, the function attempts to match it with
            a response from stdio using a pubsub queue, within a timeout period.

            Args:
                request (Request): The incoming FastAPI request containing the JSON payload.

            Returns:
                Response: A FastAPI Response object.
                    - 200 OK with matched JSON response if correlation succeeds.
                    - 202 Accepted if no matching response is received in time or for notifications.
                    - 400 Bad Request if the payload is not valid JSON.

            Example:
                >>> import httpx  # doctest: +SKIP
                >>> response = httpx.post("http://localhost:8000/mcp", json={"id": 123, "method": "ping"})  # doctest: +SKIP
                >>> response.status_code in (200, 202)  # doctest: +SKIP
                True
            """
            # Read and validate JSON
            body = await request.body()
            try:
                obj = orjson.loads(body)
            except Exception as exc:
                return PlainTextResponse(f"Invalid JSON payload: {exc}", status_code=status.HTTP_400_BAD_REQUEST)

            # Forward raw newline-delimited JSON to stdio
            if not stdio:
                raise RuntimeError("Stdio endpoint not available")
            await stdio.send(body.decode().rstrip() + "\n")

            # If it's a request (has an id) -> attempt to correlate response from stdio
            if isinstance(obj, dict) and "id" in obj:
                if not pubsub:
                    return PlainTextResponse("accepted", status_code=status.HTTP_202_ACCEPTED)

                queue = pubsub.subscribe()
                try:
                    timeout = 10.0  # seconds; tuneable
                    deadline = asyncio.get_event_loop().time() + timeout
                    remaining = max(0.0, deadline - asyncio.get_event_loop().time())
                    while remaining > 0:
                        try:
                            msg = await asyncio.wait_for(queue.get(), timeout=remaining)
                        except asyncio.TimeoutError:
                            break

                        # stdio stdout lines may contain JSON objects or arrays
                        try:
                            parsed = orjson.loads(msg)
                        except (orjson.JSONDecodeError, ValueError):
                            # not JSON -> skip
                            remaining = max(0.0, deadline - asyncio.get_event_loop().time())
                            continue

                        candidates = parsed if isinstance(parsed, list) else [parsed]
                        for candidate in candidates:
                            if isinstance(candidate, dict) and candidate.get("id") == obj.get("id"):
                                # return the matched response as JSON
                                return ORJSONResponse(candidate)

                        remaining = max(0.0, deadline - asyncio.get_event_loop().time())

                    # timeout -> accept and return 202
                    return PlainTextResponse("accepted (no response yet)", status_code=status.HTTP_202_ACCEPTED)
                finally:
                    if pubsub:
                        pubsub.unsubscribe(queue)

            # Notification -> return 202
            return PlainTextResponse("accepted", status_code=status.HTTP_202_ACCEPTED)

        # ASGI wrapper to route GET/other /mcp scopes to streamable_manager.handle_request
        async def mcp_asgi_wrapper(scope: Scope, receive: Receive, send: Send) -> None:
            """
            ASGI wrapper around the original FastAPI application.

            Currently delegates every scope (including `/mcp`) to the original
            FastAPI app; it exists as the hook point where non-POST `/mcp`
            scopes could be routed to `streamable_manager.handle_request`.

            Args:
                scope (Scope): The ASGI scope dictionary containing request metadata.
                receive (Receive): An awaitable that yields incoming ASGI events.
                send (Send): An awaitable used to send ASGI events.
            """
            await original_app(scope, receive, send)

        # Replace the app used by uvicorn with the ASGI wrapper
        app = mcp_asgi_wrapper  # type: ignore[assignment]

    # ---------------------- Server lifecycle ----------------------
    config = uvicorn.Config(
        app,
        host=host,
        port=port,
        log_level=log_level,
        lifespan="off",
    )
    server = uvicorn.Server(config)

    shutting_down = asyncio.Event()

    async def _shutdown() -> None:
        """Handle graceful shutdown."""
        # Idempotent: the Event guard makes repeated invocations (signal + finally) safe.
        if shutting_down.is_set():
            return
        shutting_down.set()
        LOGGER.info("Shutting down multi-protocol server...")
        if stdio:
            await stdio.stop()
        # Streamable HTTP cleanup handled by server shutdown
        # Graceful shutdown by setting the shutdown event
        # Use getattr to safely access should_exit attribute
        setattr(server, "should_exit", getattr(server, "should_exit", False) or True)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        # add_signal_handler raises NotImplementedError on platforms without
        # signal support in the event loop (e.g. Windows ProactorEventLoop).
        with suppress(NotImplementedError):
            loop.add_signal_handler(sig, lambda *_: asyncio.create_task(_shutdown()))

    # If we have a streamable manager, start its context so it can accept ASGI /mcp
    if streamable_manager:
        streamable_context = streamable_manager.run()
        await streamable_context.__aenter__()  # noqa: PLC2801 # pylint: disable=unnecessary-dunder-call,no-member

    # Log available endpoints
    endpoints = []
    if expose_sse:
        endpoints.append(f"SSE: http://{host}:{port}{sse_path}")
    if expose_streamable_http:
        endpoints.append(f"StreamableHTTP: http://{host}:{port}/mcp")

    LOGGER.info(f"Multi-protocol server ready → {', '.join(endpoints)}")

    try:
        await server.serve()
    finally:
        await _shutdown()
        # Clean up streamable HTTP context with timeout to prevent spin loop
        # if tasks don't respond to cancellation (anyio _deliver_cancellation issue)
        if streamable_context:
            # Get cleanup timeout from config (with fallback for standalone usage)
            try:
                # First-Party
                from mcpgateway.config import settings as cfg  # pylint: disable=import-outside-toplevel

                cleanup_timeout = cfg.mcp_session_pool_cleanup_timeout
            except Exception:
                cleanup_timeout = 5.0
            # Use anyio.move_on_after instead of asyncio.wait_for to properly propagate
            # cancellation through anyio's cancel scope system (prevents orphaned spinning tasks)
            with anyio.move_on_after(cleanup_timeout) as cleanup_scope:
                try:
                    await streamable_context.__aexit__(None, None, None)  # pylint: disable=unnecessary-dunder-call,no-member
                except Exception as e:
                    LOGGER.debug(f"Error cleaning up streamable HTTP context: {e}")
            if cleanup_scope.cancelled_caught:
                LOGGER.warning("Streamable HTTP context cleanup timed out - proceeding anyway")

2194 

2195 

async def _simple_sse_pump(client: "Any", url: str, max_retries: int, initial_retry_delay: float) -> None:
    """Relay SSE messages from a remote endpoint straight to stdout.

    Used when no stdio command is provided to bridge SSE to stdout directly.

    Args:
        client: The HTTP client to use for SSE streaming.
        url: The SSE endpoint URL to connect to.
        max_retries: Maximum number of connection retry attempts.
        initial_retry_delay: Initial delay between retries in seconds.

    Raises:
        HTTPStatusError: If the SSE endpoint returns a non-200 status code.
        Exception: For unexpected errors in SSE stream processing.
    """
    delay = initial_retry_delay
    attempts = 0

    while attempts < max_retries:
        try:
            LOGGER.info(f"Connecting to SSE endpoint: {url}")

            async with client.stream("GET", url) as response:
                # Real httpx responses expose status_code; test doubles may not.
                if hasattr(response, "status_code") and response.status_code != 200:
                    if httpx:
                        raise httpx.HTTPStatusError(f"SSE endpoint returned {response.status_code}", request=response.request, response=response)
                    raise Exception(f"SSE endpoint returned {response.status_code}")

                # A successful connection resets the backoff state.
                attempts = 0
                delay = initial_retry_delay
                pending: Optional[SSEEvent] = None

                async for line in response.aiter_lines():
                    # Accumulate multi-line events until the parser signals completion.
                    parsed, done = SSEEvent.parse_sse_line(line, pending)
                    pending = parsed

                    if done and pending:
                        if pending.event == "endpoint":
                            LOGGER.info(f"Received message endpoint: {pending.data}")
                        elif pending.event == "message":
                            # Just print the message to stdout
                            print(pending.data)
                        elif pending.event == "keepalive":
                            LOGGER.debug("Received keepalive")

                        # Start accumulating the next event from scratch.
                        pending = None

        except Exception as e:
            # Non-connection errors are fatal; bail out immediately.
            if not (httpx and isinstance(e, (httpx.ConnectError, httpx.HTTPStatusError, httpx.ReadTimeout))):
                LOGGER.error(f"Unexpected error in SSE stream: {e}")
                raise

            attempts += 1
            if attempts >= max_retries:
                LOGGER.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                raise

            LOGGER.warning(f"Connection error: {e}. Retrying in {delay}s... (attempt {attempts}/{max_retries})")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 30)  # Exponential backoff, max 30s

2260 

2261 

def start_streamable_http_stdio(
    cmd: str,
    port: int,
    log_level: str,
    cors: Optional[List[str]],
    host: str = "127.0.0.1",
    stateless: bool = False,
    json_response: bool = False,
) -> None:
    """Start stdio to streamable HTTP bridge.

    Entry point for starting a stdio to streamable HTTP bridge server.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use.
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1".
        stateless: Whether to use stateless mode. Defaults to False.
        json_response: Whether to return JSON responses. Defaults to False.

    Returns:
        None: This function does not return a value.
    """
    # Build the bridge coroutine, then drive it to completion on a fresh event loop.
    bridge = _run_stdio_to_streamable_http(cmd, port, log_level, cors, host, stateless, json_response)
    return asyncio.run(bridge)

2288 

2289 

def start_streamable_http_client(url: str, bearer_token: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None) -> None:
    """Start streamable HTTP to stdio bridge.

    Entry point for starting a streamable HTTP to stdio bridge client.

    Args:
        url: The streamable HTTP endpoint URL to connect to.
        bearer_token: Optional OAuth2 bearer token for authentication. Defaults to None.
        timeout: HTTP client timeout in seconds. Defaults to 30.0.
        stdio_command: Optional command to run for local stdio processing.

    Returns:
        None: This function does not return a value.
    """
    # Build the bridge coroutine, then drive it to completion on a fresh event loop.
    bridge = _run_streamable_http_to_stdio(url, bearer_token, timeout, stdio_command)
    return asyncio.run(bridge)

2305 

2306 

def start_stdio(
    cmd: str, port: int, log_level: str, cors: Optional[List[str]], host: str = "127.0.0.1", sse_path: str = "/sse", message_path: str = "/message", keep_alive: float = KEEP_ALIVE_INTERVAL
) -> None:
    """Start stdio bridge.

    Entry point for starting a stdio to SSE bridge server.

    Args:
        cmd: The command to run as a stdio subprocess.
        port: The port to bind the HTTP server to.
        log_level: The logging level to use.
        cors: Optional list of CORS allowed origins.
        host: The host interface to bind to. Defaults to "127.0.0.1".
        sse_path: Path for the SSE endpoint. Defaults to "/sse".
        message_path: Path for the message endpoint. Defaults to "/message".
        keep_alive: Keep-alive interval in seconds. Defaults to KEEP_ALIVE_INTERVAL.

    Returns:
        None: This function does not return a value.

    Examples:
        >>> # Test parameter validation
        >>> isinstance(KEEP_ALIVE_INTERVAL, int)
        True
        >>> KEEP_ALIVE_INTERVAL > 0
        True
        >>> start_stdio("uvx mcp-server-git", 9000, "info", None)  # doctest: +SKIP
    """
    # Build the bridge coroutine, then drive it to completion on a fresh event loop.
    bridge = _run_stdio_to_sse(cmd, port, log_level, cors, host, sse_path, message_path, keep_alive)
    return asyncio.run(bridge)

2336 

2337 

def start_sse(url: str, bearer_token: Optional[str] = None, timeout: float = 30.0, stdio_command: Optional[str] = None) -> None:
    """Start SSE bridge.

    Entry point for starting an SSE to stdio bridge client.

    Args:
        url: The SSE endpoint URL to connect to.
        bearer_token: Optional OAuth2 bearer token for authentication. Defaults to None.
        timeout: HTTP client timeout in seconds. Defaults to 30.0.
        stdio_command: Optional command to run for local stdio processing.

    Returns:
        None: This function does not return a value.

    Examples:
        >>> start_sse("http://example.com/sse", "token123")  # doctest: +SKIP
    """
    # Build the bridge coroutine, then drive it to completion on a fresh event loop.
    bridge = _run_sse_to_stdio(url, bearer_token, timeout, stdio_command)
    return asyncio.run(bridge)

2364 

2365 

def main(argv: Optional[Sequence[str]] | None = None) -> None:
    """Entry point for the translate module.

    Configures logging, parses arguments, and starts the appropriate bridge
    based on command line options. Handles keyboard interrupts gracefully.

    Args:
        argv: Optional sequence of command line arguments. If None, uses sys.argv[1:].

    Examples:
        >>> try:
        ...     main(["--stdio", "cat", "--port", "9000"])  # doctest: +SKIP
        ... except SystemExit:
        ...     pass  # Would normally start the server
    """
    args = _parse_args(argv or sys.argv[1:])
    logging.basicConfig(
        level=getattr(logging, args.logLevel.upper(), logging.INFO),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )

    # Parse header mappings if dynamic environment injection is enabled.
    # Normalize once at startup for O(1) lookups per request.
    header_mappings: NormalizedMappings | None = None
    if getattr(args, "enable_dynamic_env", False):
        try:
            header_mappings = NormalizedMappings(parse_header_mappings(getattr(args, "header_to_env", [])))
            LOGGER.info(f"Dynamic environment injection enabled with {len(header_mappings)} header mappings")
        except Exception as e:
            LOGGER.error(f"Failed to parse header mappings: {e}")
            raise

    try:
        # Mode 1: expose a gRPC server over SSE
        if getattr(args, "grpc", None):
            # First-Party
            from mcpgateway.translate_grpc import expose_grpc_via_sse  # pylint: disable=import-outside-toplevel

            # Parse "key=value" metadata entries; malformed items (no "=") are dropped.
            raw_items = getattr(args, "grpc_metadata", None) or []
            metadata = dict(item.split("=", 1) for item in raw_items if "=" in item)

            asyncio.run(
                expose_grpc_via_sse(
                    target=args.grpc,
                    port=args.port,
                    tls_enabled=getattr(args, "grpc_tls", False),
                    tls_cert=getattr(args, "grpc_cert", None),
                    tls_key=getattr(args, "grpc_key", None),
                    metadata=metadata,
                )
            )

        # Mode 2: expose a local stdio server over HTTP
        elif args.stdio:
            expose_sse = getattr(args, "expose_sse", False)
            expose_streamable_http = getattr(args, "expose_streamable_http", False)

            # Default to SSE for backward compatibility when neither protocol is selected.
            if not (expose_sse or expose_streamable_http):
                expose_sse = True

            asyncio.run(
                _run_multi_protocol_server(
                    cmd=args.stdio,
                    port=args.port,
                    log_level=args.logLevel,
                    cors=args.cors,
                    host=args.host,
                    expose_sse=expose_sse,
                    expose_streamable_http=expose_streamable_http,
                    sse_path=getattr(args, "ssePath", "/sse"),
                    message_path=getattr(args, "messagePath", "/message"),
                    keep_alive=getattr(args, "keepAlive", KEEP_ALIVE_INTERVAL),
                    stateless=getattr(args, "stateless", False),
                    json_response=getattr(args, "jsonResponse", False),
                    header_mappings=header_mappings,
                )
            )

        # Mode 3: connect to a remote endpoint and bridge it to local stdio
        elif getattr(args, "connect_sse", None):
            start_sse(args.connect_sse, args.oauth2Bearer, 30.0, args.stdioCommand)
        elif getattr(args, "connect_streamable_http", None):
            start_streamable_http_client(args.connect_streamable_http, args.oauth2Bearer, 30.0, args.stdioCommand)
        elif getattr(args, "connect_grpc", None):
            print("Error: --connect-grpc mode not yet implemented. Use --grpc to expose a gRPC server.", file=sys.stderr)
            sys.exit(1)
        else:
            print("Error: Must specify either --stdio (to expose local server), --grpc (to expose gRPC server), or --connect-sse/--connect-streamable-http (to connect to remote)", file=sys.stderr)
            sys.exit(1)
    except KeyboardInterrupt:
        print("")  # restore shell prompt
        sys.exit(0)
    except (NotImplementedError, ImportError) as exc:
        print(exc, file=sys.stderr)
        sys.exit(1)

2472 

2473 

# Allow direct execution of the module: python3 -m mcpgateway.translate ...
if __name__ == "__main__":  # python3 -m mcpgateway.translate ...
    main()