Coverage for mcpgateway / middleware / compression.py: 100%
22 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/middleware/compression.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

SSE-Aware Compression Middleware for MCP Gateway.

This module wraps starlette-compress to skip compression for Server-Sent Events (SSE)
responses, which should not be compressed because compression can break streaming behavior.
"""
13# Third-Party
14from starlette.types import ASGIApp, Receive, Scope, Send
15from starlette_compress import CompressMiddleware
17# First-Party
18from mcpgateway.config import settings
class SSEAwareCompressMiddleware:
    """
    Compression middleware that skips compression for SSE responses on /mcp paths.

    Server-Sent Events (text/event-stream) responses should not be compressed
    because compression can buffer the stream and break real-time delivery.
    When json_response_enabled=False (SSE mode), this middleware bypasses
    compression for /mcp endpoints.

    When json_response_enabled=True (default), all responses including /mcp
    are compressed normally since they return JSON, not SSE streams.

    Examples:
        >>> from unittest.mock import AsyncMock
        >>> app = AsyncMock()
        >>> middleware = SSEAwareCompressMiddleware(app, minimum_size=500)
        >>> isinstance(middleware, SSEAwareCompressMiddleware)
        True
        >>> middleware.app is app
        True

        >>> # Path matching, exercised through the real helper
        >>> middleware._is_mcp_path("/mcp")
        True
        >>> middleware._is_mcp_path("/mcp/")
        True
        >>> middleware._is_mcp_path("/servers/123/mcp")
        True
        >>> middleware._is_mcp_path("/servers/123/mcp/")
        True
        >>> middleware._is_mcp_path("/tools")
        False
    """

    # Path suffixes that identify an MCP endpoint, with and without the
    # trailing slash. A bare "/mcp" or "/mcp/" path also ends with one of these.
    _MCP_PATH_SUFFIXES = ("/mcp", "/mcp/")

    def __init__(
        self,
        app: ASGIApp,
        *,
        minimum_size: int = 500,
        gzip_level: int = 6,
        brotli_quality: int = 4,
        zstd_level: int = 3,
    ) -> None:
        """
        Initialize the SSE-aware compression middleware.

        Args:
            app: The ASGI application to wrap.
            minimum_size: Minimum response size to compress (bytes).
            gzip_level: GZip compression level (1-9).
            brotli_quality: Brotli compression quality (0-11).
            zstd_level: Zstandard compression level.

        Example:
            >>> from unittest.mock import AsyncMock
            >>> app = AsyncMock()
            >>> middleware = SSEAwareCompressMiddleware(app, minimum_size=1000)
            >>> middleware.minimum_size
            1000
        """
        self.app = app
        self.minimum_size = minimum_size
        self.gzip_level = gzip_level
        self.brotli_quality = brotli_quality
        self.zstd_level = zstd_level

        # Keep both call targets: ``self.app`` is the uncompressed route and
        # ``self.compress_app`` wraps the same app with compression.
        self.compress_app = CompressMiddleware(
            app,
            minimum_size=minimum_size,
            gzip_level=gzip_level,
            brotli_quality=brotli_quality,
            zstd_level=zstd_level,
        )

    def _is_mcp_path(self, path: str) -> bool:
        """Check if the path is an MCP endpoint.

        A path counts as an MCP endpoint when it ends in ``/mcp`` or ``/mcp/``;
        this covers the bare ``/mcp`` path as well as nested ones such as
        ``/servers/123/mcp``.

        Args:
            path: The request path to check.

        Returns:
            True if the path is an MCP endpoint, False otherwise.
        """
        # str.endswith accepts a tuple, so one call tests every suffix.
        return path.endswith(self._MCP_PATH_SUFFIXES)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Process the ASGI request, skipping compression for SSE responses.

        When json_response_enabled=False (SSE mode), MCP paths bypass compression
        to prevent buffering of streaming responses. Every other request,
        including non-HTTP scopes and MCP paths in JSON mode, is handed to the
        wrapped compression middleware.

        Args:
            scope: The ASGI connection scope.
            receive: The ASGI receive callable.
            send: The ASGI send callable.

        Example:
            >>> import asyncio
            >>> from unittest.mock import AsyncMock, patch
            >>> app = AsyncMock()
            >>> middleware = SSEAwareCompressMiddleware(app)
            >>> # Non-HTTP requests pass through to compress middleware
            >>> scope = {"type": "websocket"}
            >>> asyncio.run(middleware(scope, AsyncMock(), AsyncMock()))
        """
        # Only an HTTP request to an MCP path while SSE mode is active
        # (json_response_enabled=False) skips the compression wrapper.
        bypass_compression = (
            scope["type"] == "http"
            and not settings.json_response_enabled
            and self._is_mcp_path(scope.get("path", ""))
        )
        if bypass_compression:
            await self.app(scope, receive, send)
            return

        # Everything else (websocket/lifespan scopes, non-MCP paths, and MCP
        # paths in JSON mode) goes through the compression middleware.
        await self.compress_app(scope, receive, send)