Coverage for mcpgateway / transports / stdio_transport.py: 100%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/transports/stdio_transport.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7stdio Transport Implementation. 

8This module implements standard input/output (stdio) transport for ContextForge, enabling 

9communication over stdin/stdout streams. This transport is particularly useful 

10for command-line tools, subprocess communication, and scenarios where processes 

11need to communicate via standard I/O channels. 

12 

13The StdioTransport class provides asynchronous message handling with proper 

14JSON encoding/decoding and stream management. It follows the MCP transport 

15protocol for bidirectional communication between MCP clients and servers. 

16 

17Key Features: 

18- Asynchronous stream handling with asyncio 

19- JSON message encoding/decoding 

20- Line-based message protocol 

21- Proper connection state management 

22- Error handling and logging 

23- Clean resource cleanup 

24 

25Note: 

26 This transport requires access to sys.stdin and sys.stdout. In testing 

27 environments or when these streams are not available, the transport 

28 will raise RuntimeError during connection attempts. 

29""" 

30 

31# Standard 

32import asyncio 

33import sys 

34from typing import Any, AsyncGenerator, Dict, Optional 

35 

36# Third-Party 

37import orjson 

38 

39# First-Party 

40from mcpgateway.services.logging_service import LoggingService 

41from mcpgateway.transports.base import Transport 

42 

# Module-level logger: instantiate the gateway's LoggingService at import time
# and obtain a logger named after this module. The transport methods below log
# through `logger`.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

46 

47 

class StdioTransport(Transport):
    """Transport implementation using stdio streams.

    This transport implementation uses standard input/output streams for
    communication. It's commonly used for command-line tools and processes
    that communicate via stdin/stdout.

    The wire format is line based: every outgoing message is serialized with
    ``orjson`` and terminated by a newline, and every non-blank line read from
    stdin is parsed as one JSON message.

    Examples:
        >>> # Create a new stdio transport instance
        >>> transport = StdioTransport()
        >>> transport
        <mcpgateway.transports.stdio_transport.StdioTransport object at ...>

        >>> # Check initial connection state
        >>> import asyncio
        >>> asyncio.run(transport.is_connected())
        False

        >>> # Verify it's a proper Transport subclass
        >>> isinstance(transport, Transport)
        True
        >>> issubclass(StdioTransport, Transport)
        True

        >>> # Check that required methods exist
        >>> hasattr(transport, 'connect')
        True
        >>> hasattr(transport, 'disconnect')
        True
        >>> hasattr(transport, 'send_message')
        True
        >>> hasattr(transport, 'receive_message')
        True
        >>> hasattr(transport, 'is_connected')
        True
    """

    def __init__(self):
        """Initialize stdio transport.

        The transport starts disconnected: no reader, no writer, and the
        connection flag cleared. Call :meth:`connect` to attach the streams.

        Examples:
            >>> # Create transport instance
            >>> transport = StdioTransport()
            >>> transport._stdin_reader is None
            True
            >>> transport._stdout_writer is None
            True
            >>> transport._connected
            False
        """
        self._stdin_reader: Optional[asyncio.StreamReader] = None
        self._stdout_writer: Optional[asyncio.StreamWriter] = None
        self._connected = False

    async def connect(self) -> None:
        """Set up stdio streams.

        Attaches an ``asyncio.StreamReader`` to ``sys.stdin`` and an
        ``asyncio.StreamWriter`` to ``sys.stdout`` on the running event loop.
        The instance attributes are only updated once both pipes have been
        attached, so a failure part-way through leaves the transport in its
        previous (disconnected) state rather than half-initialized.

        Examples:
            >>> # Note: This method requires actual stdio streams
            >>> # and cannot be easily tested in doctest environment
            >>> transport = StdioTransport()
            >>> # The connect method exists and is callable
            >>> callable(transport.connect)
            True
        """
        loop = asyncio.get_running_loop()

        # Set up stdin reader
        reader = asyncio.StreamReader()
        reader_protocol = asyncio.StreamReaderProtocol(reader)
        await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

        # Set up stdout writer
        write_transport, write_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
        writer = asyncio.StreamWriter(write_transport, write_protocol, reader, loop)

        # Publish the streams only after both pipes are attached.
        self._stdin_reader = reader
        self._stdout_writer = writer
        self._connected = True
        logger.info("stdio transport connected")

    async def disconnect(self) -> None:
        """Clean up stdio streams.

        Closes the stdout writer (if any) and waits for it to finish closing.
        The stream references are dropped and the connection flag is cleared
        even if closing raises, so later ``send_message`` /
        ``receive_message`` calls fail with "Transport not connected" instead
        of operating on a closed stream. Calling this on a transport that was
        never connected is a no-op apart from the log line.

        Examples:
            >>> # Note: This method requires actual stdio streams
            >>> # and cannot be easily tested in doctest environment
            >>> transport = StdioTransport()
            >>> # The disconnect method exists and is callable
            >>> callable(transport.disconnect)
            True
        """
        # Detach the streams first so the transport never exposes a closed writer.
        writer, self._stdout_writer = self._stdout_writer, None
        self._stdin_reader = None
        try:
            if writer:
                writer.close()
                await writer.wait_closed()
        finally:
            # Reflect the disconnected state even when closing fails.
            self._connected = False
        logger.info("stdio transport disconnected")

    async def send_message(self, message: Dict[str, Any]) -> None:
        """Send a message over stdout.

        The message is serialized to JSON bytes, terminated with a newline,
        written to the stdout writer and flushed with ``drain()``.

        Args:
            message: Message to send

        Raises:
            RuntimeError: If transport is not connected
            Exception: If unable to write to stdio writer

        Examples:
            >>> # Test with unconnected transport
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> try:
            ...     asyncio.run(transport.send_message({"test": "message"}))
            ... except RuntimeError as e:
            ...     print("Expected error:", str(e))
            Expected error: Transport not connected

            >>> # Verify message format validation
            >>> transport = StdioTransport()
            >>> # Valid message format
            >>> valid_message = {"jsonrpc": "2.0", "method": "test", "id": 1}
            >>> isinstance(valid_message, dict)
            True
            >>> "jsonrpc" in valid_message
            True
        """
        if not self._stdout_writer:
            raise RuntimeError("Transport not connected")

        try:
            # Write bytes directly, avoiding decode/encode roundtrip for performance
            data = orjson.dumps(message)
            self._stdout_writer.write(data + b"\n")
            await self._stdout_writer.drain()
        except Exception as e:
            # Log for diagnostics, then let the caller decide how to react.
            logger.error(f"Failed to send message: {e}")
            raise

    async def receive_message(self) -> AsyncGenerator[Dict[str, Any], None]:
        """Receive messages from stdin.

        Reads stdin line by line until end-of-file. Blank lines are skipped.
        Lines that are not valid JSON are logged and skipped. A line that
        exceeds the reader's size limit is logged and skipped. Any other read
        failure is logged once and ends the stream, because a failed stream
        reader keeps raising the same error on every subsequent read and
        retrying would spin forever.

        Yields:
            Received messages

        Raises:
            asyncio.CancelledError: If the receive loop is cancelled.
            RuntimeError: If transport is not connected

        Examples:
            >>> # Test with unconnected transport
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> try:
            ...     async def test_receive():
            ...         async for msg in transport.receive_message():
            ...             pass
            ...     asyncio.run(test_receive())
            ... except RuntimeError as e:
            ...     print("Expected error:", str(e))
            Expected error: Transport not connected

            >>> # Verify generator behavior
            >>> transport = StdioTransport()
            >>> # The method returns an async generator
            >>> import inspect
            >>> inspect.isasyncgenfunction(transport.receive_message)
            True
        """
        reader = self._stdin_reader
        if not reader:
            raise RuntimeError("Transport not connected")

        while True:
            # Read line from stdin
            try:
                line = await reader.readline()
            except ValueError as e:
                # Oversized line: readline() has already discarded the
                # offending data, so it is safe to carry on with the next line.
                logger.error(f"Failed to receive message: {e}")
                continue
            except Exception as e:
                # Stream-level failure: retrying would re-raise immediately
                # and busy-loop, so treat it as the end of the stream.
                logger.error(f"Failed to receive message: {e}")
                break

            if not line:
                # Empty read means end-of-file on stdin.
                break

            payload = line.strip()
            if not payload:
                # Blank separator line, nothing to parse.
                continue

            # Parse JSON message
            try:
                message = orjson.loads(payload)
            except orjson.JSONDecodeError as e:
                logger.error(f"Failed to receive message: {e}")
                continue

            # Yield outside the try blocks so exceptions raised by the
            # consumer propagate instead of being logged as receive failures.
            yield message

    async def is_connected(self) -> bool:
        """Check if transport is connected.

        Returns:
            True if connected

        Examples:
            >>> # Test initial state
            >>> transport = StdioTransport()
            >>> import asyncio
            >>> asyncio.run(transport.is_connected())
            False

            >>> # Test after manual connection state change
            >>> transport = StdioTransport()
            >>> transport._connected = True
            >>> asyncio.run(transport.is_connected())
            True

            >>> # Test after manual disconnection
            >>> transport = StdioTransport()
            >>> transport._connected = True
            >>> transport._connected = False
            >>> asyncio.run(transport.is_connected())
            False
        """
        return self._connected