Coverage for mcpgateway / plugins / framework / external / unix / protocol.py: 100%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/external/unix/protocol.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Teryl Taylor 

6 

7Protocol helpers for length-prefixed message framing over Unix sockets. 

8 

9This module provides simple, efficient message framing using a 4-byte 

10big-endian length prefix followed by the message payload. 

11 

12Wire format: [4-byte length (big-endian)][payload bytes] 

13 

14Examples: 

15 Writing a message: 

16 

17 >>> import asyncio 

18 >>> from mcpgateway.plugins.framework.external.unix.protocol import write_message, read_message 

19 

20 Reading and writing work as inverse operations: 

21 

22 >>> # In an async context with reader/writer streams 

23 >>> # write_message(writer, b"hello") 

24 >>> # data = await read_message(reader) # returns b"hello" 

25""" 

26 

27# Standard 

28import asyncio 

29import struct 

30from typing import Optional 

31 

32# 4-byte big-endian unsigned int for length prefix 

33LENGTH_FORMAT = ">I" 

34LENGTH_SIZE = 4 

35 

36# Maximum message size (16 MB) to prevent memory exhaustion 

37MAX_MESSAGE_SIZE = 16 * 1024 * 1024 

38 

39 

40class ProtocolError(Exception): 

41 """Raised when a protocol-level error occurs.""" 

42 

43 

44async def read_message(reader: asyncio.StreamReader, timeout: Optional[float] = None) -> bytes: 

45 """Read a length-prefixed message from the stream. 

46 

47 Args: 

48 reader: The async stream reader. 

49 timeout: Optional timeout in seconds for the read operation. 

50 

51 Returns: 

52 The message payload as bytes. 

53 

54 Raises: 

55 ProtocolError: If the message is malformed or too large. 

56 asyncio.IncompleteReadError: If the connection is closed mid-read. 

57 asyncio.TimeoutError: If the read times out. 

58 

59 Examples: 

60 >>> # In an async context 

61 >>> # data = await read_message(reader) 

62 >>> # data = await read_message(reader, timeout=5.0) 

63 """ 

64 

65 async def _read() -> bytes: 

66 """Read and validate a length-prefixed message from the stream. 

67 

68 Returns: 

69 The data read from the message stream as bytes. 

70 

71 Raises: 

72 ProtocolError: If the message is too large. 

73 

74 """ 

75 # Read 4-byte length prefix 

76 length_bytes = await reader.readexactly(LENGTH_SIZE) 

77 length = struct.unpack(LENGTH_FORMAT, length_bytes)[0] 

78 

79 # Validate message size 

80 if length > MAX_MESSAGE_SIZE: 

81 raise ProtocolError(f"Message size {length} exceeds maximum {MAX_MESSAGE_SIZE}") 

82 

83 if length == 0: 

84 return b"" 

85 

86 # Read the message payload 

87 return await reader.readexactly(length) 

88 

89 if timeout is not None: 

90 return await asyncio.wait_for(_read(), timeout=timeout) 

91 return await _read() 

92 

93 

94def write_message(writer: asyncio.StreamWriter, data: bytes) -> None: 

95 """Write a length-prefixed message to the stream. 

96 

97 This writes the message to the buffer but does not flush. Call 

98 `await writer.drain()` after writing to ensure delivery. 

99 

100 Args: 

101 writer: The async stream writer. 

102 data: The message payload to write. 

103 

104 Raises: 

105 ProtocolError: If the message is too large. 

106 

107 Examples: 

108 >>> # In an async context 

109 >>> # write_message(writer, b"hello") 

110 >>> # await writer.drain() 

111 """ 

112 if len(data) > MAX_MESSAGE_SIZE: 

113 raise ProtocolError(f"Message size {len(data)} exceeds maximum {MAX_MESSAGE_SIZE}") 

114 

115 length = struct.pack(LENGTH_FORMAT, len(data)) 

116 writer.write(length + data) 

117 

118 

119async def write_message_async(writer: asyncio.StreamWriter, data: bytes, drain: bool = True) -> None: 

120 """Write a length-prefixed message and optionally drain. 

121 

122 Args: 

123 writer: The async stream writer. 

124 data: The message payload to write. 

125 drain: Whether to drain the write buffer (default True). 

126 

127 Raises: 

128 ProtocolError: If the message is too large. 

129 

130 Examples: 

131 >>> # In an async context 

132 >>> # await write_message_async(writer, b"hello") 

133 """ 

134 write_message(writer, data) 

135 if drain: 

136 await writer.drain()