Coverage for mcpgateway / middleware / request_logging_middleware.py: 100%
243 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
# -*- coding: utf-8 -*-
"""
Location: ./mcpgateway/middleware/request_logging_middleware.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0

Request Logging Middleware.

This module provides middleware for FastAPI to log incoming HTTP requests
with sensitive data masking. It masks JWT tokens, passwords, and other
sensitive information in headers and request bodies while preserving
debugging information.

Examples:
    >>> from mcpgateway.middleware.request_logging_middleware import (
    ...     mask_sensitive_data, mask_jwt_in_cookies, mask_sensitive_headers, SENSITIVE_KEYS
    ... )

    Check that SENSITIVE_KEYS contains expected values:
    >>> "password" in SENSITIVE_KEYS
    True
    >>> "token" in SENSITIVE_KEYS
    True
    >>> "authorization" in SENSITIVE_KEYS
    True

    Mask nested sensitive data:
    >>> data = {"credentials": {"password": "secret", "username": "admin"}}
    >>> masked = mask_sensitive_data(data)
    >>> masked["credentials"]["password"]
    '******'
    >>> masked["credentials"]["username"]
    'admin'

    Test mask_jwt_in_cookies with various inputs:
    >>> mask_jwt_in_cookies("access_token=xyz123; user=john")
    'access_token=******; user=john'

    Test mask_sensitive_headers with mixed headers:
    >>> headers = {"Content-Type": "application/json", "secret": "mysecret"}
    >>> result = mask_sensitive_headers(headers)
    >>> result["Content-Type"]
    'application/json'
    >>> result["secret"]
    '******'
"""
48# Standard
49import logging
50import re
51import secrets
52import time
53from typing import Callable, List, Optional
55# Third-Party
56from fastapi.security import HTTPAuthorizationCredentials
57import orjson
58from starlette.middleware.base import BaseHTTPMiddleware
59from starlette.requests import Request
60from starlette.responses import Response
62# First-Party
63from mcpgateway.auth import get_current_user
64from mcpgateway.config import settings
65from mcpgateway.middleware.path_filter import should_skip_request_logging
66from mcpgateway.services.logging_service import LoggingService
67from mcpgateway.services.structured_logger import get_structured_logger
68from mcpgateway.utils.correlation_id import get_correlation_id
# Module logger obtained from the gateway LoggingService. In this module it
# carries the detailed "Incoming request" records and the warnings emitted
# when a logging step itself fails.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Structured logger for gateway boundary events (request started / completed /
# failed), registered under the "http_gateway" component name.
structured_logger = get_structured_logger("http_gateway")
77SENSITIVE_KEYS = frozenset(
78 {
79 "password",
80 "passphrase",
81 "secret",
82 "token",
83 "api_key",
84 "apikey",
85 "access_token",
86 "refresh_token",
87 "client_secret",
88 "authorization",
89 "auth_token",
90 "jwt_token",
91 "private_key",
92 }
93)
94_NON_SENSITIVE_KEY_SUFFIXES = (
95 "_count",
96 "_counts",
97 "_size",
98 "_length",
99 "_ttl",
100 "_seconds",
101 "_ms",
102 "_id",
103 "_ids",
104 "_name",
105 "_type",
106 "_url",
107 "_uri",
108 "_path",
109 "_status",
110 "_code",
111)
112_SENSITIVE_KEY_PATTERN = re.compile(
113 r"(^|_)(password|passphrase|secret|token|apikey|api_key|access_token|refresh_token|client_secret|authorization|auth_token|jwt_token|private_key)($|_)",
114 re.IGNORECASE,
115)
116_AUTHENTICATION_KEYWORD_PATTERN = re.compile(r"(^|_)(auth|authorization|jwt)(_|\Z)")
119def _normalize_key_for_masking(key: str) -> str:
120 """Normalize key names for robust sensitive-key matching.
122 Args:
123 key: Original key name from request payload or headers.
125 Returns:
126 Lowercased, underscore-delimited key name for pattern checks.
127 """
128 key_with_boundaries = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", str(key))
129 normalized = re.sub(r"[^a-zA-Z0-9]+", "_", key_with_boundaries)
130 return normalized.strip("_").lower()
def _is_sensitive_key(key: str) -> bool:
    """Decide whether the value stored under ``key`` must be masked.

    The key is normalized first, then checked in this order:

    1. An exact entry in ``SENSITIVE_KEYS`` is always sensitive.
    2. Otherwise a key ending in one of ``_NON_SENSITIVE_KEY_SUFFIXES`` is
       never sensitive.
    3. Otherwise the key is sensitive when it contains an authentication
       keyword or a sensitive name as a whole underscore-delimited segment.

    Args:
        key: Candidate key to evaluate.

    Returns:
        ``True`` when the key matches sensitive-key heuristics; otherwise ``False``.
    """
    candidate = _normalize_key_for_masking(key)
    if not candidate:
        return False

    if candidate in SENSITIVE_KEYS:
        return True

    # str.endswith accepts the whole tuple of suffixes in a single call.
    if candidate.endswith(_NON_SENSITIVE_KEY_SUFFIXES):
        return False

    return bool(_AUTHENTICATION_KEYWORD_PATTERN.search(candidate) or _SENSITIVE_KEY_PATTERN.search(candidate))
def mask_sensitive_data(data, max_depth: int = 10):
    """Return a copy of ``data`` in which values under sensitive keys are masked.

    Dicts and lists are walked recursively; any other value is returned as-is.
    Once the depth budget is used up the value at that position is replaced by
    the placeholder ``"<nested too deep>"``.

    Args:
        data: The data structure to mask (dict, list, or other)
        max_depth: Maximum recursion depth to prevent stack overflow on deeply nested payloads

    Returns:
        The data structure with sensitive values masked

    Examples:
        >>> mask_sensitive_data({"username": "john", "password": "secret123"})
        {'username': 'john', 'password': '******'}

        >>> mask_sensitive_data({"user": {"name": "john", "token": "abc123"}})
        {'user': {'name': 'john', 'token': '******'}}

        >>> mask_sensitive_data([{"apikey": "key1"}, {"data": "safe"}])
        [{'apikey': '******'}, {'data': 'safe'}]

        >>> mask_sensitive_data("plain string")
        'plain string'

        >>> mask_sensitive_data({"level": {"nested": {}}}, max_depth=1)
        {'level': '<nested too deep>'}
    """
    if max_depth <= 0:
        return "<nested too deep>"

    remaining_depth = max_depth - 1

    if isinstance(data, dict):
        masked = {}
        for name, value in data.items():
            if _is_sensitive_key(name):
                masked[name] = "******"
            else:
                masked[name] = mask_sensitive_data(value, remaining_depth)
        return masked

    if isinstance(data, list):
        return [mask_sensitive_data(item, remaining_depth) for item in data]

    return data
def mask_jwt_in_cookies(cookie_header):
    """Mask the values of token-like cookies in a ``Cookie`` header string.

    A cookie is masked when its name contains ``jwt``, ``token``, ``auth`` or
    ``session`` (case-insensitive). All other cookies are kept, and the result
    is re-joined with ``"; "``.

    Args:
        cookie_header: The cookie header string to process

    Returns:
        Cookie header string with JWT tokens masked

    Examples:
        >>> mask_jwt_in_cookies("jwt_token=abc123; theme=dark")
        'jwt_token=******; theme=dark'

        >>> mask_jwt_in_cookies("session_id=xyz; auth_token=secret")
        'session_id=******; auth_token=******'

        >>> mask_jwt_in_cookies("user=john; preference=light")
        'user=john; preference=light'

        >>> mask_jwt_in_cookies("")
        ''

        >>> mask_jwt_in_cookies(None) is None
        True
    """
    if not cookie_header:
        return cookie_header

    sensitive_markers = ("jwt", "token", "auth", "session")

    def _mask_one(pair):
        """Return ``pair`` with its value masked when the cookie name looks sensitive."""
        raw_name, separator, _value = pair.partition("=")
        if not separator:
            # No "=" at all: keep the fragment untouched.
            return pair
        cookie_name = raw_name.strip()
        lowered_name = cookie_name.lower()
        if any(marker in lowered_name for marker in sensitive_markers):
            return f"{cookie_name}=******"
        return pair

    return "; ".join(_mask_one(fragment.strip()) for fragment in cookie_header.split(";"))
def mask_sensitive_headers(headers):
    """Return a copy of ``headers`` that is safe to write to logs.

    Headers whose name is considered sensitive are replaced entirely by
    ``"******"``. The ``Cookie`` header is kept, but each token-like cookie
    inside it is masked individually. Everything else passes through unchanged.

    Args:
        headers: Dictionary of HTTP headers to mask

    Returns:
        Dictionary of headers with sensitive values masked

    Examples:
        >>> mask_sensitive_headers({"Authorization": "Bearer token123"})
        {'Authorization': '******'}

        >>> mask_sensitive_headers({"Content-Type": "application/json"})
        {'Content-Type': 'application/json'}

        >>> mask_sensitive_headers({"apikey": "secret", "X-Custom": "value"})
        {'apikey': '******', 'X-Custom': 'value'}

        >>> result = mask_sensitive_headers({"Cookie": "jwt_token=abc; theme=dark"})
        >>> "******" in result["Cookie"]
        True
    """
    safe_headers = {}
    for name, raw_value in headers.items():
        lowered_name = name.lower()
        if _is_sensitive_key(name):
            safe_value = "******"
        elif lowered_name == "cookie":
            # Keep harmless cookies readable; only token-like ones are masked.
            safe_value = mask_jwt_in_cookies(raw_value)
        else:
            safe_value = raw_value
        safe_headers[name] = safe_value
    return safe_headers
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware for logging HTTP requests with sensitive data masking.

    Logs incoming requests including method, path, headers, and body while
    masking sensitive information like passwords, tokens, and authorization headers.

    Two independent kinds of output are produced:

    * Gateway boundary events (``request_started``, ``request_completed``,
      ``request_failed``) written through the structured logger.
    * Detailed request records (query params, masked headers, masked body)
      written through the module logger at INFO level.

    Examples:
        >>> middleware = RequestLoggingMiddleware(
        ...     app=None,
        ...     enable_gateway_logging=True,
        ...     log_detailed_requests=True,
        ...     log_detailed_skip_endpoints=["/metrics", "/health"],
        ...     log_detailed_sample_rate=0.5,
        ... )
        >>> middleware.enable_gateway_logging
        True
        >>> middleware.log_detailed_requests
        True
        >>> middleware.log_detailed_skip_endpoints
        ['/metrics', '/health']
        >>> middleware.log_detailed_sample_rate
        0.5
        >>> middleware.log_resolve_user_identity
        False
    """

    def __init__(
        self,
        app,
        enable_gateway_logging: bool = True,
        log_detailed_requests: bool = False,
        log_level: str = "DEBUG",
        max_body_size: Optional[int] = None,
        log_request_start: bool = False,
        log_resolve_user_identity: bool = False,
        log_detailed_skip_endpoints: Optional[List[str]] = None,
        log_detailed_sample_rate: float = 1.0,
    ):
        """Initialize the request logging middleware.

        Args:
            app: The FastAPI application instance
            enable_gateway_logging: Whether to enable gateway boundary logging (request_started/completed)
            log_detailed_requests: Whether to enable detailed request/response payload logging
            log_level: The log level for requests (not used, logs at INFO)
            max_body_size: Maximum request body size to log in bytes
            log_request_start: Whether to log "request started" events (default: False for performance)
                When False, only logs on request completion which halves logging overhead.
            log_resolve_user_identity: If True, allow DB fallback to resolve user identity when no cached user
            log_detailed_skip_endpoints: Optional list of path prefixes to skip detailed logging
            log_detailed_sample_rate: Float in [0.0, 1.0] sampling rate for detailed logging
        """
        super().__init__(app)
        self.enable_gateway_logging = enable_gateway_logging
        self.log_detailed_requests = log_detailed_requests
        self.log_level = log_level.upper()
        # Use explicit configured value when provided, otherwise fall back to
        # settings.log_detailed_max_body_size (configured in mcpgateway.config)
        self.max_body_size = max_body_size if max_body_size is not None else settings.log_detailed_max_body_size
        self.log_request_start = log_request_start
        self.log_resolve_user_identity = log_resolve_user_identity
        self.log_detailed_skip_endpoints = log_detailed_skip_endpoints or []
        self.log_detailed_sample_rate = log_detailed_sample_rate

    @staticmethod
    def _identity_of(user):
        """Extract ``(user_id, user_email)`` from a user object.

        Args:
            user: Any object; ``id`` and ``email`` attributes are read if present

        Returns:
            Tuple of the id converted to ``str`` (or ``None``) and the email (or ``None``)
        """
        raw_user_id = getattr(user, "id", None)
        user_email = getattr(user, "email", None)
        return (str(raw_user_id) if raw_user_id is not None else None, user_email)

    async def _resolve_user_identity(self, request: Request):
        """Best-effort extraction of user identity for request logs.

        Args:
            request: The incoming HTTP request

        Returns:
            Tuple[Optional[str], Optional[str]]: User ID and email
        """
        # Prefer context injected by upstream middleware
        cached_user = getattr(request.state, "user", None)
        if cached_user is not None:
            return self._identity_of(cached_user)

        # Fallback: try to authenticate using cookies/headers (matches AuthContextMiddleware)
        # Respect configuration: avoid DB fallback unless explicitly allowed
        if not self.log_resolve_user_identity:
            return (None, None)

        token = None
        if request.cookies:
            token = request.cookies.get("jwt_token") or request.cookies.get("access_token") or request.cookies.get("token")

        if not token:
            auth_header = request.headers.get("authorization")
            if auth_header and auth_header.startswith("Bearer "):
                # Strip only the leading scheme; str.replace would also remove
                # any later occurrence of "Bearer " inside the credential.
                token = auth_header[len("Bearer ") :]

        if not token:
            return (None, None)

        try:
            credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
            # get_current_user now uses fresh DB sessions internally
            user = await get_current_user(credentials)
            return self._identity_of(user)
        except Exception as e:
            # Identity is optional log context: never fail the request over it.
            logger.debug(f"Could not resolve user identity for request log: {e}")
            return (None, None)

    @staticmethod
    def _declared_content_length(request: Request) -> Optional[int]:
        """Return the request's ``Content-Length`` as an int.

        Only the ``int()`` conversion is guarded, so a ``ValueError`` raised
        anywhere else (for example by a downstream handler) is never mistaken
        for a malformed header.

        Args:
            request: The incoming HTTP request

        Returns:
            The declared length, or ``None`` when the header is missing, empty or not an integer
        """
        raw_length = request.headers.get("content-length")
        if not raw_length:
            return None
        try:
            return int(raw_length)
        except ValueError:
            return None

    @staticmethod
    def _render_payload(payload: str) -> str:
        """Turn a decoded request body into a string that is safe to log.

        Args:
            payload: Decoded, stripped (and possibly truncated) request body

        Returns:
            ``"<empty>"`` for an empty body, compact masked JSON for JSON bodies,
            a fixed placeholder for non-JSON bodies that mention a sensitive key,
            otherwise the payload unchanged
        """
        if not payload:
            return "<empty>"
        try:
            # Use orjson without indent for performance (compact output)
            return orjson.dumps(mask_sensitive_data(orjson.loads(payload))).decode()
        except orjson.JSONDecodeError:
            # Non-JSON payloads cannot be masked per field, so the whole body
            # is withheld as soon as any sensitive key name appears in it.
            lowered_payload = payload.lower()
            if any(sensitive_key in lowered_payload for sensitive_key in SENSITIVE_KEYS):
                return "<contains sensitive data - masked>"
            return payload

    @staticmethod
    def _emit_incoming_request(request: Request, body_text: str) -> None:
        """Write one detailed "Incoming request" record at INFO level.

        Args:
            request: The incoming HTTP request
            body_text: Already masked/abbreviated text to show after ``Body:``
        """
        masked_headers = mask_sensitive_headers(dict(request.headers))
        request_id = get_correlation_id()
        message = f"📩 Incoming request: {request.method} {request.url.path}\n" f"Query params: {dict(request.query_params)}\n" f"Headers: {masked_headers}\n" f"Body: {body_text}"
        try:
            logger.log(logging.INFO, message, extra={"request_id": request_id})
        except TypeError:
            # Fall back for test loggers that don't accept extra parameter
            logger.log(logging.INFO, message)

    async def _log_request_details(self, request: Request) -> None:
        """Log query params, masked headers and masked body of ``request``.

        Failures are reported as warnings and never propagate to the caller.
        When the body is read, it is cached on the request so downstream
        handlers can read it again.

        Args:
            request: The incoming HTTP request
        """
        # Size-based fast path: skip detailed processing for very large bodies
        declared_length = self._declared_content_length(request)
        if declared_length is not None and declared_length > self.max_body_size * 4:
            # Body is >4x over limit (not worth reading/parsing): log a placeholder
            try:
                self._emit_incoming_request(request, f"<body too large: {declared_length} bytes>")
            except Exception as e:
                logger.warning(f"Failed to log request body: {e}")
            return

        body = b""
        try:
            body = await request.body()
            # Avoid logging huge bodies
            truncated = len(body) > self.max_body_size
            body_to_log = body[: self.max_body_size] if truncated else body
            payload_str = self._render_payload(body_to_log.decode("utf-8", errors="ignore").strip())
            self._emit_incoming_request(request, f"{payload_str}{'... [truncated]' if truncated else ''}")
        except Exception as e:
            logger.warning(f"Failed to log request body: {e}")

        # Cache body on original request so downstream handlers can re-read safely.
        request._body = body  # pylint: disable=protected-access

    def _log_request_completed(self, response, method: str, path: str, duration_ms: float, boundary_fields: dict) -> None:
        """Emit the ``request_completed`` boundary event.

        Args:
            response: The downstream response; its ``status_code`` selects the log level
            method: HTTP method of the request
            path: URL path of the request
            duration_ms: Total handling time in milliseconds
            boundary_fields: Common structured-log fields shared by all boundary events
        """
        try:
            status_code = response.status_code
            level = "ERROR" if status_code >= 500 else "WARNING" if status_code >= 400 else "INFO"
            structured_logger.log(
                level=level,
                message=f"Request completed: {method} {path} - {status_code}",
                response_status_code=status_code,
                duration_ms=duration_ms,
                metadata={"event": "request_completed", "response_time_category": self._categorize_response_time(duration_ms)},
                **boundary_fields,
            )
        except Exception as e:
            logger.warning(f"Failed to log request completion: {e}")

    @staticmethod
    def _log_request_failed(error: Exception, method: str, path: str, duration_ms: float, boundary_fields: dict) -> None:
        """Emit the ``request_failed`` boundary event.

        Args:
            error: The exception raised by the downstream handler
            method: HTTP method of the request
            path: URL path of the request
            duration_ms: Time elapsed until the failure in milliseconds
            boundary_fields: Common structured-log fields shared by all boundary events
        """
        try:
            structured_logger.log(
                level="ERROR",
                message=f"Request failed: {method} {path}",
                duration_ms=duration_ms,
                error=error,
                metadata={"event": "request_failed"},
                **boundary_fields,
            )
        except Exception as log_error:
            logger.warning(f"Failed to log request failure: {log_error}")

    async def dispatch(self, request: Request, call_next: Callable):
        """Process incoming request and log details with sensitive data masked.

        The downstream handler is invoked exactly once. Boundary events are
        emitted whenever boundary logging applies to the path, regardless of
        whether detailed logging is enabled, sampled out, or filtered by the
        logger level.

        Args:
            request: The incoming HTTP request
            call_next: Function to call the next middleware/handler

        Returns:
            Response: The HTTP response from downstream handlers

        Raises:
            Exception: Any exception from downstream handlers is re-raised
        """
        # Track start time for total duration
        start_time = time.time()

        # Get basic request metadata (cheap operations)
        path = request.url.path
        method = request.method

        # Determine logging needs BEFORE expensive operations
        should_log_boundary = self.enable_gateway_logging and not should_skip_request_logging(path)
        should_log_detailed = self.log_detailed_requests and not should_skip_request_logging(path)

        # Honor middleware-level configured skip endpoints for detailed logging
        if should_log_detailed and self.log_detailed_skip_endpoints:
            if any(path.startswith(prefix) for prefix in self.log_detailed_skip_endpoints):
                should_log_detailed = False

        # Sampling fast path: avoid detailed logging for sampled-out requests
        if should_log_detailed and self.log_detailed_sample_rate < 1.0:
            try:
                # Use the cryptographically secure `secrets` module to avoid
                # bandit/DUO warnings about insecure RNGs. Sampling here does
                # not require crypto strength, but using `secrets` keeps
                # security scanners happy.
                if secrets.randbelow(10**9) / 1e9 >= self.log_detailed_sample_rate:
                    should_log_detailed = False
            except Exception as e:
                # If sampling fails for any reason, default to logging and
                # record the incident for diagnostics.
                logger.debug(f"Sampling failed, defaulting to log: {e}")

        # Detailed records are always written at INFO. If the logger filters
        # INFO out, only the detailed part is dropped; boundary events go
        # through the structured logger and must still be emitted.
        if should_log_detailed and not logger.isEnabledFor(logging.INFO):
            should_log_detailed = False

        # Fast path: if no logging needed at all, skip everything
        if not should_log_boundary and not should_log_detailed:
            return await call_next(request)

        # Get correlation ID and additional metadata (only if we're logging)
        correlation_id = get_correlation_id()
        user_agent = request.headers.get("user-agent", "unknown")
        client_ip = request.client.host if request.client else "unknown"

        # User identity is only consumed by boundary events, so it is resolved
        # only for them. This avoids potential DB queries for detailed-only flows.
        user_id: Optional[str] = None
        user_email: Optional[str] = None
        if should_log_boundary:
            user_id, user_email = await self._resolve_user_identity(request)

        # Fields shared by every boundary event of this request.
        boundary_fields = {
            "correlation_id": correlation_id,
            "user_email": user_email,
            "user_id": user_id,
            "operation_type": "http_request",
            "request_method": method,
            "request_path": path,
            "user_agent": user_agent,
            "client_ip": client_ip,
        }

        # Log gateway request started (optional - disabled by default for performance)
        if should_log_boundary and self.log_request_start:
            try:
                structured_logger.log(
                    level="INFO",
                    message=f"Request started: {method} {path}",
                    metadata={"event": "request_started", "query_params": str(request.query_params) if request.query_params else None},
                    **boundary_fields,
                )
            except Exception as e:
                logger.warning(f"Failed to log request start: {e}")

        if should_log_detailed:
            await self._log_request_details(request)

        # Process request (single call site: the handler can never run twice)
        try:
            response: Response = await call_next(request)
        except Exception as e:
            if should_log_boundary:
                self._log_request_failed(e, method, path, (time.time() - start_time) * 1000, boundary_fields)
            raise

        # Log gateway request completed
        if should_log_boundary:
            self._log_request_completed(response, method, path, (time.time() - start_time) * 1000, boundary_fields)

        return response

    @staticmethod
    def _categorize_response_time(duration_ms: float) -> str:
        """Categorize response time for analytics.

        Args:
            duration_ms: Response time in milliseconds

        Returns:
            ``"fast"`` below 100 ms, ``"normal"`` below 500 ms, ``"slow"`` below
            2000 ms, otherwise ``"very_slow"``
        """
        if duration_ms < 100:
            return "fast"
        if duration_ms < 500:
            return "normal"
        if duration_ms < 2000:
            return "slow"
        return "very_slow"