Coverage for mcpgateway / transports / stdio_transport.py: 100%
56 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/transports/stdio_transport.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7stdio Transport Implementation.
8This module implements standard input/output (stdio) transport for MCP Gateway, enabling
9communication over stdin/stdout streams. This transport is particularly useful
10for command-line tools, subprocess communication, and scenarios where processes
11need to communicate via standard I/O channels.
13The StdioTransport class provides asynchronous message handling with proper
14JSON encoding/decoding and stream management. It follows the MCP transport
15protocol for bidirectional communication between MCP clients and servers.
17Key Features:
18- Asynchronous stream handling with asyncio
19- JSON message encoding/decoding
20- Line-based message protocol
21- Proper connection state management
22- Error handling and logging
23- Clean resource cleanup
25Note:
26 This transport requires access to sys.stdin and sys.stdout. In testing
27 environments or when these streams are not available, the transport
28 will raise RuntimeError during connection attempts.
29"""
31# Standard
32import asyncio
33import sys
34from typing import Any, AsyncGenerator, Dict, Optional
36# Third-Party
37import orjson
39# First-Party
40from mcpgateway.services.logging_service import LoggingService
41from mcpgateway.transports.base import Transport
# Module-level logger for this transport, obtained from the gateway's
# LoggingService so that messages are emitted under this module's name.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
class StdioTransport(Transport):
    """Transport implementation using stdio streams.

    This transport implementation uses standard input/output streams for
    communication. It's commonly used for command-line tools and processes
    that communicate via stdin/stdout.

    Messages are framed as newline-delimited JSON: every outgoing message is
    serialized with ``orjson`` and terminated by ``\\n``; every incoming line
    is parsed as one JSON document.

    Examples:
        >>> # Create a new stdio transport instance
        >>> transport = StdioTransport()
        >>> transport
        <mcpgateway.transports.stdio_transport.StdioTransport object at ...>

        >>> # Check initial connection state
        >>> import asyncio
        >>> asyncio.run(transport.is_connected())
        False

        >>> # Verify it's a proper Transport subclass
        >>> isinstance(transport, Transport)
        True
        >>> issubclass(StdioTransport, Transport)
        True

        >>> # Check that required methods exist
        >>> hasattr(transport, 'connect')
        True
        >>> hasattr(transport, 'disconnect')
        True
        >>> hasattr(transport, 'send_message')
        True
        >>> hasattr(transport, 'receive_message')
        True
        >>> hasattr(transport, 'is_connected')
        True
    """

    def __init__(self) -> None:
        """Initialize stdio transport in the disconnected state.

        Examples:
            >>> # Create transport instance
            >>> transport = StdioTransport()
            >>> transport._stdin_reader is None
            True
            >>> transport._stdout_writer is None
            True
            >>> transport._connected
            False
        """
        # Both streams are created by connect(); None means "not connected".
        self._stdin_reader: Optional[asyncio.StreamReader] = None
        self._stdout_writer: Optional[asyncio.StreamWriter] = None
        self._connected: bool = False

    async def connect(self) -> None:
        """Set up stdio streams.

        Attaches ``sys.stdin`` to an ``asyncio.StreamReader`` and ``sys.stdout``
        to an ``asyncio.StreamWriter`` on the running event loop. Instance
        state is only updated once both pipes are attached, so a failure
        part-way through leaves the transport in its previous state.

        Raises:
            RuntimeError: If ``sys.stdin`` or ``sys.stdout`` is ``None``.
                Errors raised by the event loop while attaching the pipes
                propagate unchanged.

        Examples:
            >>> # Note: This method requires actual stdio streams
            >>> # and cannot be easily tested in doctest environment
            >>> transport = StdioTransport()
            >>> # The connect method exists and is callable
            >>> callable(transport.connect)
            True
        """
        # Fail with a clear error rather than an AttributeError from inside
        # the event loop when the interpreter has no standard streams.
        if sys.stdin is None or sys.stdout is None:
            raise RuntimeError("stdio streams are not available")

        loop = asyncio.get_running_loop()

        # Set up stdin reader
        reader = asyncio.StreamReader()
        reader_protocol = asyncio.StreamReaderProtocol(reader)
        # NOTE(review): the read transport returned here is discarded, so
        # disconnect() has no handle to close it - confirm whether stdin
        # should be released on disconnect.
        await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

        # Set up stdout writer
        # NOTE(review): FlowControlMixin lives in asyncio.streams and is not
        # part of asyncio's documented API - confirm it is stable across the
        # supported Python versions.
        write_transport, write_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
        writer = asyncio.StreamWriter(write_transport, write_protocol, reader, loop)

        # Publish the new state only after both pipes are attached.
        self._stdin_reader = reader
        self._stdout_writer = writer
        self._connected = True
        logger.info("stdio transport connected")

    async def disconnect(self) -> None:
        """Clean up stdio streams.

        Closes the stdout writer, if there is one, and marks the transport as
        disconnected. The instance state is reset *before* the writer is
        closed, so an error raised while closing cannot leave the transport
        reporting itself as connected. Calling this method on an already
        disconnected transport is a no-op apart from the log message.

        Examples:
            >>> # Note: This method requires actual stdio streams
            >>> # and cannot be easily tested in doctest environment
            >>> transport = StdioTransport()
            >>> # The disconnect method exists and is callable
            >>> callable(transport.disconnect)
            True
        """
        writer = self._stdout_writer

        # Drop the stream references so later send/receive calls raise
        # "Transport not connected" instead of touching closed streams.
        self._stdout_writer = None
        self._stdin_reader = None
        self._connected = False

        if writer is not None:
            writer.close()
            await writer.wait_closed()
        logger.info("stdio transport disconnected")

    async def send_message(self, message: Dict[str, Any]) -> None:
        """Send a message over stdout.

        The message is serialized to JSON and written as a single line.

        Args:
            message: Message to send

        Raises:
            RuntimeError: If transport is not connected
            Exception: If unable to write to stdio writer

        Examples:
            >>> # Test with unconnected transport
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> try:
            ...     asyncio.run(transport.send_message({"test": "message"}))
            ... except RuntimeError as e:
            ...     print("Expected error:", str(e))
            Expected error: Transport not connected

            >>> # Verify message format validation
            >>> transport = StdioTransport()
            >>> # Valid message format
            >>> valid_message = {"jsonrpc": "2.0", "method": "test", "id": 1}
            >>> isinstance(valid_message, dict)
            True
            >>> "jsonrpc" in valid_message
            True
        """
        writer = self._stdout_writer
        if writer is None:
            raise RuntimeError("Transport not connected")

        try:
            # orjson.dumps() already returns bytes, so write them directly and
            # avoid a decode/encode roundtrip.
            writer.write(orjson.dumps(message) + b"\n")
            await writer.drain()
        except Exception as e:
            # Log for diagnostics, then let the caller decide how to react.
            logger.error(f"Failed to send message: {e}")
            raise

    async def receive_message(self) -> AsyncGenerator[Dict[str, Any], None]:
        """Receive messages from stdin.

        Reads stdin line by line and yields each line parsed as JSON. The
        generator finishes when stdin reaches end-of-file, when the read is
        cancelled, or when the underlying stream fails with an ``OSError``.
        Blank lines are skipped; lines that are not valid JSON, and other
        non-fatal read errors, are logged and skipped.

        Yields:
            Received messages

        Raises:
            RuntimeError: If transport is not connected

        Examples:
            >>> # Test with unconnected transport
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> try:
            ...     async def test_receive():
            ...         async for msg in transport.receive_message():
            ...             pass
            ...     asyncio.run(test_receive())
            ... except RuntimeError as e:
            ...     print("Expected error:", str(e))
            Expected error: Transport not connected

            >>> # Verify generator behavior
            >>> transport = StdioTransport()
            >>> # The method returns an async generator
            >>> import inspect
            >>> inspect.isasyncgenfunction(transport.receive_message)
            True
        """
        # Keep a local reference: disconnect() may clear the attribute while
        # this generator is still being iterated.
        reader = self._stdin_reader
        if reader is None:
            raise RuntimeError("Transport not connected")

        while True:
            # Read line from stdin
            try:
                line = await reader.readline()
            except asyncio.CancelledError:
                # Existing contract: cancellation ends the stream quietly
                # instead of propagating to the consumer.
                break
            except OSError as e:
                # A failed stream re-raises the same error on every read
                # without suspending; retrying would spin forever and starve
                # the event loop, so treat it as end of stream.
                logger.error(f"Failed to receive message: {e}")
                break
            except Exception as e:
                # Non-fatal read problems (e.g. an over-long line) are
                # skipped so one bad line does not end the session.
                logger.error(f"Failed to receive message: {e}")
                continue

            if not line:
                # Empty bytes from readline() means end-of-file.
                break

            payload = line.strip()
            if not payload:
                # Blank separator line: nothing to parse.
                continue

            # Parse JSON message
            try:
                message = orjson.loads(payload)
            except orjson.JSONDecodeError as e:
                logger.error(f"Failed to receive message: {e}")
                continue

            # Yield outside the try blocks so exceptions raised into the
            # generator by its consumer are never swallowed here.
            yield message

    async def is_connected(self) -> bool:
        """Check if transport is connected.

        Returns:
            True if connected

        Examples:
            >>> # Test initial state
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> asyncio.run(transport.is_connected())
            False

            >>> # Test after manual connection state change
            >>> transport = StdioTransport()
            >>> transport._connected = True
            >>> asyncio.run(transport.is_connected())
            True

            >>> # Test after manual disconnection
            >>> transport = StdioTransport()
            >>> transport._connected = True
            >>> transport._connected = False
            >>> asyncio.run(transport.is_connected())
            False
        """
        return self._connected