Coverage for mcpgateway/middleware/request_logging_middleware.py: 100%
226 statements
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""
Location: ./mcpgateway/middleware/request_logging_middleware.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0

Request Logging Middleware.

This module provides middleware for FastAPI to log incoming HTTP requests
with sensitive data masking. It masks JWT tokens, passwords, and other
sensitive information in headers and request bodies while preserving
debugging information.

Examples:
    >>> from mcpgateway.middleware.request_logging_middleware import (
    ...     mask_sensitive_data, mask_jwt_in_cookies, mask_sensitive_headers, SENSITIVE_KEYS
    ... )

    Check that SENSITIVE_KEYS contains expected values:
    >>> "password" in SENSITIVE_KEYS
    True
    >>> "token" in SENSITIVE_KEYS
    True
    >>> "authorization" in SENSITIVE_KEYS
    True

    Mask nested sensitive data:
    >>> data = {"credentials": {"password": "secret", "username": "admin"}}
    >>> masked = mask_sensitive_data(data)
    >>> masked["credentials"]["password"]
    '******'
    >>> masked["credentials"]["username"]
    'admin'

    Test mask_jwt_in_cookies with various inputs:
    >>> mask_jwt_in_cookies("access_token=xyz123; user=john")
    'access_token=******; user=john'

    Test mask_sensitive_headers with mixed headers:
    >>> headers = {"Content-Type": "application/json", "secret": "mysecret"}
    >>> result = mask_sensitive_headers(headers)
    >>> result["Content-Type"]
    'application/json'
    >>> result["secret"]
    '******'
"""

# Standard
import logging
import secrets
import time
from typing import Callable, List, Optional

# Third-Party
from fastapi.security import HTTPAuthorizationCredentials
import orjson
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

# First-Party
from mcpgateway.auth import get_current_user
from mcpgateway.config import settings
from mcpgateway.middleware.path_filter import should_skip_request_logging
from mcpgateway.services.logging_service import LoggingService
from mcpgateway.services.structured_logger import get_structured_logger
from mcpgateway.utils.correlation_id import get_correlation_id

# Initialize logging service first
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Initialize structured logger for gateway boundary logging
structured_logger = get_structured_logger("http_gateway")

SENSITIVE_KEYS = frozenset({"password", "secret", "token", "apikey", "access_token", "refresh_token", "client_secret", "authorization", "jwt_token"})
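# Matching semantics (for readers of this module): mask_sensitive_data() compares
# lowercased dict keys against SENSITIVE_KEYS exactly, while the header and cookie
# helpers below additionally match on substrings such as "auth", "jwt", "token",
# and "session".
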
def mask_sensitive_data(data, max_depth: int = 10):
    """Recursively mask sensitive keys in dict/list payloads with depth limit.

    Args:
        data: The data structure to mask (dict, list, or other)
        max_depth: Maximum recursion depth to prevent stack overflow on deeply nested payloads

    Returns:
        The data structure with sensitive values masked

    Examples:
        >>> mask_sensitive_data({"username": "john", "password": "secret123"})
        {'username': 'john', 'password': '******'}

        >>> mask_sensitive_data({"user": {"name": "john", "token": "abc123"}})
        {'user': {'name': 'john', 'token': '******'}}

        >>> mask_sensitive_data([{"apikey": "key1"}, {"data": "safe"}])
        [{'apikey': '******'}, {'data': 'safe'}]

        >>> mask_sensitive_data("plain string")
        'plain string'

        >>> mask_sensitive_data({"level": {"nested": {}}}, max_depth=1)
        {'level': '<nested too deep>'}
    """
    if max_depth <= 0:
        return "<nested too deep>"

    if isinstance(data, dict):
        return {k: ("******" if k.lower() in SENSITIVE_KEYS else mask_sensitive_data(v, max_depth - 1)) for k, v in data.items()}
    if isinstance(data, list):
        return [mask_sensitive_data(i, max_depth - 1) for i in data]
    return data

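# Note: mask_sensitive_data() rebuilds dicts and lists via comprehensions, so the
# caller's original payload object is left unmodified.
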
def mask_jwt_in_cookies(cookie_header):
    """Mask JWT tokens in cookie header while preserving other cookies.

    Args:
        cookie_header: The cookie header string to process

    Returns:
        Cookie header string with JWT tokens masked

    Examples:
        >>> mask_jwt_in_cookies("jwt_token=abc123; theme=dark")
        'jwt_token=******; theme=dark'

        >>> mask_jwt_in_cookies("session_id=xyz; auth_token=secret")
        'session_id=******; auth_token=******'

        >>> mask_jwt_in_cookies("user=john; preference=light")
        'user=john; preference=light'

        >>> mask_jwt_in_cookies("")
        ''

        >>> mask_jwt_in_cookies(None) is None
        True
    """
    if not cookie_header:
        return cookie_header

    # Split cookies by semicolon
    cookies = []
    for cookie in cookie_header.split(";"):
        cookie = cookie.strip()
        if "=" in cookie:
            name, _ = cookie.split("=", 1)
            name = name.strip()
            # Mask JWT tokens and other sensitive cookies
            if any(sensitive in name.lower() for sensitive in ["jwt", "token", "auth", "session"]):
                cookies.append(f"{name}=******")
            else:
                cookies.append(cookie)
        else:
            cookies.append(cookie)

    return "; ".join(cookies)

def mask_sensitive_headers(headers):
    """Mask sensitive headers like Authorization.

    Args:
        headers: Dictionary of HTTP headers to mask

    Returns:
        Dictionary of headers with sensitive values masked

    Examples:
        >>> mask_sensitive_headers({"Authorization": "Bearer token123"})
        {'Authorization': '******'}

        >>> mask_sensitive_headers({"Content-Type": "application/json"})
        {'Content-Type': 'application/json'}

        >>> mask_sensitive_headers({"apikey": "secret", "X-Custom": "value"})
        {'apikey': '******', 'X-Custom': 'value'}

        >>> result = mask_sensitive_headers({"Cookie": "jwt_token=abc; theme=dark"})
        >>> "******" in result["Cookie"]
        True
    """
    masked_headers = {}
    for key, value in headers.items():
        key_lower = key.lower()
        if key_lower in SENSITIVE_KEYS or "auth" in key_lower or "jwt" in key_lower:
            masked_headers[key] = "******"
        elif key_lower == "cookie":
            # Special handling for cookies to mask only JWT tokens
            masked_headers[key] = mask_jwt_in_cookies(value)
        else:
            masked_headers[key] = value
    return masked_headers

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware for logging HTTP requests with sensitive data masking.

    Logs incoming requests including method, path, headers, and body while
    masking sensitive information like passwords, tokens, and authorization headers.

    Examples:
        >>> middleware = RequestLoggingMiddleware(
        ...     app=None,
        ...     enable_gateway_logging=True,
        ...     log_detailed_requests=True,
        ...     log_detailed_skip_endpoints=["/metrics", "/health"],
        ...     log_detailed_sample_rate=0.5,
        ... )
        >>> middleware.enable_gateway_logging
        True
        >>> middleware.log_detailed_requests
        True
        >>> middleware.log_detailed_skip_endpoints
        ['/metrics', '/health']
        >>> middleware.log_detailed_sample_rate
        0.5
        >>> middleware.log_resolve_user_identity
        False
    """

    def __init__(
        self,
        app,
        enable_gateway_logging: bool = True,
        log_detailed_requests: bool = False,
        log_level: str = "DEBUG",
        max_body_size: Optional[int] = None,
        log_request_start: bool = False,
        log_resolve_user_identity: bool = False,
        log_detailed_skip_endpoints: Optional[List[str]] = None,
        log_detailed_sample_rate: float = 1.0,
    ):
        """Initialize the request logging middleware.

        Args:
            app: The FastAPI application instance
            enable_gateway_logging: Whether to enable gateway boundary logging (request_started/completed)
            log_detailed_requests: Whether to enable detailed request/response payload logging
            log_level: The log level for requests (not used, logs at INFO)
            max_body_size: Maximum request body size to log, in bytes
            log_request_start: Whether to log "request started" events (default: False for performance)
                When False, only request completion is logged, which halves logging overhead.
            log_resolve_user_identity: If True, allow a DB fallback to resolve user identity when no cached user is present
            log_detailed_skip_endpoints: Optional list of path prefixes for which detailed logging is skipped
            log_detailed_sample_rate: Float in [0.0, 1.0]; sampling rate for detailed logging
        """
        super().__init__(app)
        self.enable_gateway_logging = enable_gateway_logging
        self.log_detailed_requests = log_detailed_requests
        self.log_level = log_level.upper()
        # Use the explicitly configured value when provided, otherwise fall back to
        # settings.log_detailed_max_body_size (configured in mcpgateway.config)
        self.max_body_size = max_body_size if max_body_size is not None else settings.log_detailed_max_body_size
        self.log_request_start = log_request_start
        self.log_resolve_user_identity = log_resolve_user_identity
        self.log_detailed_skip_endpoints = log_detailed_skip_endpoints or []
        self.log_detailed_sample_rate = log_detailed_sample_rate

    async def _resolve_user_identity(self, request: Request):
        """Best-effort extraction of user identity for request logs.

        Args:
            request: The incoming HTTP request

        Returns:
            Tuple[Optional[str], Optional[str]]: User ID and email
        """
        # Prefer context injected by upstream middleware
        if hasattr(request.state, "user") and request.state.user is not None:
            raw_user_id = getattr(request.state.user, "id", None)
            user_email = getattr(request.state.user, "email", None)
            return (str(raw_user_id) if raw_user_id is not None else None, user_email)

        # Fallback: try to authenticate using cookies/headers (matches AuthContextMiddleware)
        # Respect configuration: avoid DB fallback unless explicitly allowed
        if not self.log_resolve_user_identity:
            return (None, None)
        token = None
        if request.cookies:
            token = request.cookies.get("jwt_token") or request.cookies.get("access_token") or request.cookies.get("token")

        if not token:
            auth_header = request.headers.get("authorization")
            if auth_header and auth_header.startswith("Bearer "):
                token = auth_header.replace("Bearer ", "")

        if not token:
            return (None, None)

        try:
            credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
            # get_current_user now uses fresh DB sessions internally
            user = await get_current_user(credentials)
            raw_user_id = getattr(user, "id", None)
            user_email = getattr(user, "email", None)
            return (str(raw_user_id) if raw_user_id is not None else None, user_email)
        except Exception:
            return (None, None)

    async def dispatch(self, request: Request, call_next: Callable):
        """Process incoming request and log details with sensitive data masked.

        Args:
            request: The incoming HTTP request
            call_next: Function to call the next middleware/handler

        Returns:
            Response: The HTTP response from downstream handlers

        Raises:
            Exception: Any exception from downstream handlers is re-raised
        """
        # Track start time for total duration
        start_time = time.time()

        # Get basic request metadata (cheap operations)
        path = request.url.path
        method = request.method

        # Determine logging needs BEFORE expensive operations
        should_log_boundary = self.enable_gateway_logging and not should_skip_request_logging(path)
        should_log_detailed = self.log_detailed_requests and not should_skip_request_logging(path)

        # Honor middleware-level configured skip endpoints for detailed logging
        if should_log_detailed and self.log_detailed_skip_endpoints:
            for prefix in self.log_detailed_skip_endpoints:
                if path.startswith(prefix):
                    should_log_detailed = False
                    break

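        # Example: with log_detailed_skip_endpoints=["/metrics", "/health"], a request to
        # "/metrics/summary" matches the "/metrics" prefix, so it keeps boundary logging
        # but skips detailed payload logging.
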
        # Sampling fast path: avoid detailed logging for sampled-out requests
        if should_log_detailed and self.log_detailed_sample_rate < 1.0:
            try:
                # Use the cryptographically secure `secrets` module to avoid
                # bandit/DUO warnings about insecure RNGs. Sampling here does
                # not require crypto strength, but using `secrets` keeps
                # security scanners happy.
                r = secrets.randbelow(10**9) / 1e9
                if r >= self.log_detailed_sample_rate:
                    should_log_detailed = False
            except Exception as e:
                # If sampling fails for any reason, default to logging and
                # record the incident for diagnostics.
                logger.debug(f"Sampling failed, defaulting to log: {e}")

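        # For reference: secrets.randbelow(10**9) / 1e9 yields a uniform float in [0, 1),
        # so with log_detailed_sample_rate=0.25 roughly a quarter of the eligible requests
        # keep detailed payload logging; the rest fall through to boundary-only logging.
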
        # Fast path: if no logging needed at all, skip everything
        if not should_log_boundary and not should_log_detailed:
            return await call_next(request)

        # Get correlation ID and additional metadata (only if we're logging)
        correlation_id = get_correlation_id()
        user_agent = request.headers.get("user-agent", "unknown")
        client_ip = request.client.host if request.client else "unknown"

        # Only resolve user identity if we're actually going to log boundary events
        # This avoids potential DB queries for skipped paths and detailed-only flows
        user_id: Optional[str] = None
        user_email: Optional[str] = None
        if should_log_boundary:
            user_id, user_email = await self._resolve_user_identity(request)
        elif should_log_detailed and hasattr(request.state, "user") and request.state.user is not None:
            # Detailed logs: only use cached user identity, avoid DB fallback
            raw_user_id = getattr(request.state.user, "id", None)
            user_id = str(raw_user_id) if raw_user_id is not None else None
            user_email = getattr(request.state.user, "email", None)

        # Log gateway request started (optional - disabled by default for performance)
        if should_log_boundary and self.log_request_start:
            try:
                structured_logger.log(
                    level="INFO",
                    message=f"Request started: {method} {path}",
                    correlation_id=correlation_id,
                    user_email=user_email,
                    user_id=user_id,
                    operation_type="http_request",
                    request_method=method,
                    request_path=path,
                    user_agent=user_agent,
                    client_ip=client_ip,
                    metadata={"event": "request_started", "query_params": str(request.query_params) if request.query_params else None},
                )
            except Exception as e:
                logger.warning(f"Failed to log request start: {e}")

        # Skip detailed logging if disabled (already checked via should_log_detailed)
        if not should_log_detailed:
            response = await call_next(request)

            # Still log request completed even if detailed logging is disabled.
            #
            # Note: reaching this block means we didn't take the early return where both
            # boundary and detailed logging are disabled, so boundary logging is required.
            duration_ms = (time.time() - start_time) * 1000
            try:
                log_level = "ERROR" if response.status_code >= 500 else "WARNING" if response.status_code >= 400 else "INFO"
                structured_logger.log(
                    level=log_level,
                    message=f"Request completed: {method} {path} - {response.status_code}",
                    correlation_id=correlation_id,
                    user_email=user_email,
                    user_id=user_id,
                    operation_type="http_request",
                    request_method=method,
                    request_path=path,
                    response_status_code=response.status_code,
                    user_agent=user_agent,
                    client_ip=client_ip,
                    duration_ms=duration_ms,
                    metadata={"event": "request_completed", "response_time_category": self._categorize_response_time(duration_ms)},
                )
            except Exception as e:
                logger.warning(f"Failed to log request completion: {e}")

            return response

        # Always log at INFO level for request payloads to ensure visibility
        log_level = logging.INFO

        # Skip if logger level is higher than INFO
        if not logger.isEnabledFor(log_level):
            return await call_next(request)

        # Size-based fast path: skip detailed processing for very large bodies
        content_length_header = request.headers.get("content-length")
        if content_length_header:
            try:
                content_length = int(content_length_header)
                # Skip if body is >4x over limit (not worth reading/parsing)
                if content_length > self.max_body_size * 4:
                    # Log placeholder without reading body
                    masked_headers = mask_sensitive_headers(dict(request.headers))
                    request_id = get_correlation_id()
                    try:
                        logger.log(
                            log_level,
                            f"📩 Incoming request: {request.method} {request.url.path}\n"
                            f"Query params: {dict(request.query_params)}\n"
                            f"Headers: {masked_headers}\n"
                            f"Body: <body too large: {content_length} bytes>",
                            extra={"request_id": request_id},
                        )
                    except TypeError:
                        logger.log(
                            log_level,
                            f"📩 Incoming request: {request.method} {request.url.path}\n"
                            f"Query params: {dict(request.query_params)}\n"
                            f"Headers: {masked_headers}\n"
                            f"Body: <body too large: {content_length} bytes>",
                        )

                    # Continue with request processing (boundary logging handled below)
                    try:
                        response = await call_next(request)
                    except Exception as e:
                        duration_ms = (time.time() - start_time) * 1000
                        if should_log_boundary:
                            try:
                                structured_logger.log(
                                    level="ERROR",
                                    message=f"Request failed: {method} {path}",
                                    correlation_id=correlation_id,
                                    user_email=user_email,
                                    user_id=user_id,
                                    operation_type="http_request",
                                    request_method=method,
                                    request_path=path,
                                    user_agent=user_agent,
                                    client_ip=client_ip,
                                    duration_ms=duration_ms,
                                    error=e,
                                    metadata={"event": "request_failed"},
                                )
                            except Exception as log_error:
                                logger.warning(f"Failed to log request failure: {log_error}")
                        raise

                    # Log boundary completion for large body requests
                    if should_log_boundary:
                        duration_ms = (time.time() - start_time) * 1000
                        try:
                            boundary_log_level = "ERROR" if response.status_code >= 500 else "WARNING" if response.status_code >= 400 else "INFO"
                            structured_logger.log(
                                level=boundary_log_level,
                                message=f"Request completed: {method} {path} - {response.status_code}",
                                correlation_id=correlation_id,
                                user_email=user_email,
                                user_id=user_id,
                                operation_type="http_request",
                                request_method=method,
                                request_path=path,
                                response_status_code=response.status_code,
                                user_agent=user_agent,
                                client_ip=client_ip,
                                duration_ms=duration_ms,
                                metadata={"event": "request_completed", "response_time_category": self._categorize_response_time(duration_ms)},
                            )
                        except Exception as e:
                            logger.warning(f"Failed to log request completion: {e}")

                    return response
            except ValueError:
                pass  # Invalid content-length, continue with normal processing

        body = b""
        try:
            body = await request.body()
            # Avoid logging huge bodies
            if len(body) > self.max_body_size:
                truncated = True
                body_to_log = body[: self.max_body_size]
            else:
                truncated = False
                body_to_log = body

            payload = body_to_log.decode("utf-8", errors="ignore").strip()
            if payload:
                try:
                    json_payload = orjson.loads(payload)
                    payload_to_log = mask_sensitive_data(json_payload)
                    # Use orjson without indent for performance (compact output)
                    payload_str = orjson.dumps(payload_to_log).decode()
                except orjson.JSONDecodeError:
                    # For non-JSON payloads, still mask potential sensitive data
                    payload_str = payload
                    for sensitive_key in SENSITIVE_KEYS:
                        if sensitive_key in payload_str.lower():
                            payload_str = "<contains sensitive data - masked>"
                            break
            else:
                payload_str = "<empty>"
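            # Note: non-JSON bodies have no key structure to mask selectively, so the
            # payload is replaced wholesale as soon as any SENSITIVE_KEYS entry appears
            # as a substring (e.g. a form body containing "password=").
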
            # Mask sensitive headers
            masked_headers = mask_sensitive_headers(dict(request.headers))

            # Get correlation ID for request tracking
            request_id = get_correlation_id()

            # Try to log with extra parameter, fall back to without if not supported
            try:
                logger.log(
                    log_level,
                    f"📩 Incoming request: {request.method} {request.url.path}\n"
                    f"Query params: {dict(request.query_params)}\n"
                    f"Headers: {masked_headers}\n"
                    f"Body: {payload_str}{'... [truncated]' if truncated else ''}",
                    extra={"request_id": request_id},
                )
            except TypeError:
                # Fall back for test loggers that don't accept extra parameter
                logger.log(
                    log_level,
                    f"📩 Incoming request: {request.method} {request.url.path}\n"
                    f"Query params: {dict(request.query_params)}\n"
                    f"Headers: {masked_headers}\n"
                    f"Body: {payload_str}{'... [truncated]' if truncated else ''}",
                )

        except Exception as e:
            logger.warning(f"Failed to log request body: {e}")

        # Recreate request stream for downstream handlers
        async def receive():
            """Recreate request body for downstream handlers.

            Returns:
                dict: ASGI receive message with request body
            """
            return {"type": "http.request", "body": body, "more_body": False}

        # Create new request with the body we've already read
        new_scope = request.scope.copy()
        new_request = Request(new_scope, receive=receive)
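        # Rationale: `await request.body()` above consumed the ASGI receive stream, so
        # downstream handlers would otherwise see an exhausted body. The replacement
        # Request replays the cached bytes through the `receive` callable.
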
        # Process request
        try:
            response: Response = await call_next(new_request)
            status_code = response.status_code
        except Exception as e:
            duration_ms = (time.time() - start_time) * 1000

            # Log request failed
            if should_log_boundary:
                try:
                    structured_logger.log(
                        level="ERROR",
                        message=f"Request failed: {method} {path}",
                        correlation_id=correlation_id,
                        user_email=user_email,
                        user_id=user_id,
                        operation_type="http_request",
                        request_method=method,
                        request_path=path,
                        user_agent=user_agent,
                        client_ip=client_ip,
                        duration_ms=duration_ms,
                        error=e,
                        metadata={"event": "request_failed"},
                    )
                except Exception as log_error:
                    logger.warning(f"Failed to log request failure: {log_error}")

            raise

        # Calculate total duration
        duration_ms = (time.time() - start_time) * 1000

        # Log gateway request completed
        if should_log_boundary:
            try:
                log_level = "ERROR" if status_code >= 500 else "WARNING" if status_code >= 400 else "INFO"

                structured_logger.log(
                    level=log_level,
                    message=f"Request completed: {method} {path} - {status_code}",
                    correlation_id=correlation_id,
                    user_email=user_email,
                    user_id=user_id,
                    operation_type="http_request",
                    request_method=method,
                    request_path=path,
                    response_status_code=status_code,
                    user_agent=user_agent,
                    client_ip=client_ip,
                    duration_ms=duration_ms,
                    metadata={"event": "request_completed", "response_time_category": self._categorize_response_time(duration_ms)},
                )
            except Exception as e:
                logger.warning(f"Failed to log request completion: {e}")

        return response

    @staticmethod
    def _categorize_response_time(duration_ms: float) -> str:
        """Categorize response time for analytics.

        Args:
            duration_ms: Response time in milliseconds

        Returns:
            Category string
        """
        if duration_ms < 100:
            return "fast"
        if duration_ms < 500:
            return "normal"
        if duration_ms < 2000:
            return "slow"
        return "very_slow"
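

# Usage sketch (illustrative only, not part of this module): a FastAPI application would
# typically attach this middleware via `app.add_middleware`. The parameter values below
# are hypothetical examples, not gateway defaults.
#
#     from fastapi import FastAPI
#     from mcpgateway.middleware.request_logging_middleware import RequestLoggingMiddleware
#
#     app = FastAPI()
#     app.add_middleware(
#         RequestLoggingMiddleware,
#         enable_gateway_logging=True,
#         log_detailed_requests=False,
#         log_detailed_skip_endpoints=["/health", "/metrics"],
#         log_detailed_sample_rate=0.1,
#     )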