Coverage for mcpgateway / middleware / compression.py: 100%

22 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/compression.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7SSE-Aware Compression Middleware for MCP Gateway. 

8 

9This module wraps starlette-compress to skip compression for Server-Sent Events (SSE) 

10responses, which should not be compressed as it can break streaming behavior. 

11""" 

12 

13# Third-Party 

14from starlette.types import ASGIApp, Receive, Scope, Send 

15from starlette_compress import CompressMiddleware 

16 

17# First-Party 

18from mcpgateway.config import settings 

19 

20 

21class SSEAwareCompressMiddleware: 

22 """ 

23 Compression middleware that skips compression for SSE responses on /mcp paths. 

24 

25 Server-Sent Events (text/event-stream) responses should not be compressed 

26 because compression can buffer the stream and break real-time delivery. 

27 When json_response_enabled=False (SSE mode), this middleware bypasses 

28 compression for /mcp endpoints. 

29 

30 When json_response_enabled=True (default), all responses including /mcp 

31 are compressed normally since they return JSON, not SSE streams. 

32 

33 Examples: 

34 >>> from unittest.mock import AsyncMock 

35 >>> app = AsyncMock() 

36 >>> middleware = SSEAwareCompressMiddleware(app, minimum_size=500) 

37 >>> isinstance(middleware, SSEAwareCompressMiddleware) 

38 True 

39 >>> middleware.app is app 

40 True 

41 

42 >>> # Test path matching logic 

43 >>> def is_mcp_path(path): 

44 ... return path == "/mcp" or path == "/mcp/" or path.endswith("/mcp") or path.endswith("/mcp/") 

45 >>> is_mcp_path("/mcp") 

46 True 

47 >>> is_mcp_path("/mcp/") 

48 True 

49 >>> is_mcp_path("/servers/123/mcp") 

50 True 

51 >>> is_mcp_path("/servers/123/mcp/") 

52 True 

53 >>> is_mcp_path("/tools") 

54 False 

55 """ 

56 

57 def __init__( 

58 self, 

59 app: ASGIApp, 

60 *, 

61 minimum_size: int = 500, 

62 gzip_level: int = 6, 

63 brotli_quality: int = 4, 

64 zstd_level: int = 3, 

65 ) -> None: 

66 """ 

67 Initialize the SSE-aware compression middleware. 

68 

69 Args: 

70 app: The ASGI application to wrap. 

71 minimum_size: Minimum response size to compress (bytes). 

72 gzip_level: GZip compression level (1-9). 

73 brotli_quality: Brotli compression quality (0-11). 

74 zstd_level: Zstandard compression level. 

75 

76 Example: 

77 >>> from unittest.mock import AsyncMock 

78 >>> app = AsyncMock() 

79 >>> middleware = SSEAwareCompressMiddleware(app, minimum_size=1000) 

80 >>> middleware.minimum_size 

81 1000 

82 """ 

83 self.app = app 

84 self.minimum_size = minimum_size 

85 self.gzip_level = gzip_level 

86 self.brotli_quality = brotli_quality 

87 self.zstd_level = zstd_level 

88 

89 # Create the underlying compression middleware 

90 self.compress_app = CompressMiddleware( 

91 app, 

92 minimum_size=minimum_size, 

93 gzip_level=gzip_level, 

94 brotli_quality=brotli_quality, 

95 zstd_level=zstd_level, 

96 ) 

97 

98 def _is_mcp_path(self, path: str) -> bool: 

99 """Check if the path is an MCP endpoint. 

100 

101 Args: 

102 path: The request path to check. 

103 

104 Returns: 

105 True if the path is an MCP endpoint, False otherwise. 

106 """ 

107 return path == "/mcp" or path == "/mcp/" or path.endswith("/mcp") or path.endswith("/mcp/") 

108 

109 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: 

110 """ 

111 Process the ASGI request, skipping compression for SSE responses. 

112 

113 When json_response_enabled=False (SSE mode), MCP paths bypass compression 

114 to prevent buffering of streaming responses. 

115 

116 Args: 

117 scope: The ASGI connection scope. 

118 receive: The ASGI receive callable. 

119 send: The ASGI send callable. 

120 

121 Example: 

122 >>> import asyncio 

123 >>> from unittest.mock import AsyncMock, patch 

124 >>> app = AsyncMock() 

125 >>> middleware = SSEAwareCompressMiddleware(app) 

126 >>> # Non-HTTP requests pass through to compress middleware 

127 >>> scope = {"type": "websocket"} 

128 >>> asyncio.run(middleware(scope, AsyncMock(), AsyncMock())) 

129 """ 

130 if scope["type"] != "http": 

131 # Non-HTTP requests (websocket, lifespan) go through compression 

132 await self.compress_app(scope, receive, send) 

133 return 

134 

135 path = scope.get("path", "") 

136 

137 # When SSE mode is enabled (json_response_enabled=False), skip compression 

138 # for MCP paths to prevent buffering of streaming responses 

139 if not settings.json_response_enabled and self._is_mcp_path(path): 

140 # SSE mode for MCP - bypass compression entirely 

141 await self.app(scope, receive, send) 

142 return 

143 

144 # For all other requests (including MCP in JSON mode), use compression 

145 await self.compress_app(scope, receive, send)