Coverage for mcpgateway / wrapper.py: 100%

307 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/wrapper.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Keval Mahajan 

6 

7MCP Gateway Wrapper. 

8MCP Client (stdio) <-> MCP Gateway Bridge 

9 

10This module implements a wrapper stdio bridge that facilitates 

11interaction between the MCP client and the MCP gateway. 

12It provides several functionalities, including listing tools, 

13invoking tools, managing resources, retrieving prompts, 

14and handling tool calls via the MCP gateway. 

15 

16- All JSON-RPC traffic is written to stdout. 

17- All logs/diagnostics are written to stderr, ensuring clean separation. 

18 

19Environment Variables 

20--------------------- 

21- **MCP_SERVER_URL** (or `--url`): Gateway MCP endpoint URL. 

22- **MCP_AUTH** (or `--auth`): Authorization header value. 

23- **MCP_TOOL_CALL_TIMEOUT** (or `--timeout`): Response timeout in seconds (default: 60). 

24- **MCP_WRAPPER_LOG_LEVEL** (or `--log-level`): Logging level, or OFF to disable. 

25- **CONCURRENCY**: Max concurrent tool calls (default: 10). 

26 

27Example usage: 

28-------------- 

29 

30Method 1: Using environment variables 

31 $ export MCP_SERVER_URL='http://localhost:4444/servers/UUID/mcp' 

32 $ export MCP_AUTH='Bearer <token>' 

33 $ export MCP_TOOL_CALL_TIMEOUT=120 

34 $ export MCP_WRAPPER_LOG_LEVEL=DEBUG 

35 $ python3 -m mcpgateway.wrapper 

36 

37Method 2: Using command-line arguments 

38 $ python3 -m mcpgateway.wrapper --url 'http://localhost:4444/servers/UUID/mcp' --auth 'Bearer <token>' --timeout 120 --log-level DEBUG 

39""" 

40 

41# Future 

42from __future__ import annotations 

43 

44# Standard 

45import argparse 

46import asyncio 

47from contextlib import suppress 

48from dataclasses import dataclass 

49import errno 

50import logging 

51import os 

52import signal 

53import sys 

54from typing import Any, AsyncIterator, Dict, List, Optional, Union 

55from urllib.parse import urlencode 

56 

57# Third-Party 

58import httpx 

59import orjson 

60 

61# First-Party 

62from mcpgateway.utils.retry_manager import ResilientHttpClient 

63 

# -----------------------
# Configuration Defaults
# -----------------------
# All values below are resolved once, at import time.

# Size of the semaphore that caps concurrently running forward requests.
# NOTE(review): int() raises at import for a non-numeric CONCURRENCY, and a
# value of 0 would presumably leave every worker waiting on the semaphore -
# confirm whether validation is wanted here.
DEFAULT_CONCURRENCY = int(os.environ.get("CONCURRENCY", "10"))
# HTTP connect timeout, in seconds.
DEFAULT_CONNECT_TIMEOUT = 15
# Response timeout in seconds; parse_args() falls back to this when --timeout is not given.
DEFAULT_RESPONSE_TIMEOUT = float(os.environ.get("MCP_TOOL_CALL_TIMEOUT", "60"))

# JSON-RPC error codes used in error objects emitted by the bridge itself.
JSONRPC_PARSE_ERROR = -32700
JSONRPC_INTERNAL_ERROR = -32603
JSONRPC_SERVER_ERROR = -32000

# Request body content type used by forward_once() when the settings object
# carries no content_type override.
CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")

# Global logger: writes to stderr only (stdout is reserved for JSON-RPC
# traffic), does not propagate to the root logger, and stays disabled until
# setup_logging() enables it.
logger = logging.getLogger("mcpgateway.wrapper")
logger.addHandler(logging.StreamHandler(sys.stderr))
logger.propagate = False
logger.disabled = True  # default: disabled

# Shutdown flag: set by _mark_shutdown(), polled through shutting_down().
_shutdown = asyncio.Event()

85 

86 

def _mark_shutdown():
    """Raise the module-wide shutdown flag so the bridge can wind down.

    Invoked when stdin reaches EOF, when writing to stdout fails, or when a
    termination signal is caught. Calling it while the flag is already raised
    changes nothing.

    Args:
        None

    Examples:
        >>> _mark_shutdown()  # doctest: +ELLIPSIS
        >>> shutting_down()
        True
        >>> # Reset for following doctests:
        >>> _ = _shutdown.clear()
    """
    already_flagged = _shutdown.is_set()
    if already_flagged:
        return
    _shutdown.set()

103 

104 

def shutting_down() -> bool:
    """Report whether a graceful shutdown has been requested.

    Args:
        None

    Returns:
        bool: True once the shutdown flag has been raised, False before that.

    Examples:
        >>> shutting_down()
        False
    """
    flag_raised: bool = _shutdown.is_set()
    return flag_raised

119 

120 

121# ----------------------- 

122# Utilities 

123# ----------------------- 

def setup_logging(level: Optional[str]) -> None:
    """Configure the wrapper's stderr logger.

    An empty/None level, or one of OFF / NONE / DISABLE / FALSE / 0
    (case-insensitive), disables the logger. Any other value is looked up as a
    level constant on the ``logging`` module; names that do not resolve to a
    numeric level fall back to INFO.

    Args:
        level: Logging level (e.g. "INFO", "DEBUG"), or OFF/None to disable.

    Examples:
        >>> setup_logging("DEBUG")
        >>> logger.disabled
        False
        >>> setup_logging("OFF")
        >>> logger.disabled
        True
    """
    if not level:
        logger.disabled = True
        return

    log_level = level.strip().upper()
    if log_level in {"OFF", "NONE", "DISABLE", "FALSE", "0"}:
        logger.disabled = True
        return

    # Only accept attributes that really are numeric levels. The logging
    # module also exposes upper-case non-level names (e.g. BASIC_FORMAT, a
    # str), and handing one of those to setLevel() would raise.
    resolved = getattr(logging, log_level, None)
    if not isinstance(resolved, int):
        resolved = logging.INFO
    logger.setLevel(resolved)

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    for handler in logger.handlers:
        handler.setFormatter(formatter)
    logger.disabled = False

155 

156 

def convert_url(url: str) -> str:
    """Normalize an MCP server URL so that it ends with ``/mcp/``.

    - If it already ends with `/mcp/`, it is returned unchanged.
    - If it ends with `/mcp`, a trailing slash is appended.
    - If it ends with `/sse`, that final segment is swapped for `/mcp/`.
    - Otherwise, `/mcp/` is appended.

    Args:
        url: The input server URL.

    Returns:
        str: Normalized MCP URL.

    Examples:
        >>> convert_url("http://localhost:4444/servers/uuid")
        'http://localhost:4444/servers/uuid/mcp/'
        >>> convert_url("http://localhost:4444/servers/uuid/sse")
        'http://localhost:4444/servers/uuid/mcp/'
        >>> convert_url("http://localhost:4444/servers/uuid/mcp")
        'http://localhost:4444/servers/uuid/mcp/'
        >>> convert_url("http://localhost:4444/sse/servers/uuid/sse")
        'http://localhost:4444/sse/servers/uuid/mcp/'
    """
    if url.endswith("/mcp/"):
        return url
    if url.endswith("/mcp"):
        return url + "/"
    if url.endswith("/sse"):
        # Swap only the trailing segment: str.replace() would also rewrite any
        # earlier "/sse" that happens to appear in the host or path.
        return url[: -len("/sse")] + "/mcp/"
    return url + "/mcp/"

185 

186 

def send_to_stdout(obj: Union[dict, str, bytes]) -> None:
    """Serialize an object and write it to stdout as a single line.

    Args:
        obj: Object to serialize and write. When JSON serialization fails,
            bytes are written as-is and anything else is written as str(obj).

    Notes:
        If writing fails because the stream is broken or already closed,
        shutdown is triggered instead of raising.
    """
    try:
        # orjson.dumps returns bytes
        line = orjson.dumps(obj)
    except Exception:
        line = obj if isinstance(obj, bytes) else str(obj).encode("utf-8")

    try:
        if hasattr(sys.stdout, "buffer"):
            sys.stdout.buffer.write(line + b"\n")
            sys.stdout.buffer.flush()
        else:
            # Fallback for environments that replace sys.stdout with an
            # object that has no binary buffer (e.g. test doubles).
            sys.stdout.write(line.decode("utf-8") + "\n")
            sys.stdout.flush()
    except UnicodeError:
        # A decode problem in the text fallback is not a dead pipe; keep
        # surfacing it to the caller as before.
        raise
    except (OSError, ValueError):
        # OSError: broken pipe or any other I/O failure (every errno leads to
        # shutdown). ValueError: the stream has already been closed.
        _mark_shutdown()

219 

220 

def make_error(message: str, code: int = JSONRPC_INTERNAL_ERROR, data: Any = None) -> dict:
    """Build a JSON-RPC 2.0 error response issued by the bridge itself.

    The response always carries the fixed id ``"bridge"``.

    Args:
        message: Error message.
        code: JSON-RPC error code (default -32603).
        data: Optional extra error data; omitted from the result when None.

    Returns:
        dict: JSON-RPC error object.

    Examples:
        >>> make_error("Invalid input", code=-32600)
        {'jsonrpc': '2.0', 'id': 'bridge', 'error': {'code': -32600, 'message': 'Invalid input'}}
        >>> make_error("Oops", data={"info": 1})["error"]["data"]
        {'info': 1}
    """
    detail: dict[str, Any] = {"code": code, "message": message}
    if data is not None:
        detail["data"] = data
    return {"jsonrpc": "2.0", "id": "bridge", "error": detail}

246 

247 

async def stdin_reader(queue: "asyncio.Queue[Union[dict, list, str, None]]") -> None:
    """Pump stdin, line by line, into a queue of parsed JSON messages.

    Each blocking readline runs in a worker thread so the event loop stays
    responsive.

    Args:
        queue: Target asyncio.Queue where parsed messages are enqueued.

    Notes:
        - On EOF, None is enqueued, shutdown is triggered and the reader ends.
        - Blank lines are skipped.
        - A line that is not valid JSON is replaced by a JSON-RPC parse-error
          object carrying the offending text.

    Examples:
        >>> # Example pattern (not executed): asyncio.create_task(stdin_reader(q))
        >>> True
        True
    """
    while True:
        if not hasattr(sys.stdin, "buffer"):
            # Text-only stdin: read str and convert to bytes.
            text = await asyncio.to_thread(sys.stdin.readline)
            raw = text.encode("utf-8") if text else b""
        else:
            # Preferred path: read bytes straight from the binary buffer.
            raw = await asyncio.to_thread(sys.stdin.buffer.readline)

        if not raw:
            # EOF: tell the consumer there is nothing more, then stop.
            await queue.put(None)
            _mark_shutdown()
            return

        stripped = raw.strip()
        if not stripped:
            continue

        try:
            # orjson.loads accepts bytes
            message = orjson.loads(stripped)
        except Exception:
            # Produce a printable form of the bad line for the error payload.
            try:
                shown = stripped.decode("utf-8", errors="replace")
            except Exception:
                shown = str(stripped)
            message = make_error("Invalid JSON from stdin", JSONRPC_PARSE_ERROR, shown)
        await queue.put(message)

291 

292 

293# ----------------------- 

294# Stream Parsers 

295# ----------------------- 

async def ndjson_lines(resp: httpx.Response) -> AsyncIterator[bytes]:
    """Split a streamed HTTP response body into non-empty NDJSON lines.

    Chunks are split on ``\\n``; a line spanning two chunks is reassembled
    before being yielded. Streaming stops early once shutdown is requested;
    whatever is still buffered is then yielded as the final line.

    Args:
        resp: httpx.Response with NDJSON content.

    Yields:
        bytes: Individual JSON lines as bytes, stripped of surrounding whitespace.

    Examples:
        >>> # This function is a parser for network streams; doctest uses patterns only.
        >>> True
        True
    """
    carry = b""
    async for chunk in resp.aiter_bytes():
        if shutting_down():
            break
        if not chunk:
            continue

        # Everything before the last newline is complete; the remainder
        # (possibly empty) is carried over to the next chunk.
        *complete, carry = (carry + chunk).split(b"\n")

        for raw in complete:
            trimmed = raw.strip()
            if trimmed:
                yield trimmed

    # Flush the unterminated tail, if any.
    tail = carry.strip()
    if tail:
        yield tail

331 

332 

def _sse_data_value(line: bytes) -> Optional[bytes]:
    """Return the value of an SSE ``data`` field line, or None for any other line."""
    if not line or line.startswith(b":"):
        # Blank line or comment: carries no data.
        return None
    field, _, value = line.partition(b":")
    if field != b"data":
        return None
    # All leading spaces are dropped (more lenient than removing exactly one);
    # JSON payloads are unaffected by this.
    return value.lstrip(b" ")


async def sse_events(resp: httpx.Response) -> AsyncIterator[bytes]:
    """Parse Server-Sent Events (SSE) from an HTTP response.

    Only ``data`` fields are collected; comment lines (starting with ``:``)
    and every other field are ignored. A blank line ends the current event.
    Streaming stops early once shutdown is requested. Data still buffered
    when the stream ends is yielded as a final event even without a
    terminating blank line.

    Args:
        resp: httpx.Response with SSE content.

    Yields:
        bytes: The data lines of one event, joined with ``\\n``.
    """
    pending = b""
    event_lines: List[bytes] = []

    async for chunk in resp.aiter_bytes():
        if shutting_down():
            break
        if not chunk:
            continue

        # Split into complete lines; the last element is the unterminated rest.
        lines = (pending + chunk).split(b"\n")
        pending = lines.pop()

        for raw in lines:
            line = raw.rstrip(b"\r")
            if not line:
                # Blank line: dispatch the accumulated event, if there is one.
                if event_lines:
                    yield b"\n".join(event_lines)
                    event_lines = []
                continue
            value = _sse_data_value(line)
            if value is not None:
                event_lines.append(value)

    # A stream may end without a final newline; treat the leftover as one more line.
    if pending:
        value = _sse_data_value(pending.rstrip(b"\r"))
        if value is not None:
            event_lines.append(value)

    # Always yield any remaining accumulated event data
    if event_lines:
        yield b"\n".join(event_lines)

390 

391 

392# ----------------------- 

393# Core HTTP forwarder 

394# ----------------------- 

async def forward_once(
    client: ResilientHttpClient,
    settings: "Settings",
    payload: Union[str, Dict[str, Any], List[Any]],
) -> None:
    """Forward a single JSON-RPC payload to the MCP gateway and stream responses.

    The function:
    - Sets content negotiation headers (JSON, NDJSON, SSE)
    - Adds Authorization header when configured
    - Encodes the body as JSON or form data depending on the configured
      content type
    - Streams the gateway response and forwards every JSON object to stdout
      (supports application/json, application/x-ndjson, and text/event-stream;
      any other response type is parsed as NDJSON)

    A non-2xx response produces one JSON-RPC error on stdout whose code is the
    HTTP status. Response fragments that are not valid JSON produce a
    parse-error object carrying the offending text. Nothing is sent or
    written once shutdown has been requested.

    Args:
        client: Resilient HTTP client used to make the request.
        settings: Bridge configuration (URL, auth, timeouts).
        payload: JSON-RPC request payload as str/dict/list.
    """
    if shutting_down():
        return

    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "Accept": "application/json, application/x-ndjson, text/event-stream",
    }
    if settings.auth_header:
        headers["Authorization"] = settings.auth_header

    # Step 1: Decide content type (per-settings override > module default)
    content_type = getattr(settings, "content_type", None) or CONTENT_TYPE

    if content_type == "application/x-www-form-urlencoded":
        # Always encode as form data
        body = urlencode(payload) if isinstance(payload, dict) else str(payload)
        headers["Content-Type"] = "application/x-www-form-urlencoded"

    elif content_type == "application/json":
        # Force JSON
        body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
        headers["Content-Type"] = "application/json; charset=utf-8"

    else:
        # Auto-detect: a flat dict of scalar values goes out as form data,
        # everything else as JSON.
        if isinstance(payload, dict) and all(isinstance(v, (str, int, float, bool, type(None))) for v in payload.values()):
            body = urlencode(payload)
            headers["Content-Type"] = "application/x-www-form-urlencoded"
        else:
            body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
            headers["Content-Type"] = "application/json; charset=utf-8"

    body_bytes = body.encode("utf-8")

    # Step 2: Send request and process response
    async with client.stream("POST", settings.server_url, data=body_bytes, headers=headers) as resp:
        ctype = (resp.headers.get("Content-Type") or "").lower()
        status = resp.status_code
        logger.debug("HTTP %s %s", status, ctype)

        if shutting_down():
            return

        if status < 200 or status >= 300:
            send_to_stdout(make_error(f"HTTP {status}", code=status))
            return

        async def _process_line(line: Union[str, bytes]):
            """
            Parse one response fragment as JSON and forward it to stdout.

            Returns immediately while shutting down. If the fragment cannot be
            parsed (or forwarded), a warning is logged and a standardized
            parse-error response is written to stdout instead.

            Args:
                line (Union[str, bytes]): One JSON document (bytes optimized).
            """
            if shutting_down():
                return
            try:
                # orjson.loads accepts bytes or str
                obj = orjson.loads(line)
                send_to_stdout(obj)
            except Exception:
                logger.warning("Invalid JSON from server: %s", line)
                # Decode bytes for the error payload; str(bytes) would embed
                # the b'...' repr instead of the text the server sent.
                line_str = line if isinstance(line, str) else line.decode("utf-8", errors="replace")
                send_to_stdout(make_error("Invalid JSON from server", JSONRPC_PARSE_ERROR, line_str))

        # Step 3: Handle response content types
        if "event-stream" in ctype:
            async for data_payload in sse_events(resp):
                if shutting_down():
                    break
                if not data_payload:
                    continue
                await _process_line(data_payload)
            return

        # "ndjson" also matches "application/x-ndjson".
        if "ndjson" in ctype:
            async for line in ndjson_lines(resp):
                if shutting_down():
                    break
                await _process_line(line)
            return

        if "application/json" in ctype:
            raw = await resp.aread()
            if not shutting_down():
                # raw is bytes
                try:
                    send_to_stdout(orjson.loads(raw))
                except Exception:
                    send_to_stdout(make_error("Invalid JSON response", JSONRPC_PARSE_ERROR, raw.decode("utf-8", "replace")))
            return

        # Fallback: try parsing as NDJSON
        async for line in ndjson_lines(resp):
            if shutting_down():
                break
            await _process_line(line)

518 

519 

async def make_request(
    client: ResilientHttpClient,
    settings: "Settings",
    payload: Union[str, Dict[str, Any], List[Any]],
    *,
    max_retries: int = 5,
    base_delay: float = 0.25,
) -> None:
    """Run one forward attempt, retrying with exponential backoff on failure.

    Any exception raised by the forward attempt counts as a failure. The wait
    between attempts doubles each time and is capped at 8 seconds. Once the
    failures exceed ``max_retries`` a JSON-RPC server error is written to
    stdout. Nothing is attempted, retried or reported after shutdown has been
    requested.

    Args:
        client: Resilient HTTP client used to make the request.
        settings: Bridge configuration (URL, auth, timeouts).
        payload: JSON-RPC request payload as str/dict/list.
        max_retries: Maximum retry attempts upon exceptions (default 5).
        base_delay: Base delay in seconds for exponential backoff (default 0.25).
    """
    failures = 0
    while True:
        if shutting_down():
            return
        try:
            await forward_once(client, settings, payload)
        except Exception as exc:
            if shutting_down():
                return
            logger.warning("Network or unexpected error in forward_once: %s", exc)
            failures += 1
            if failures > max_retries:
                send_to_stdout(make_error("max retries exceeded", JSONRPC_SERVER_ERROR))
                return
            # 1x, 2x, 4x ... the base delay, never more than 8 seconds.
            backoff = min(base_delay * (2 ** (failures - 1)), 8.0)
            await asyncio.sleep(backoff)
        else:
            return

552 

553 

554# ----------------------- 

555# Main loop & CLI 

556# ----------------------- 

@dataclass
class Settings:
    """Bridge configuration settings.

    Args:
        server_url: MCP server URL
        auth_header: Authorization header (optional)
        connect_timeout: HTTP connect timeout in seconds
        response_timeout: Max response wait in seconds
        concurrency: Max concurrent tool calls
        log_level: Logging verbosity
        content_type: Optional request body content type override read by
            forward_once; when None the module-level default applies

    Examples:
        >>> s = Settings("http://x/mcp", "Bearer token", 5, 10, 2, "DEBUG")
        >>> s.server_url
        'http://x/mcp'
        >>> s.concurrency
        2
        >>> s.content_type is None
        True
    """

    server_url: str
    auth_header: Optional[str]
    connect_timeout: float = DEFAULT_CONNECT_TIMEOUT
    response_timeout: float = DEFAULT_RESPONSE_TIMEOUT
    concurrency: int = DEFAULT_CONCURRENCY
    log_level: Optional[str] = None
    # Declared last so existing positional construction keeps working.
    content_type: Optional[str] = None

583 

584 

async def main_async(settings: Settings) -> None:
    """Main async loop: reads stdin JSON lines and forwards them to the gateway.

    - Spawns a reader task that pushes parsed lines to a queue.
    - Uses a semaphore to cap concurrent requests.
    - Creates tasks to forward each queued payload.
    - On EOF or shutdown, cancels the workers that are still outstanding,
      stops the reader and closes the HTTP client.

    Args:
        settings: Bridge configuration settings.

    Examples:
        >>> # Smoke-test structure only; no network or stdin in doctest.
        >>> settings = Settings("http://x/mcp", None)
        >>> async def _run_once():
        ...     q = asyncio.Queue()
        ...     # Immediately signal shutdown by marking the queue end:
        ...     await q.put(None)
        ...     _mark_shutdown()
        ...     # Minimal run: create then cancel tasks cleanly.
        ...     await asyncio.sleep(0)
        >>> # Note: We avoid running main_async here due to stdin/network.
        >>> True
        True
    """
    queue: "asyncio.Queue[Union[dict, list, str, None]]" = asyncio.Queue()
    reader_task = asyncio.create_task(stdin_reader(queue))

    sem = asyncio.Semaphore(settings.concurrency)

    httpx_timeout = httpx.Timeout(
        connect=settings.connect_timeout,
        read=settings.response_timeout,
        write=settings.response_timeout,
        pool=settings.response_timeout,
    )

    # Get SSL verify setting from global config (with fallback for standalone usage)
    try:
        # First-Party
        from mcpgateway.config import settings as global_settings  # pylint: disable=import-outside-toplevel

        ssl_verify = not global_settings.skip_ssl_verify
    except ImportError:
        ssl_verify = True  # Default to verifying SSL when config unavailable

    client_args = {"timeout": httpx_timeout, "http2": True, "verify": ssl_verify}
    resilient = ResilientHttpClient(
        max_retries=5,
        base_backoff=0.25,
        max_delay=8.0,
        jitter_max=0.25,
        client_args=client_args,
    )

    tasks: set[asyncio.Task[None]] = set()
    try:
        # NOTE(review): the shutdown flag is only re-checked after queue.get()
        # returns, so a signal received while idle is not acted on until the
        # next stdin line or EOF arrives.
        while not shutting_down():
            item = await queue.get()
            if item is None:
                break

            async def _worker(payload=item):
                """
                Forward one payload while holding a concurrency slot.

                Waits for the semaphore, then sends the payload with
                `make_request` unless shutdown started in the meantime.

                Args:
                    payload (Any): The data to be sent in the request. Defaults to `item`.
                """
                async with sem:
                    if not shutting_down():
                        await make_request(resilient, settings, payload)

            t = asyncio.create_task(_worker())
            tasks.add(t)
            # Finished workers remove themselves so the set only tracks live tasks.
            t.add_done_callback(lambda fut, s=tasks: s.discard(fut))

        _mark_shutdown()
        # Snapshot first: done-callbacks mutate `tasks` while we wait below.
        pending = list(tasks)
        for t in pending:
            t.cancel()
        if pending:
            with suppress(asyncio.CancelledError):
                # return_exceptions=True lets every worker settle even if one
                # of them ends with an error instead of a cancellation.
                await asyncio.gather(*pending, return_exceptions=True)
    finally:
        reader_task.cancel()
        # CancelledError derives from BaseException, so it has to be named
        # explicitly; otherwise awaiting the just-cancelled reader would
        # escape this block and skip closing the HTTP client.
        with suppress(asyncio.CancelledError, Exception):
            await reader_task
        with suppress(Exception):
            await resilient.aclose()

677 

678 

def parse_args() -> Settings:
    """Parse CLI arguments and environment variables into Settings.

    Recognized flags:
        --url        / MCP_SERVER_URL
        --auth       / MCP_AUTH
        --timeout    / MCP_TOOL_CALL_TIMEOUT
        --log-level  / MCP_WRAPPER_LOG_LEVEL

    A command-line flag takes precedence over its environment variable. The
    process exits with status 2 when no URL is supplied or when the timeout
    is not a number.

    Returns:
        Settings: Parsed and normalized configuration.

    Examples:
        >>> import sys, os
        >>> _argv = sys.argv
        >>> sys.argv = ["prog", "--url", "http://localhost:4444/servers/u"]
        >>> try:
        ...     s = parse_args()
        ...     s.server_url.endswith("/mcp/")
        ... finally:
        ...     sys.argv = _argv
        True
    """
    parser = argparse.ArgumentParser(description="Stdio MCP Client <-> MCP HTTP Bridge")
    parser.add_argument("--url", default=os.environ.get("MCP_SERVER_URL"), help="MCP server URL (env: MCP_SERVER_URL)")
    parser.add_argument("--auth", default=os.environ.get("MCP_AUTH"), help="Authorization header value (env: MCP_AUTH)")
    parser.add_argument("--timeout", default=os.environ.get("MCP_TOOL_CALL_TIMEOUT"), help="Response timeout in seconds (env: MCP_TOOL_CALL_TIMEOUT)")
    parser.add_argument(
        "--log-level",
        default=os.environ.get("MCP_WRAPPER_LOG_LEVEL", "INFO"),
        help="Logging level (case-insensitive, env: MCP_WRAPPER_LOG_LEVEL, default: INFO; OFF disables logging)",
    )
    args = parser.parse_args()

    if not args.url:
        print("Error: MCP server URL must be provided via --url or MCP_SERVER_URL", file=sys.stderr)
        sys.exit(2)

    server_url = convert_url(args.url)

    if args.timeout:
        try:
            response_timeout = float(args.timeout)
        except ValueError:
            # Report bad input as a usage error (exit status 2) rather than a traceback.
            parser.error(f"--timeout must be a number of seconds, got {args.timeout!r}")
    else:
        response_timeout = DEFAULT_RESPONSE_TIMEOUT

    return Settings(
        server_url=server_url,
        auth_header=args.auth,
        connect_timeout=DEFAULT_CONNECT_TIMEOUT,
        response_timeout=response_timeout,
        log_level=args.log_level,
        concurrency=DEFAULT_CONCURRENCY,
    )

728 

729 

def _install_signal_handlers(loop: asyncio.AbstractEventLoop) -> None:
    """Hook SIGINT and SIGTERM up to the graceful-shutdown flag.

    Signals missing from the ``signal`` module are skipped, and event loops
    that do not implement signal handlers are tolerated silently.

    Args:
        loop: The asyncio event loop to attach handlers to.

    Examples:
        >>> import asyncio
        >>> loop = asyncio.new_event_loop()
        >>> _install_signal_handlers(loop)  # doctest: +ELLIPSIS
        >>> loop.close()
    """
    candidates = [getattr(signal, sig_name, None) for sig_name in ("SIGINT", "SIGTERM")]
    for signum in candidates:
        if signum is not None:
            with suppress(NotImplementedError):
                loop.add_signal_handler(signum, _mark_shutdown)

747 

748 

def main() -> None:
    """Run the MCP stdio wrapper from the command line.

    Steps:
    - Build Settings from CLI arguments / environment variables
    - Configure logging
    - Create a dedicated event loop, install signal handlers and run the
      async main loop until it finishes
    - Give the loop one final iteration, close it, and log completion

    Args:
        None
    """
    config = parse_args()
    setup_logging(config.log_level)
    if not logger.disabled:
        logger.info("Starting MCP stdio wrapper -> %s", config.server_url)

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    _install_signal_handlers(event_loop)

    try:
        event_loop.run_until_complete(main_async(config))
    finally:
        # One extra pass through the loop before it is closed.
        event_loop.run_until_complete(asyncio.sleep(0))
        try:
            event_loop.close()
        except Exception:
            pass
        if not logger.disabled:
            logger.info("Shutdown complete.")

776 

777 

# Script entry point: run the bridge when executed directly
# (e.g. `python3 -m mcpgateway.wrapper`); importing the module does not start it.
if __name__ == "__main__":
    main()