Coverage for mcpgateway / middleware / auth_middleware.py: 100%
125 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/middleware/auth_middleware.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Authentication Middleware for early user context extraction.
9This middleware extracts user information from JWT tokens early in the request
10lifecycle and stores it in request.state.user for use by other middleware
11(like ObservabilityMiddleware) and route handlers.
13Examples:
14 >>> from mcpgateway.middleware.auth_middleware import AuthContextMiddleware # doctest: +SKIP
15 >>> app.add_middleware(AuthContextMiddleware) # doctest: +SKIP
16"""
18# Standard
19import logging
20from typing import Callable
22# Third-Party
23from fastapi import HTTPException
24from fastapi.security import HTTPAuthorizationCredentials
25from sqlalchemy.orm import Session
26from starlette.middleware.base import BaseHTTPMiddleware
27from starlette.requests import Request
28from starlette.responses import JSONResponse, Response
30# First-Party
31from mcpgateway.auth import get_current_user
32from mcpgateway.config import settings
33from mcpgateway.db import SessionLocal
34from mcpgateway.middleware.path_filter import should_skip_auth_context
35from mcpgateway.services.security_logger import get_security_logger
logger = logging.getLogger(__name__)
security_logger = get_security_logger()

# Exact HTTPException.detail strings treated as security-critical rejections
# (revoked tokens, disabled accounts, fail-secure validation errors).
# A 401/403 carrying one of these is answered with a hard JSON deny by the
# auth middleware; any other 401/403 is left to route-level auth so existing
# behaviour is preserved.
_HARD_DENY_DETAILS = frozenset(
    (
        "Token has been revoked",
        "Account disabled",
        "Token validation failed",
    )
)
def _should_log_auth_success() -> bool:
    """Tell whether successful authentications should be security-logged.

    Returns:
        True only when ``settings.security_logging_level`` equals "all".
    """
    configured_level = settings.security_logging_level
    return configured_level == "all"
def _should_log_auth_failure() -> bool:
    """Tell whether failed authentications should be security-logged.

    Returns:
        True when ``settings.security_logging_level`` is "all" or
        "failures_only"; False for any other level (e.g. "high_severity").
    """
    configured_level = settings.security_logging_level
    # Failures are recorded at the two more verbose levels only.
    return configured_level == "all" or configured_level == "failures_only"
def _get_or_create_session(request: Request) -> tuple[Session, bool]:
    """Return a DB session for auth logging, reusing the request's one if present.

    Implements the session reuse pattern established in PR #3600 to avoid
    duplicate database sessions: if an earlier middleware (typically
    ObservabilityMiddleware) already attached a session to ``request.state.db``
    it is handed back as-is; otherwise (e.g. observability disabled) a fresh
    ``SessionLocal()`` is created as a fallback.

    Args:
        request: FastAPI/Starlette request object

    Returns:
        tuple: ``(session, owned)`` where:
            - session: SQLAlchemy Session object
            - owned: bool, True if this call created the session, in which
              case the caller is responsible for closing it

    Note:
        A newly created session (owned=True) is deliberately NOT written to
        ``request.state.db``, so downstream code (e.g. get_db() in route
        handlers) cannot pick up a session that the auth middleware closes
        right after logging.

    Examples:
        >>> from unittest.mock import Mock
        >>> mock_request = Mock()
        >>> mock_request.state.db = None
        >>> db, owned = _get_or_create_session(mock_request)
        >>> owned
        True
    """
    shared_session = getattr(request.state, "db", None)

    if shared_session is None:
        # Fallback: temporary session used for auth logging only; the caller
        # closes it, and downstream get_db() is expected to open its own.
        logger.debug("[AUTH] Creating new session (no middleware session available)")
        return SessionLocal(), True

    logger.debug(f"[AUTH] Reusing session from middleware: {id(shared_session)}")
    return shared_session, False
class AuthContextMiddleware(BaseHTTPMiddleware):
    """Middleware for extracting user authentication context early in request lifecycle.

    This middleware attempts to authenticate requests using JWT tokens from cookies
    or Authorization headers, and stores the user information in request.state.user
    for downstream middleware and handlers to use.

    Unlike route-level authentication dependencies, this runs for every request
    whose path is not excluded by should_skip_auth_context(), allowing middleware
    like ObservabilityMiddleware to access user context.

    Note:
        Most authentication failures are silent - the request continues as
        unauthenticated and route-level dependencies should still enforce
        authentication requirements. The exception is a 401/403 whose detail is
        listed in ``_HARD_DENY_DETAILS``: non-browser requests then receive a
        JSON response with that status immediately, while browser requests
        continue without user context.
    """

    @staticmethod
    def _extract_token(request: Request):
        """Pull a bearer token out of the request cookies or Authorization header.

        Args:
            request: Incoming HTTP request

        Returns:
            The token string, or None (or an empty string) when no token was supplied.
        """
        token = None

        # 1. Cookies take precedence: "jwt_token" first, then "access_token".
        if request.cookies:
            token = request.cookies.get("jwt_token") or request.cookies.get("access_token")

        # 2. Fall back to the Authorization header.
        if not token:
            auth_header = request.headers.get("authorization")
            if auth_header and auth_header.startswith("Bearer "):
                # Strip only the leading scheme; str.replace would also remove
                # any later "Bearer " occurrences inside the credential.
                token = auth_header.removeprefix("Bearer ")

        return token

    @staticmethod
    def _is_browser_request(request: Request) -> bool:
        """Classify the request as browser/admin-UI traffic based on its headers.

        A request counts as a browser request when its Accept header contains
        "text/html", its HX-Request header is "true", or its Referer contains
        "/admin".

        NOTE(review): the original inline comment says this must stay in sync
        with rbac.py's is_browser_request logic - confirm when changing either.

        Args:
            request: Incoming HTTP request

        Returns:
            True if any of the browser indicators is present.
        """
        accept_header = request.headers.get("accept", "")
        is_htmx = request.headers.get("hx-request") == "true"
        referer = request.headers.get("referer", "")
        return "text/html" in accept_header or is_htmx or "/admin" in referer

    @staticmethod
    def _record_auth_attempt(request: Request, *, success: bool, user_id, user_email, failure_reason=None) -> None:
        """Write one authentication attempt to the security log and commit it.

        The session comes from _get_or_create_session(); it is closed here only
        when that call created it. Errors raised while logging or committing are
        swallowed (logged at debug level) so security logging never changes the
        outcome of the request. Errors raised while obtaining the session are
        NOT caught here and propagate to the caller.

        Args:
            request: Incoming HTTP request (source of client IP, user agent, session)
            success: Whether the authentication attempt succeeded
            user_id: Identifier recorded for the user
            user_email: Email recorded for the user, or None
            failure_reason: Reason text for a failed attempt; omitted from the
                logger call when None
        """
        db, owned = _get_or_create_session(request)
        # Only forward failure_reason when one was given, so the success call
        # keeps exactly the keyword arguments it had before.
        extra = {} if failure_reason is None else {"failure_reason": failure_reason}
        try:
            security_logger.log_authentication_attempt(
                user_id=user_id,
                user_email=user_email,
                auth_method="bearer_token",
                success=success,
                client_ip=request.client.host if request.client else "unknown",
                user_agent=request.headers.get("user-agent"),
                db=db,
                **extra,
            )
            # Commit immediately to persist logs even if an exception occurs later
            # in the middleware chain, or when a hard-deny response is returned
            # without ever reaching get_db(). A later commit by get_db() is a
            # no-op if there are no new changes.
            db.commit()
        except Exception as log_error:
            outcome = "successful auth" if success else "auth failure"
            logger.debug(f"Failed to log {outcome}: {log_error}")
            # Rollback shared session to clear PendingRollbackError state so
            # downstream call_next()/get_db() does not inherit a broken session.
            try:
                db.rollback()
            except Exception:
                try:
                    db.invalidate()
                except Exception:
                    pass  # nosec B110 - Best effort cleanup
        finally:
            # Only close if we created the session
            if owned:
                try:
                    db.close()
                except Exception as close_error:
                    logger.warning(f"Failed to close auth session: {close_error}")

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Process request and populate user context if authenticated.

        Args:
            request: Incoming HTTP request
            call_next: Next middleware/handler in chain

        Returns:
            HTTP response: normally the downstream response; a JSON 401/403
            response when a non-browser request is rejected with a hard-deny
            detail.
        """
        # Skip for health checks and static files
        if should_skip_auth_context(request.url.path):
            return await call_next(request)

        # If no token found, continue without user context
        token = self._extract_token(request)
        if not token:
            return await call_next(request)

        # Check logging settings once upfront to avoid DB session when not needed
        log_success = _should_log_auth_success()
        log_failure = _should_log_auth_failure()

        # Try to authenticate and populate user context
        # Note: get_current_user manages its own DB sessions internally
        # We only create a DB session here when security logging is enabled
        try:
            credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
            user = await get_current_user(credentials, request=request)

            # Note: EmailUser uses 'email' as primary key, not 'id', so the
            # email doubles as the user id in the security log.
            user_email = user.email

            # Store user in request state for downstream use
            request.state.user = user
            logger.info(f"✓ Authenticated user: {user_email}")

            # Log successful authentication (only if logging level is "all")
            # DB session reused from middleware or created if needed (Issue #3622)
            if log_success:
                self._record_auth_attempt(request, success=True, user_id=user_email, user_email=user_email)

        except HTTPException as e:
            # detail may be a dict/list; testing an unhashable value against the
            # frozenset would raise TypeError, so only strings can be hard denies.
            is_hard_deny = e.status_code in (401, 403) and isinstance(e.detail, str) and e.detail in _HARD_DENY_DETAILS
            if is_hard_deny:
                logger.info(f"✗ Auth rejected ({e.status_code}): {e.detail}")

                if log_failure:
                    self._record_auth_attempt(
                        request,
                        success=False,
                        user_id="unknown",
                        user_email=None,
                        failure_reason=str(e.detail),
                    )

                # Browser/admin requests with stale cookies: let the request continue
                # without user context so the RBAC layer can redirect to /admin/login.
                # API requests: return a hard JSON 401/403 deny.
                if self._is_browser_request(request):
                    logger.debug("Browser request with rejected auth — continuing without user for redirect")
                    return await call_next(request)

                # Include essential security headers since this response bypasses
                # SecurityHeadersMiddleware (it returns before call_next).
                resp_headers = dict(e.headers) if e.headers else {}
                resp_headers.setdefault("X-Content-Type-Options", "nosniff")
                resp_headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
                return JSONResponse(
                    status_code=e.status_code,
                    content={"detail": e.detail},
                    headers=resp_headers,
                )

            # Non-security HTTP errors (e.g. 500 from a downstream service) — continue as anonymous
            logger.info(f"✗ Auth context extraction failed (continuing as anonymous): {e}")
        except Exception as e:
            # Non-HTTP errors (network, decode, etc.) — continue as anonymous
            logger.info(f"✗ Auth context extraction failed (continuing as anonymous): {e}")

            # Log failed authentication attempt (based on logging level)
            # DB session reused from middleware or created if needed (Issue #3622)
            if log_failure:
                self._record_auth_attempt(
                    request,
                    success=False,
                    user_id="unknown",
                    user_email=None,
                    failure_reason=str(e),
                )

        # Continue with request
        return await call_next(request)