Coverage for mcpgateway / transports / stdio_transport.py: 100%

56 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/transports/stdio_transport.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7stdio Transport Implementation. 

8This module implements standard input/output (stdio) transport for MCP Gateway, enabling 

9communication over stdin/stdout streams. This transport is particularly useful 

10for command-line tools, subprocess communication, and scenarios where processes 

11need to communicate via standard I/O channels. 

12 

13The StdioTransport class provides asynchronous message handling with proper 

14JSON encoding/decoding and stream management. It follows the MCP transport 

15protocol for bidirectional communication between MCP clients and servers. 

16 

17Key Features: 

18- Asynchronous stream handling with asyncio 

19- JSON message encoding/decoding 

20- Line-based message protocol 

21- Proper connection state management 

22- Error handling and logging 

23- Clean resource cleanup 

24 

25Note: 

26 This transport requires access to sys.stdin and sys.stdout. In testing 

27 environments or when these streams are not available, the transport 

28 will raise RuntimeError during connection attempts. 

29""" 

30 

31# Standard 

32import asyncio 

33import sys 

34from typing import Any, AsyncGenerator, Dict, Optional 

35 

36# Third-Party 

37import orjson 

38 

39# First-Party 

40from mcpgateway.services.logging_service import LoggingService 

41from mcpgateway.transports.base import Transport 

42 

# Initialize logging service first.
# `logging_service` is this module's LoggingService instance; `logger` is the
# logger it hands out for this module's name and is what StdioTransport uses
# for its connection and error messages.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

46 

47 

class StdioTransport(Transport):
    """Transport implementation using stdio streams.

    Messages are exchanged as newline-delimited JSON: one JSON document per
    line is written to stdout by :meth:`send_message` and read from stdin by
    :meth:`receive_message`. The streams are attached to the running asyncio
    event loop in :meth:`connect` and released in :meth:`disconnect`.

    Attributes:
        _stdin_reader: Stream reader attached to ``sys.stdin``; ``None`` while
            the transport is not connected.
        _stdout_writer: Stream writer attached to ``sys.stdout``; ``None``
            while the transport is not connected.
        _connected: ``True`` between a successful :meth:`connect` and the next
            :meth:`disconnect`.

    Examples:
        >>> # Create a new stdio transport instance
        >>> transport = StdioTransport()
        >>> transport
        <mcpgateway.transports.stdio_transport.StdioTransport object at ...>

        >>> # Check initial connection state
        >>> import asyncio
        >>> asyncio.run(transport.is_connected())
        False

        >>> # Verify it's a proper Transport subclass
        >>> isinstance(transport, Transport)
        True
        >>> issubclass(StdioTransport, Transport)
        True

        >>> # Check that required methods exist
        >>> hasattr(transport, 'connect')
        True
        >>> hasattr(transport, 'disconnect')
        True
        >>> hasattr(transport, 'send_message')
        True
        >>> hasattr(transport, 'receive_message')
        True
        >>> hasattr(transport, 'is_connected')
        True
    """

    def __init__(self) -> None:
        """Initialize stdio transport in the disconnected state.

        Examples:
            >>> # Create transport instance
            >>> transport = StdioTransport()
            >>> transport._stdin_reader is None
            True
            >>> transport._stdout_writer is None
            True
            >>> transport._connected
            False
        """
        self._stdin_reader: Optional[asyncio.StreamReader] = None
        self._stdout_writer: Optional[asyncio.StreamWriter] = None
        self._connected = False

    async def connect(self) -> None:
        """Set up stdio streams.

        Attaches ``sys.stdin`` and ``sys.stdout`` to the running event loop as
        a read pipe and a write pipe. Instance state is only updated once both
        pipes are attached, so a failure part-way through leaves the transport
        reporting "not connected" rather than half-initialised.

        Raises:
            RuntimeError: If there is no running event loop. Any exception
                raised while attaching the pipes propagates unchanged.

        Examples:
            >>> # Note: This method requires actual stdio streams
            >>> # and cannot be easily tested in doctest environment
            >>> transport = StdioTransport()
            >>> # The connect method exists and is callable
            >>> callable(transport.connect)
            True
        """
        loop = asyncio.get_running_loop()

        # Set up stdin reader
        reader = asyncio.StreamReader()
        reader_protocol = asyncio.StreamReaderProtocol(reader)
        await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

        # Set up stdout writer
        write_transport, write_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
        writer = asyncio.StreamWriter(write_transport, write_protocol, reader, loop)

        # Publish the streams only after both pipes were attached successfully.
        self._stdin_reader = reader
        self._stdout_writer = writer
        self._connected = True
        logger.info("stdio transport connected")

    async def disconnect(self) -> None:
        """Clean up stdio streams.

        Closes the stdout writer (if any) and waits for it to finish closing.
        The reader/writer references are dropped and the connected flag is
        cleared even if closing fails, so later calls to :meth:`send_message`
        and :meth:`receive_message` raise ``RuntimeError`` instead of touching
        a closed stream. Calling this on an unconnected transport is a no-op
        apart from the log message.

        Examples:
            >>> # Note: This method requires actual stdio streams
            >>> # and cannot be easily tested in doctest environment
            >>> transport = StdioTransport()
            >>> # The disconnect method exists and is callable
            >>> callable(transport.disconnect)
            True
        """
        writer = self._stdout_writer
        # Drop the references up front so the transport cannot be reused while
        # (or after) the writer is being closed.
        self._stdin_reader = None
        self._stdout_writer = None
        try:
            if writer:
                writer.close()
                await writer.wait_closed()
        finally:
            # The transport is unusable from here on, whether or not the close
            # completed cleanly.
            self._connected = False
        logger.info("stdio transport disconnected")

    async def send_message(self, message: Dict[str, Any]) -> None:
        """Send a message over stdout.

        The message is serialized to JSON, terminated with a newline, written
        to the stdout writer and flushed with ``drain()``.

        Args:
            message: Message to send

        Raises:
            RuntimeError: If transport is not connected
            Exception: If unable to write to stdio writer

        Examples:
            >>> # Test with unconnected transport
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> try:
            ...     asyncio.run(transport.send_message({"test": "message"}))
            ... except RuntimeError as e:
            ...     print("Expected error:", str(e))
            Expected error: Transport not connected

            >>> # Verify message format validation
            >>> transport = StdioTransport()
            >>> # Valid message format
            >>> valid_message = {"jsonrpc": "2.0", "method": "test", "id": 1}
            >>> isinstance(valid_message, dict)
            True
            >>> "jsonrpc" in valid_message
            True
        """
        writer = self._stdout_writer
        if not writer:
            raise RuntimeError("Transport not connected")

        try:
            # Write bytes directly, avoiding decode/encode roundtrip for performance
            data = orjson.dumps(message)
            writer.write(data + b"\n")
            await writer.drain()
        except Exception as e:
            # Log for diagnostics, then let the caller decide how to recover.
            logger.error(f"Failed to send message: {e}")
            raise

    async def receive_message(self) -> AsyncGenerator[Dict[str, Any], None]:
        """Receive messages from stdin.

        Reads stdin line by line and yields each line decoded as JSON. The
        generator ends at end-of-file, when the reading task is cancelled, or
        when the stream reports a read error. Lines that cannot be parsed, and
        lines that exceed the reader's size limit, are logged and skipped.

        Yields:
            Received messages

        Raises:
            RuntimeError: If transport is not connected

        Examples:
            >>> # Test with unconnected transport
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> try:
            ...     async def test_receive():
            ...         async for msg in transport.receive_message():
            ...             pass
            ...     asyncio.run(test_receive())
            ... except RuntimeError as e:
            ...     print("Expected error:", str(e))
            Expected error: Transport not connected

            >>> # Verify generator behavior
            >>> transport = StdioTransport()
            >>> # The method returns an async generator
            >>> import inspect
            >>> inspect.isasyncgenfunction(transport.receive_message)
            True
        """
        # Bind the reader locally so a concurrent disconnect() clearing the
        # attribute cannot turn the loop below into an AttributeError loop.
        reader = self._stdin_reader
        if not reader:
            raise RuntimeError("Transport not connected")

        while True:
            # Read line from stdin
            try:
                line = await reader.readline()
            except asyncio.CancelledError:
                # Existing contract: cancellation ends the stream quietly.
                break
            except ValueError as e:
                # readline() raises ValueError for a line over the size limit
                # and discards the offending data, so reading can continue.
                logger.error(f"Failed to receive message: {e}")
                continue
            except Exception as e:
                # A stream-level error is re-raised by every later read without
                # suspending; retrying would busy-loop and block the event
                # loop, so stop instead.
                logger.error(f"Failed to receive message: {e}")
                break

            if not line:
                # Empty bytes means end-of-file on stdin.
                break

            # Parse JSON message; a malformed line is skipped, not fatal.
            try:
                message = orjson.loads(line.strip())
            except Exception as e:
                logger.error(f"Failed to receive message: {e}")
                continue

            # Yield outside the try blocks so exceptions thrown into the
            # generator by the consumer are not swallowed as receive errors.
            yield message

    async def is_connected(self) -> bool:
        """Check if transport is connected.

        Returns:
            True if connected

        Examples:
            >>> # Test initial state
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> asyncio.run(transport.is_connected())
            False

            >>> # Test after manual connection state change
            >>> transport = StdioTransport()
            >>> transport._connected = True
            >>> asyncio.run(transport.is_connected())
            True

            >>> # Test after manual disconnection
            >>> transport = StdioTransport()
            >>> transport._connected = True
            >>> transport._connected = False
            >>> asyncio.run(transport.is_connected())
            False
        """
        return self._connected