Coverage for mcpgateway / plugins / framework / external / unix / protocol.py: 100%
28 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/external/unix/protocol.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Teryl Taylor
7Protocol helpers for length-prefixed message framing over Unix sockets.
9This module provides simple, efficient message framing using a 4-byte
10big-endian length prefix followed by the message payload.
12Wire format: [4-byte length (big-endian)][payload bytes]
14Examples:
15 Writing a message:
17 >>> import asyncio
18 >>> from mcpgateway.plugins.framework.external.unix.protocol import write_message, read_message
20 Reading and writing work as inverse operations:
22 >>> # In an async context with reader/writer streams
23 >>> # write_message(writer, b"hello")
24 >>> # data = await read_message(reader) # returns b"hello"
25"""
27# Standard
28import asyncio
29import struct
30from typing import Optional
# Frame header: the payload length as a big-endian unsigned 32-bit integer.
LENGTH_FORMAT = ">I"
# Number of bytes occupied by the header; derived from the format so the two stay in sync.
LENGTH_SIZE = struct.calcsize(LENGTH_FORMAT)
# Upper bound on a single payload (16 MB) to prevent memory exhaustion.
MAX_MESSAGE_SIZE = 16 * 1024**2
class ProtocolError(Exception):
    """Signal a framing-level failure in the length-prefixed message protocol.

    The read and write helpers in this module raise it when a message
    length exceeds ``MAX_MESSAGE_SIZE``.
    """
async def read_message(reader: asyncio.StreamReader, timeout: Optional[float] = None) -> bytes:
    """Receive one length-prefixed frame from the stream and return its payload.

    A frame starts with a ``LENGTH_SIZE``-byte header, decoded with
    ``LENGTH_FORMAT``, that announces the payload size in bytes.

    Args:
        reader: The async stream reader to consume the frame from.
        timeout: Optional limit in seconds covering the whole read (header
            and payload). When ``None`` no time limit is applied.

    Returns:
        The payload bytes, or ``b""`` when the header announces a
        zero-length payload.

    Raises:
        ProtocolError: If the announced size exceeds ``MAX_MESSAGE_SIZE``.
        asyncio.IncompleteReadError: If the stream ends before the frame is complete.
        asyncio.TimeoutError: If ``timeout`` elapses before the frame is read.

    Examples:
        >>> # In an async context
        >>> # payload = await read_message(reader)
        >>> # payload = await read_message(reader, timeout=5.0)
    """

    async def _receive_frame() -> bytes:
        """Read the header, validate the announced size, then read the payload.

        Returns:
            The payload bytes of a single frame.

        Raises:
            ProtocolError: If the announced size exceeds ``MAX_MESSAGE_SIZE``.
        """
        header = await reader.readexactly(LENGTH_SIZE)
        (payload_size,) = struct.unpack(LENGTH_FORMAT, header)

        # An empty frame carries no payload bytes, so there is nothing more to read.
        if payload_size == 0:
            return b""

        # Reject oversized frames before attempting to read their payload.
        if payload_size > MAX_MESSAGE_SIZE:
            raise ProtocolError(f"Message size {payload_size} exceeds maximum {MAX_MESSAGE_SIZE}")

        return await reader.readexactly(payload_size)

    if timeout is None:
        return await _receive_frame()
    return await asyncio.wait_for(_receive_frame(), timeout=timeout)
def write_message(writer: asyncio.StreamWriter, data: bytes) -> None:
    """Queue a length-prefixed frame on the stream writer.

    The header and payload are handed to ``writer.write`` in a single call.
    Nothing is flushed here; follow up with ``await writer.drain()`` to
    ensure delivery.

    Args:
        writer: The async stream writer that receives the frame.
        data: The payload bytes to send.

    Raises:
        ProtocolError: If ``data`` is longer than ``MAX_MESSAGE_SIZE``.

    Examples:
        >>> # In an async context
        >>> # write_message(writer, b"hello")
        >>> # await writer.drain()
    """
    payload_size = len(data)
    if payload_size > MAX_MESSAGE_SIZE:
        raise ProtocolError(f"Message size {payload_size} exceeds maximum {MAX_MESSAGE_SIZE}")

    # Header (payload length) immediately followed by the payload itself.
    writer.write(struct.pack(LENGTH_FORMAT, payload_size) + data)
async def write_message_async(writer: asyncio.StreamWriter, data: bytes, drain: bool = True) -> None:
    """Queue a length-prefixed frame and, unless told otherwise, drain the writer.

    Args:
        writer: The async stream writer that receives the frame.
        data: The payload bytes to send.
        drain: When true (the default), await ``writer.drain()`` after
            queuing the frame; when false, return right after queuing.

    Raises:
        ProtocolError: If the message is too large.

    Examples:
        >>> # In an async context
        >>> # await write_message_async(writer, b"hello")
    """
    write_message(writer, data)
    if not drain:
        return
    await writer.drain()