Middleware Guide
Middleware in MCP Composer provides a powerful way to intercept, modify, and enhance tool calls, resource operations, and other MCP interactions. This guide covers how to create custom middleware and describes the available built-in middleware components.
Overview
Middleware follows a chain-of-responsibility pattern where each middleware can:
- Process requests before they reach the target handler
- Modify responses after they're generated
- Add cross-cutting concerns such as logging, authentication, and rate limiting
- Handle errors and exceptions
Creating Custom Middleware
Basic Middleware Structure
All middleware must inherit from the Middleware base class and implement the appropriate hook methods:
from fastmcp.server.middleware import Middleware, MiddlewareContext, CallNext
from typing import Any
class MyCustomMiddleware(Middleware):
def __init__(self, config_param: str = "default"):
super().__init__()
self.config_param = config_param
async def on_call_tool(
self,
context: MiddlewareContext,
call_next: CallNext
) -> Any:
# Pre-processing logic
print(f"Before calling tool: {context.message.name}")
# Call the next middleware or handler
result = await call_next(context)
# Post-processing logic
print(f"After calling tool: {context.message.name}")
return result
Available Hook Methods
Middleware can implement these hook methods:
on_call_tool(context, call_next)
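Wraps tool execution: code placed before call_next runs before the tool is invoked, and code placed after it runs once the tool has returned.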
async def on_call_tool(self, context, call_next):
# Pre-processing
tool_name = getattr(context.message, "name", "<unknown>")
arguments = getattr(context.message, "arguments", {})
# Modify arguments if needed
# context.message.arguments = modified_arguments
# Call next middleware/handler
result = await call_next(context)
# Post-processing
# Modify result if needed
return result
on_list_tools(context, call_next)
Called when listing available tools.
async def on_list_tools(self, context, call_next):
result = await call_next(context)
# Modify tool list if needed
# for tool in result.tools:
# tool.description = "Modified description"
return result
on_read_resource(context, call_next)
Called when reading resources.
async def on_read_resource(self, context, call_next):
result = await call_next(context)
# Modify resource content if needed
return result
Adding Middleware to Your Application
from fastmcp import FastMCP
from mcp_composer.middleware import MyCustomMiddleware
app = FastMCP("My App")
# Add middleware
app.add_middleware(MyCustomMiddleware(config_param="my_value"))
# Add multiple middleware (order matters)
app.add_middleware(AnotherMiddleware())
app.add_middleware(YetAnotherMiddleware())
Built-in Middleware
MCP Composer provides several built-in middleware components for common use cases.
Circuit Breaker Middleware
Prevents cascading failures by temporarily stopping calls to failing services.
from mcp_composer.middleware import CircuitBreakerMiddleware
app.add_middleware(
CircuitBreakerMiddleware(
failure_threshold=5, # Number of failures to trip
open_timeout=30.0, # Seconds to stay OPEN
window_seconds=60.0, # Rolling failure window
exempt_tools={"health_check"} # Tools to exempt
)
)
States:
- CLOSED: Normal operation, calls pass through
- OPEN: Circuit is open, calls are blocked
- HALF_OPEN: Allows one probe call to test recovery
Rate Limiting Middleware
Controls the rate of requests to prevent abuse.
from mcp_composer.middleware import RateLimiterMiddleware
app.add_middleware(
RateLimiterMiddleware(
per_tool_limits={
"expensive_tool": 10, # 10 calls per minute
"search_api": 100 # 100 calls per minute
},
per_tenant_limits={
"tenant_a": 50, # 50 calls per minute per tenant
"tenant_b": 100
},
acquire_timeout=2.0, # Fail fast if queue exceeds 2s
get_tenant=lambda ctx: getattr(ctx, "tenant_id", "unknown")
)
)
Concurrency Limiting Middleware
Limits concurrent executions using semaphores (bulkhead pattern).
from mcp_composer.middleware import ConcurrencyLimiterMiddleware
app.add_middleware(
ConcurrencyLimiterMiddleware(
per_tool_limits={
"ask_llm": 8, # Max 8 concurrent calls
"search_docs": 16
},
per_tenant_limits={
"tenant_a": 10, # Max 10 concurrent per tenant
},
acquire_timeout=1.5, # Fail fast if queue exceeds 1.5s
get_tenant=lambda ctx: getattr(ctx, "tenant_id", "unknown")
)
)
Prompt Injection Protection Middleware
Detects and prevents prompt injection attacks.
from mcp_composer.middleware import PromptInjectionMiddleware
app.add_middleware(
PromptInjectionMiddleware(
block_on_high_risk=True, # Block high-risk calls
threshold=0.75, # Risk threshold (0-1)
url_allowlist=[ # Allowed URL prefixes
"https://docs.company.com/",
"https://api.company.com/"
],
sanitize_on_medium=True, # Sanitize medium-risk calls
inspect_fields=["query", "prompt"] # Fields to inspect
)
)
PII and Secrets Redaction Middleware
Automatically redacts sensitive information from inputs and outputs.
from mcp_composer.middleware import SecretsAndPIIMiddleware, RedactionStrategy
app.add_middleware(
SecretsAndPIIMiddleware(
strategy=RedactionStrategy(
mode="mask", # "mask", "hash", or "tokenize"
salt="optional_salt" # For hash mode
),
allowlist_tools=["decrypt_tool"], # Tools to exempt
allowlist_fields=["public_info"], # Fields to exempt
redact_inputs=True, # Redact arguments
redact_outputs=True # Redact responses
)
)
Redaction Modes:
- mask: [REDACTED:EMAIL]
- hash: [HASH:EMAIL:a1b2c3d4e5f6]
- tokenize: <EMAIL_1>
XML to JSON Conversion Middleware
Automatically converts XML responses to JSON format.
from mcp_composer.middleware import FormatXml2Json
app.add_middleware(FormatXml2Json(mcp_composer=app))
Policy/ACL Middleware
Enforces access control policies using various backends.
from mcp_composer.middleware.acl.policy import PolicyMiddleware
from mcp_composer.middleware.acl.policy.file_enforcer import FilePolicyEnforcer
app.add_middleware(
PolicyMiddleware(
policy_enforcer=FilePolicyEnforcer(
policy_file="policies/basic_policy.json"
),
identity_manager=YourIdentityManager(),
enable_audit_logging=True
)
)
Middleware Best Practices
1. Order Matters
Middleware runs in the order it is added: the first middleware added is the first to see a request and the last to see its response. Choose the order deliberately:
# Add authentication first
app.add_middleware(AuthMiddleware())
# Then rate limiting
app.add_middleware(RateLimiterMiddleware(...))
# Then business logic middleware
app.add_middleware(CustomBusinessLogicMiddleware())
2. Error Handling
Always handle exceptions gracefully:
async def on_call_tool(self, context, call_next):
try:
result = await call_next(context)
return result
except Exception as e:
# Log the error
logger.error(f"Middleware error: {e}")
# Re-raise or handle appropriately
raise
3. Performance Considerations
- Keep middleware lightweight
- Use async operations when possible
- Cache expensive operations
- Avoid blocking operations
4. Configuration
Make middleware configurable:
class ConfigurableMiddleware(Middleware):
def __init__(self,
enabled: bool = True,
timeout: float = 30.0,
max_retries: int = 3):
self.enabled = enabled
self.timeout = timeout
self.max_retries = max_retries
5. Logging and Monitoring
Add observability to your middleware:
import logging
from mcp_composer.core.utils.logger import LoggerFactory
logger = LoggerFactory.get_logger()
class LoggingMiddleware(Middleware):
async def on_call_tool(self, context, call_next):
start_time = time.time()
tool_name = getattr(context.message, "name", "<unknown>")
logger.info(f"Starting tool call: {tool_name}")
try:
result = await call_next(context)
duration = time.time() - start_time
logger.info(f"Tool call completed: {tool_name} in {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
logger.error(f"Tool call failed: {tool_name} after {duration:.2f}s - {e}")
raise
Testing Middleware
Test your middleware in isolation:
import pytest
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_my_middleware():
middleware = MyCustomMiddleware()
context = MagicMock()
context.message.name = "test_tool"
context.message.arguments = {"param": "value"}
call_next = AsyncMock(return_value={"result": "success"})
result = await middleware.on_call_tool(context, call_next)
assert result == {"result": "success"}
call_next.assert_called_once_with(context)
Common Patterns
Request/Response Transformation
async def on_call_tool(self, context, call_next):
# Transform request
original_args = context.message.arguments
context.message.arguments = self.transform_request(original_args)
try:
result = await call_next(context)
# Transform response
return self.transform_response(result)
finally:
# Restore original arguments
context.message.arguments = original_args
Conditional Processing
async def on_call_tool(self, context, call_next):
tool_name = getattr(context.message, "name", "")
if tool_name in self.exempt_tools:
return await call_next(context)
# Apply middleware logic only to non-exempt tools
return await self.process_with_middleware(context, call_next)
State Management
class StatefulMiddleware(Middleware):
def __init__(self):
self.request_count = 0
self.lock = asyncio.Lock()
async def on_call_tool(self, context, call_next):
async with self.lock:
self.request_count += 1
return await call_next(context)
This guide provides the foundation for creating and using middleware in MCP Composer. For more specific examples, see the Middleware Examples documentation.
For more details on middleware configuration, see the Middleware Configuration documentation.