Coverage for mcpgateway / middleware / auth_middleware.py: 100%
64 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/middleware/auth_middleware.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Authentication Middleware for early user context extraction.
9This middleware extracts user information from JWT tokens early in the request
10lifecycle and stores it in request.state.user for use by other middleware
11(like ObservabilityMiddleware) and route handlers.
13Examples:
14 >>> from mcpgateway.middleware.auth_middleware import AuthContextMiddleware # doctest: +SKIP
15 >>> app.add_middleware(AuthContextMiddleware) # doctest: +SKIP
16"""
18# Standard
19import logging
20from typing import Callable
22# Third-Party
23from fastapi.security import HTTPAuthorizationCredentials
24from starlette.middleware.base import BaseHTTPMiddleware
25from starlette.requests import Request
26from starlette.responses import Response
28# First-Party
29from mcpgateway.auth import get_current_user
30from mcpgateway.config import settings
31from mcpgateway.db import SessionLocal
32from mcpgateway.middleware.path_filter import should_skip_auth_context
33from mcpgateway.services.security_logger import get_security_logger
35logger = logging.getLogger(__name__)
36security_logger = get_security_logger()
39def _should_log_auth_success() -> bool:
40 """Check if successful authentication should be logged based on settings.
42 Returns:
43 True if security_logging_level is "all", False otherwise.
44 """
45 return settings.security_logging_level == "all"
48def _should_log_auth_failure() -> bool:
49 """Check if failed authentication should be logged based on settings.
51 Returns:
52 True if security_logging_level is "all" or "failures_only", False for "high_severity".
53 """
54 # Log failures for "all" and "failures_only" levels, not for "high_severity"
55 return settings.security_logging_level in ("all", "failures_only")
58class AuthContextMiddleware(BaseHTTPMiddleware):
59 """Middleware for extracting user authentication context early in request lifecycle.
61 This middleware attempts to authenticate requests using JWT tokens from cookies
62 or Authorization headers, and stores the user information in request.state.user
63 for downstream middleware and handlers to use.
65 Unlike route-level authentication dependencies, this runs for ALL requests,
66 allowing middleware like ObservabilityMiddleware to access user context.
68 Note:
69 Authentication failures are silent - requests continue as unauthenticated.
70 Route-level dependencies should still enforce authentication requirements.
71 """
73 async def dispatch(self, request: Request, call_next: Callable) -> Response:
74 """Process request and populate user context if authenticated.
76 Args:
77 request: Incoming HTTP request
78 call_next: Next middleware/handler in chain
80 Returns:
81 HTTP response
82 """
83 # Skip for health checks and static files
84 if should_skip_auth_context(request.url.path):
85 return await call_next(request)
87 # Try to extract token from multiple sources
88 token = None
90 # 1. Try manual cookie reading
91 if request.cookies:
92 token = request.cookies.get("jwt_token") or request.cookies.get("access_token")
94 # 2. Try Authorization header
95 if not token:
96 auth_header = request.headers.get("authorization")
97 if auth_header and auth_header.startswith("Bearer "):
98 token = auth_header.replace("Bearer ", "")
100 # If no token found, continue without user context
101 if not token:
102 return await call_next(request)
104 # Check logging settings once upfront to avoid DB session when not needed
105 log_success = _should_log_auth_success()
106 log_failure = _should_log_auth_failure()
108 # Try to authenticate and populate user context
109 # Note: get_current_user manages its own DB sessions internally
110 # We only create a DB session here when security logging is enabled
111 try:
112 credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
113 user = await get_current_user(credentials, request=request)
115 # Note: EmailUser uses 'email' as primary key, not 'id'
116 # User is already detached (created with fresh session that was closed)
117 user_email = user.email
118 user_id = user_email # For EmailUser, email IS the ID
120 # Store user in request state for downstream use
121 request.state.user = user
122 logger.info(f"✓ Authenticated user: {user_email if user_email else user_id}")
124 # Log successful authentication (only if logging level is "all")
125 # DB session created only when needed
126 if log_success:
127 db = SessionLocal()
128 try:
129 security_logger.log_authentication_attempt(
130 user_id=user_id,
131 user_email=user_email,
132 auth_method="bearer_token",
133 success=True,
134 client_ip=request.client.host if request.client else "unknown",
135 user_agent=request.headers.get("user-agent"),
136 db=db,
137 )
138 db.commit()
139 except Exception as log_error:
140 logger.debug(f"Failed to log successful auth: {log_error}")
141 finally:
142 try:
143 db.close()
144 except Exception as close_error:
145 logger.debug(f"Failed to close database session: {close_error}")
147 except Exception as e:
148 # Silently fail - let route handlers enforce auth if needed
149 logger.info(f"✗ Auth context extraction failed (continuing as anonymous): {e}")
151 # Log failed authentication attempt (based on logging level)
152 # DB session created only when needed
153 if log_failure:
154 db = SessionLocal()
155 try:
156 security_logger.log_authentication_attempt(
157 user_id="unknown",
158 user_email=None,
159 auth_method="bearer_token",
160 success=False,
161 client_ip=request.client.host if request.client else "unknown",
162 user_agent=request.headers.get("user-agent"),
163 failure_reason=str(e),
164 db=db,
165 )
166 db.commit()
167 except Exception as log_error:
168 logger.debug(f"Failed to log auth failure: {log_error}")
169 finally:
170 try:
171 db.close()
172 except Exception as close_error:
173 logger.debug(f"Failed to close database session: {close_error}")
175 # Continue with request
176 return await call_next(request)