Coverage for mcpgateway / wrapper.py: 100%

305 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/wrapper.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Keval Mahajan 

6 

7ContextForge Wrapper. 

8MCP Client (stdio) <-> ContextForge Bridge 

9 

10This module implements a wrapper stdio bridge that facilitates 

11interaction between the MCP client and the MCP gateway. 

12It provides several functionalities, including listing tools, 

13invoking tools, managing resources, retrieving prompts, 

14and handling tool calls via the MCP gateway. 

15 

16- All JSON-RPC traffic is written to stdout. 

17- All logs/diagnostics are written to stderr, ensuring clean separation. 

18 

19Environment Variables 

20--------------------- 

21- **MCP_SERVER_URL** (or `--url`): Gateway MCP endpoint URL. 

22- **MCP_AUTH** (or `--auth`): Authorization header value. 

23- **MCP_TOOL_CALL_TIMEOUT** (or `--timeout`): Response timeout in seconds (default: 60). 

24- **MCP_WRAPPER_LOG_LEVEL** (or `--log-level`): Logging level, or OFF to disable. 

25- **CONCURRENCY**: Max concurrent tool calls (default: 10). 

26 

27Example usage: 

28-------------- 

29 

30Method 1: Using environment variables 

31 $ export MCP_SERVER_URL='http://localhost:4444/servers/UUID/mcp' 

32 $ export MCP_AUTH='Bearer <token>' 

33 $ export MCP_TOOL_CALL_TIMEOUT=120 

34 $ export MCP_WRAPPER_LOG_LEVEL=DEBUG 

35 $ python3 -m mcpgateway.wrapper 

36 

37Method 2: Using command-line arguments 

38 $ python3 -m mcpgateway.wrapper --url 'http://localhost:4444/servers/UUID/mcp' --auth 'Bearer <token>' --timeout 120 --log-level DEBUG 

39""" 

40 

41# Future 

42from __future__ import annotations 

43 

44# Standard 

45import argparse 

46import asyncio 

47from contextlib import suppress 

48from dataclasses import dataclass 

49import logging 

50import os 

51import signal 

52import sys 

53from typing import Any, AsyncIterator, Dict, List, Optional, Union 

54from urllib.parse import urlencode 

55 

56# Third-Party 

57import httpx 

58import orjson 

59 

60# First-Party 

61from mcpgateway.utils.retry_manager import ResilientHttpClient 

62 

# -----------------------
# Configuration Defaults
# -----------------------
DEFAULT_CONCURRENCY = int(os.environ.get("CONCURRENCY", "10"))  # max concurrent tool calls
DEFAULT_CONNECT_TIMEOUT = 15  # HTTP connect timeout, seconds
DEFAULT_RESPONSE_TIMEOUT = float(os.environ.get("MCP_TOOL_CALL_TIMEOUT", "60"))  # read/write/pool timeout, seconds

# JSON-RPC 2.0 standard error codes
JSONRPC_PARSE_ERROR = -32700
JSONRPC_INTERNAL_ERROR = -32603
JSONRPC_SERVER_ERROR = -32000

# Request body encoding override; any value other than the two explicit
# content types enables auto-detection in forward_once.
CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")

# Global logger: handlers write to stderr so stdout stays reserved for JSON-RPC traffic
logger = logging.getLogger("mcpgateway.wrapper")
logger.addHandler(logging.StreamHandler(sys.stderr))
logger.propagate = False
logger.disabled = True  # default: disabled

# Shutdown flag, set once on EOF / signal / broken stdout
_shutdown = asyncio.Event()

84 

85 

def _mark_shutdown():
    """Signal graceful termination by setting the global shutdown event.

    Triggered when stdin closes, stdout fails, or a termination signal
    arrives. Safe to call multiple times; only the first call has effect.

    Examples:
        >>> _mark_shutdown()
        >>> shutting_down()
        True
        >>> # Reset for following doctests:
        >>> _ = _shutdown.clear()
    """
    if _shutdown.is_set():
        return
    _shutdown.set()

102 

103 

def shutting_down() -> bool:
    """Report whether graceful shutdown has been requested.

    Returns:
        bool: True once the shutdown event has been set, False otherwise.

    Examples:
        >>> shutting_down()
        False
    """
    return bool(_shutdown.is_set())

118 

119 

120# ----------------------- 

121# Utilities 

122# ----------------------- 

def setup_logging(level: Optional[str]) -> None:
    """Configure wrapper logging to stderr, or disable it entirely.

    Args:
        level: Logging level name (e.g. "INFO", "DEBUG"). None, empty, or
            one of OFF/NONE/DISABLE/FALSE/0 (case-insensitive) disables
            logging. Unknown level names fall back to INFO.

    Examples:
        >>> setup_logging("DEBUG")
        >>> logger.disabled
        False
        >>> setup_logging("OFF")
        >>> logger.disabled
        True
    """
    normalized = level.strip().upper() if level else ""
    if not normalized or normalized in {"OFF", "NONE", "DISABLE", "FALSE", "0"}:
        logger.disabled = True
        return

    logger.setLevel(getattr(logging, normalized, logging.INFO))
    fmt = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    for existing_handler in logger.handlers:
        existing_handler.setFormatter(fmt)
    logger.disabled = False

154 

155 

def convert_url(url: str) -> str:
    """Normalize an MCP server URL so it always ends with ``/mcp/``.

    Rules:
    - ``.../mcp`` or ``.../mcp/``: ensure exactly one trailing slash.
    - ``.../sse``: replace the trailing ``/sse`` segment with ``/mcp/``.
    - Anything else: append ``/mcp/``.

    Args:
        url: The input server URL.

    Returns:
        str: Normalized MCP URL ending in ``/mcp/``.

    Examples:
        >>> convert_url("http://localhost:4444/servers/uuid")
        'http://localhost:4444/servers/uuid/mcp/'
        >>> convert_url("http://localhost:4444/servers/uuid/sse")
        'http://localhost:4444/servers/uuid/mcp/'
        >>> convert_url("http://localhost:4444/servers/uuid/mcp")
        'http://localhost:4444/servers/uuid/mcp/'
    """
    if url.endswith("/mcp/"):
        return url
    if url.endswith("/mcp"):
        return url + "/"
    if url.endswith("/sse"):
        # Strip only the trailing "/sse" segment. The previous
        # str.replace("/sse", "/mcp/") rewrote EVERY "/sse" occurrence,
        # corrupting URLs that contain "/sse" earlier in the path.
        return url[: -len("/sse")] + "/mcp/"
    return url + "/mcp/"

184 

185 

def send_to_stdout(obj: Union[dict, str, bytes]) -> None:
    """Serialize *obj* and emit it as a single newline-terminated line on stdout.

    Args:
        obj: Object to serialize. If JSON serialization fails, raw bytes are
            passed through unchanged and anything else falls back to str().

    Notes:
        A write failure (e.g. broken pipe) triggers graceful shutdown.
    """
    try:
        payload = orjson.dumps(obj)  # orjson.dumps returns bytes
    except Exception:
        payload = obj if isinstance(obj, bytes) else str(obj).encode("utf-8")

    try:
        # Some test doubles replace sys.stdout without providing .buffer;
        # prefer the binary buffer when available, else write decoded text.
        if hasattr(sys.stdout, "buffer"):
            sys.stdout.buffer.write(payload + b"\n")
            sys.stdout.buffer.flush()
        else:
            sys.stdout.write(payload.decode("utf-8") + "\n")
            sys.stdout.flush()
    except OSError as e:
        logger.error("OS error: %s", e)
        _mark_shutdown()

216 

217 

def make_error(message: str, code: int = JSONRPC_INTERNAL_ERROR, data: Any = None) -> dict:
    """Build a JSON-RPC 2.0 error response attributed to the bridge.

    Args:
        message: Human-readable error message.
        code: JSON-RPC error code (default -32603, internal error).
        data: Optional extra error payload; omitted entirely when None.

    Returns:
        dict: JSON-RPC error object with id "bridge".

    Examples:
        >>> make_error("Invalid input", code=-32600)
        {'jsonrpc': '2.0', 'id': 'bridge', 'error': {'code': -32600, 'message': 'Invalid input'}}
        >>> make_error("Oops", data={"info": 1})["error"]["data"]
        {'info': 1}
    """
    error_body: Dict[str, Any] = {"code": code, "message": message}
    if data is not None:
        error_body["data"] = data
    return {"jsonrpc": "2.0", "id": "bridge", "error": error_body}

243 

244 

async def stdin_reader(queue: "asyncio.Queue[Union[dict, list, str, None]]") -> None:
    """Pump stdin lines into *queue* as parsed JSON messages.

    Args:
        queue: Destination queue; receives parsed messages, JSON-RPC parse
            errors for invalid lines, and None at EOF.

    Notes:
        - EOF enqueues None and triggers graceful shutdown.
        - Blank lines are skipped silently.
    """
    while True:
        # Prefer the raw byte stream; some test doubles replace sys.stdin
        # without a .buffer attribute, so fall back to text reads.
        if hasattr(sys.stdin, "buffer"):
            raw = await asyncio.to_thread(sys.stdin.buffer.readline)
        else:
            text = await asyncio.to_thread(sys.stdin.readline)
            raw = text.encode("utf-8") if text else b""

        if not raw:
            await queue.put(None)
            _mark_shutdown()
            break

        raw = raw.strip()
        if not raw:
            continue

        try:
            message = orjson.loads(raw)  # orjson.loads accepts bytes
        except Exception:
            # Decode defensively just to build a readable error payload.
            try:
                decoded = raw.decode("utf-8", errors="replace")
            except Exception:
                decoded = str(raw)
            message = make_error("Invalid JSON from stdin", JSONRPC_PARSE_ERROR, decoded)
        await queue.put(message)

288 

289 

290# ----------------------- 

291# Stream Parsers 

292# ----------------------- 

async def ndjson_lines(resp: httpx.Response) -> AsyncIterator[bytes]:
    """Yield individual JSON lines from an NDJSON HTTP response stream.

    Args:
        resp: httpx.Response whose body is newline-delimited JSON.

    Yields:
        bytes: Each non-empty line, stripped of surrounding whitespace.
    """
    leftover = b""
    async for chunk in resp.aiter_bytes():
        if shutting_down():
            break
        if not chunk:
            continue

        # Prepend the carry-over from the previous chunk, then split; the
        # final element is always the new partial line (possibly empty).
        *complete, leftover = (leftover + chunk).split(b"\n")

        for raw in complete:
            stripped = raw.strip()
            if stripped:
                yield stripped

    # Flush a trailing line that arrived without a final newline.
    tail = leftover.strip()
    if tail:
        yield tail

328 

329 

async def sse_events(resp: httpx.Response) -> AsyncIterator[bytes]:
    """Parse Server-Sent Events (SSE) from an HTTP response.

    Only ``data:`` fields are collected; comment lines (leading ``:``) and
    other fields (``event``, ``id``, ``retry``) are discarded. Multi-line
    data within one event is joined with newlines, per the SSE format.

    Args:
        resp: httpx.Response with SSE content.

    Yields:
        bytes: Event payload data lines (joined).
    """
    partial_line = b""  # incomplete line carried across chunk boundaries
    event_lines: List[bytes] = []  # "data:" values accumulated for the current event

    async for chunk in resp.aiter_bytes():
        if shutting_down():
            break
        if not chunk:
            continue

        # Split chunk into lines; the last element is the new partial line
        # (empty when the chunk ended exactly on a newline).
        lines = (partial_line + chunk).split(b"\n")
        partial_line = lines.pop()

        for line in lines:
            line = line.rstrip(b"\r")  # tolerate CRLF line endings
            if not line:
                # Blank line terminates an event: flush accumulated data.
                if event_lines:
                    yield b"\n".join(event_lines)
                    event_lines = []
                continue
            if line.startswith(b":"):
                # SSE comment line; ignore.
                continue

            if b":" in line:
                field, value = line.split(b":", 1)
                value = value.lstrip(b" ")  # SSE spec strips one leading space
            else:
                field, value = line, b""

            if field == b"data":
                event_lines.append(value)

    # Process remaining partial line if any (though standard SSE ends with \n\n)
    if partial_line:
        line = partial_line.rstrip(b"\r")
        # Process the partial line same as above
        if line and not line.startswith(b":"):
            if b":" in line:
                field, value = line.split(b":", 1)
                value = value.lstrip(b" ")
            else:
                field, value = line, b""
            if field == b"data":
                event_lines.append(value)

    # Always yield any remaining accumulated event data
    if event_lines:
        yield b"\n".join(event_lines)

387 

388 

389# ----------------------- 

390# Core HTTP forwarder 

391# ----------------------- 

async def forward_once(
    client: ResilientHttpClient,
    settings: "Settings",
    payload: Union[str, Dict[str, Any], List[Any]],
) -> None:
    """Forward a single JSON-RPC payload to the MCP gateway and stream responses.

    The function:
    - Sets content negotiation headers (JSON, NDJSON, SSE)
    - Adds Authorization header when configured
    - Streams the gateway response and forwards every JSON object to stdout
      (supports application/json, application/x-ndjson, and text/event-stream)

    Args:
        client: Resilient HTTP client used to make the request.
        settings: Bridge configuration (URL, auth, timeouts).
        payload: JSON-RPC request payload as str/dict/list.
    """
    if shutting_down():
        return

    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "Accept": "application/json, application/x-ndjson, text/event-stream",
    }
    if settings.auth_header:
        headers["Authorization"] = settings.auth_header

    # Step 1: Decide content type (manual override > auto-detect).
    # Settings has no content_type field today, hence getattr with fallback
    # to the module-level CONTENT_TYPE env override.
    content_type = getattr(settings, "content_type", None) or CONTENT_TYPE

    if content_type == "application/x-www-form-urlencoded":
        # Always encode as form data
        if isinstance(payload, dict):
            body = urlencode(payload)
        else:
            body = str(payload)
        headers["Content-Type"] = "application/x-www-form-urlencoded"

    elif content_type == "application/json":
        # Force JSON
        body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
        headers["Content-Type"] = "application/json; charset=utf-8"

    else:
        # Auto-detect: a flat dict of scalars is sent as form data,
        # everything else as JSON.
        if isinstance(payload, dict) and all(isinstance(v, (str, int, float, bool, type(None))) for v in payload.values()):
            body = urlencode(payload)
            headers["Content-Type"] = "application/x-www-form-urlencoded"
        else:
            body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
            headers["Content-Type"] = "application/json; charset=utf-8"

    body_bytes = body.encode("utf-8")

    # Step 2: Send request and process response
    async with client.stream("POST", settings.server_url, data=body_bytes, headers=headers) as resp:
        ctype = (resp.headers.get("Content-Type") or "").lower()
        status = resp.status_code
        logger.debug("HTTP %s %s", status, ctype)

        if shutting_down():
            return

        if status < 200 or status >= 300:
            # Non-2xx: surface the HTTP status as a JSON-RPC error on stdout.
            send_to_stdout(make_error(f"HTTP {status}", code=status))
            return

        async def _process_line(line: Union[str, bytes]):
            """Parse one JSON line/event and forward it to stdout.

            Returns immediately if shutdown is in progress. On a parse
            failure, logs a warning and emits a JSON-RPC parse error
            response instead of the payload.

            Args:
                line (Union[str, bytes]): Raw JSON text (bytes preferred).
            """
            if shutting_down():
                return
            try:
                # orjson.loads accepts bytes or str
                obj = orjson.loads(line)
                send_to_stdout(obj)
            except Exception:
                logger.warning("Invalid JSON from server: %s", line)
                # Ensure line is str for error message
                line_str = line if isinstance(line, str) else str(line)
                send_to_stdout(make_error("Invalid JSON from server", JSONRPC_PARSE_ERROR, line_str))

        # Step 3: Handle response content types
        if "event-stream" in ctype:
            async for data_payload in sse_events(resp):
                if shutting_down():
                    break
                if not data_payload:
                    continue
                await _process_line(data_payload)
            return

        if "x-ndjson" in ctype or "ndjson" in ctype:
            async for line in ndjson_lines(resp):
                if shutting_down():
                    break
                await _process_line(line)
            return

        if "application/json" in ctype:
            # Single JSON document: read the full body, then forward it.
            raw = await resp.aread()
            if not shutting_down():
                # raw is bytes
                try:
                    send_to_stdout(orjson.loads(raw))
                except Exception:
                    send_to_stdout(make_error("Invalid JSON response", JSONRPC_PARSE_ERROR, raw.decode("utf-8", "replace")))
            return

        # Fallback: try parsing as NDJSON
        async for line in ndjson_lines(resp):
            if shutting_down():
                break
            await _process_line(line)

515 

516 

async def make_request(
    client: ResilientHttpClient,
    settings: "Settings",
    payload: Union[str, Dict[str, Any], List[Any]],
    *,
    max_retries: int = 5,
    base_delay: float = 0.25,
) -> None:
    """Forward *payload* to the gateway, retrying with exponential backoff.

    Args:
        client: Resilient HTTP client used to make the request.
        settings: Bridge configuration (URL, auth, timeouts).
        payload: JSON-RPC request payload as str/dict/list.
        max_retries: Maximum retry attempts upon exceptions (default 5).
        base_delay: Base delay in seconds for exponential backoff
            (default 0.25; individual delays are capped at 8 seconds).
    """
    failures = 0
    while not shutting_down():
        try:
            await forward_once(client, settings, payload)
        except Exception as exc:
            if shutting_down():
                return
            logger.warning("Network or unexpected error in forward_once: %s", exc)
            failures += 1
            if failures > max_retries:
                send_to_stdout(make_error("max retries exceeded", JSONRPC_SERVER_ERROR))
                return
            # Exponential backoff: base * 2^(attempt-1), capped at 8s.
            await asyncio.sleep(min(base_delay * (2 ** (failures - 1)), 8.0))
        else:
            return

549 

550 

551# ----------------------- 

552# Main loop & CLI 

553# ----------------------- 

@dataclass
class Settings:
    """Bridge configuration settings.

    Attributes:
        server_url: MCP server URL (normalized to end in ``/mcp/``).
        auth_header: Authorization header value, or None for no auth.
        connect_timeout: HTTP connect timeout in seconds.
        response_timeout: Max response wait (read/write/pool) in seconds.
        concurrency: Max concurrent tool calls.
        log_level: Logging verbosity, or None/OFF-style value to disable.

    Examples:
        >>> s = Settings("http://x/mcp", "Bearer token", 5, 10, 2, "DEBUG")
        >>> s.server_url
        'http://x/mcp'
        >>> s.concurrency
        2
    """

    server_url: str
    auth_header: Optional[str]
    connect_timeout: float = DEFAULT_CONNECT_TIMEOUT
    response_timeout: float = DEFAULT_RESPONSE_TIMEOUT
    concurrency: int = DEFAULT_CONCURRENCY
    log_level: Optional[str] = None

580 

581 

async def main_async(settings: Settings) -> None:
    """Main async loop: reads stdin JSON lines and forwards them to the gateway.

    - Spawns a reader task that pushes parsed lines to a queue.
    - Uses a semaphore to cap concurrent requests.
    - Creates tasks to forward each queued payload.
    - Gracefully shuts down on EOF or signals.

    Args:
        settings: Bridge configuration settings.
    """
    queue: "asyncio.Queue[Union[dict, list, str, None]]" = asyncio.Queue()
    reader_task = asyncio.create_task(stdin_reader(queue))

    # Caps the number of in-flight gateway requests.
    sem = asyncio.Semaphore(settings.concurrency)

    httpx_timeout = httpx.Timeout(
        connect=settings.connect_timeout,
        read=settings.response_timeout,
        write=settings.response_timeout,
        pool=settings.response_timeout,
    )

    # Get SSL verify setting from global config (with fallback for standalone usage)
    try:
        # First-Party
        from mcpgateway.config import settings as global_settings  # pylint: disable=import-outside-toplevel

        ssl_verify = not global_settings.skip_ssl_verify
    except ImportError:
        ssl_verify = True  # Default to verifying SSL when config unavailable

    client_args = {"timeout": httpx_timeout, "http2": True, "verify": ssl_verify}
    resilient = ResilientHttpClient(
        max_retries=5,
        base_backoff=0.25,
        max_delay=8.0,
        jitter_max=0.25,
        client_args=client_args,
    )

    tasks: set[asyncio.Task[None]] = set()
    try:
        while not shutting_down():
            item = await queue.get()
            if item is None:
                # Reader reached EOF; stop dispatching new work.
                break

            async def _worker(payload=item):
                """Forward one payload under the concurrency semaphore.

                Args:
                    payload (Any): The data to be sent in the request.
                        Bound as a default at task-creation time to avoid
                        the late-binding-closure pitfall on `item`.
                """
                async with sem:
                    if not shutting_down():
                        await make_request(resilient, settings, payload)

            t = asyncio.create_task(_worker())
            tasks.add(t)
            # Drop finished tasks from the tracking set to avoid growth.
            t.add_done_callback(lambda fut, s=tasks: s.discard(fut))

        # Loop exited (EOF or shutdown): cancel in-flight work and wait.
        _mark_shutdown()
        for t in list(tasks):
            t.cancel()
        if tasks:
            with suppress(asyncio.CancelledError):
                await asyncio.gather(*tasks)
    finally:
        # Always tear down the reader and HTTP client, even on errors.
        reader_task.cancel()
        with suppress(Exception):
            await reader_task
        with suppress(Exception):
            await resilient.aclose()

674 

675 

def parse_args() -> Settings:
    """Parse CLI arguments and environment variables into Settings.

    Recognized flags:
        --url / MCP_SERVER_URL
        --auth / MCP_AUTH
        --timeout / MCP_TOOL_CALL_TIMEOUT
        --log-level / MCP_WRAPPER_LOG_LEVEL

    Returns:
        Settings: Parsed and normalized configuration.

    Raises:
        SystemExit: Exit code 2 when no server URL is provided.

    Examples:
        >>> import sys, os
        >>> _argv = sys.argv
        >>> sys.argv = ["prog", "--url", "http://localhost:4444/servers/u"]
        >>> try:
        ...     s = parse_args()
        ...     s.server_url.endswith("/mcp/")
        ... finally:
        ...     sys.argv = _argv
        True
    """
    parser = argparse.ArgumentParser(description="Stdio MCP Client <-> MCP HTTP Bridge")
    parser.add_argument("--url", default=os.environ.get("MCP_SERVER_URL"), help="MCP server URL (env: MCP_SERVER_URL)")
    parser.add_argument("--auth", default=os.environ.get("MCP_AUTH"), help="Authorization header value (env: MCP_AUTH)")
    parser.add_argument("--timeout", default=os.environ.get("MCP_TOOL_CALL_TIMEOUT"), help="Response timeout in seconds")
    parser.add_argument(
        "--log-level",
        default=os.environ.get("MCP_WRAPPER_LOG_LEVEL", "INFO"),
        # Fixed: the help text previously claimed "default: disabled", but
        # the effective default here is INFO.
        help="Enable logging at this level (case-insensitive, default: INFO; use OFF to disable)",
    )
    args = parser.parse_args()

    if not args.url:
        print("Error: MCP server URL must be provided via --url or MCP_SERVER_URL", file=sys.stderr)
        sys.exit(2)

    server_url = convert_url(args.url)
    response_timeout = float(args.timeout) if args.timeout else DEFAULT_RESPONSE_TIMEOUT

    return Settings(
        server_url=server_url,
        auth_header=args.auth,
        connect_timeout=DEFAULT_CONNECT_TIMEOUT,
        response_timeout=response_timeout,
        log_level=args.log_level,
        concurrency=DEFAULT_CONCURRENCY,
    )

725 

726 

def _install_signal_handlers(loop: asyncio.AbstractEventLoop) -> None:
    """Attach SIGINT/SIGTERM handlers that request graceful shutdown.

    Signals missing on the platform and loops that do not support signal
    handlers (NotImplementedError, e.g. on Windows) are silently skipped.

    Args:
        loop: The asyncio event loop to attach handlers to.

    Examples:
        >>> import asyncio
        >>> loop = asyncio.new_event_loop()
        >>> _install_signal_handlers(loop)
        >>> loop.close()
    """
    candidates = (getattr(signal, "SIGINT", None), getattr(signal, "SIGTERM", None))
    for sig in candidates:
        if sig is None:
            continue
        with suppress(NotImplementedError):
            loop.add_signal_handler(sig, _mark_shutdown)

744 

745 

def main() -> None:
    """Entry point for the MCP stdio wrapper.

    Parses args/env vars into Settings, configures logging, installs
    signal handlers, then runs the async bridge loop to completion.
    """
    settings = parse_args()
    setup_logging(settings.log_level)
    if not logger.disabled:
        logger.info("Starting MCP stdio wrapper -> %s", settings.server_url)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _install_signal_handlers(loop)

    try:
        loop.run_until_complete(main_async(settings))
    finally:
        # Give cancelled tasks one final tick before tearing the loop down.
        loop.run_until_complete(asyncio.sleep(0))
        with suppress(Exception):
            loop.close()
        if not logger.disabled:
            logger.info("Shutdown complete.")

773 

774 

if __name__ == "__main__":
    # Run the bridge when executed as a script (python3 -m mcpgateway.wrapper).
    main()