Coverage for mcpgateway / middleware / auth_middleware.py: 100%
92 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/middleware/auth_middleware.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

Authentication Middleware for early user context extraction.

This middleware extracts user information from JWT tokens early in the request
lifecycle and stores it in request.state.user for use by other middleware
(like ObservabilityMiddleware) and route handlers.

Examples:
    >>> from mcpgateway.middleware.auth_middleware import AuthContextMiddleware  # doctest: +SKIP
    >>> app.add_middleware(AuthContextMiddleware)  # doctest: +SKIP
"""
18# Standard
19import logging
20from typing import Callable
22# Third-Party
23from fastapi import HTTPException
24from fastapi.security import HTTPAuthorizationCredentials
25from starlette.middleware.base import BaseHTTPMiddleware
26from starlette.requests import Request
27from starlette.responses import JSONResponse, Response
29# First-Party
30from mcpgateway.auth import get_current_user
31from mcpgateway.config import settings
32from mcpgateway.db import SessionLocal
33from mcpgateway.middleware.path_filter import should_skip_auth_context
34from mcpgateway.services.security_logger import get_security_logger
# Module-level loggers: `logger` carries ordinary diagnostics for this module,
# `security_logger` is the object used below to record authentication attempts.
logger = logging.getLogger(__name__)
security_logger = get_security_logger()
39# HTTPException detail strings that indicate security-critical rejections
40# (revoked tokens, disabled accounts, fail-secure validation errors).
41# Only these trigger a hard JSON deny in the auth middleware; all other
42# 401/403s fall through to route-level auth for backwards compatibility.
43_HARD_DENY_DETAILS = frozenset({"Token has been revoked", "Account disabled", "Token validation failed"})
def _should_log_auth_success() -> bool:
    """Tell whether successful authentications should be written to the security log.

    Returns:
        True only when ``settings.security_logging_level`` equals "all".
    """
    configured_level = settings.security_logging_level
    return configured_level == "all"
def _should_log_auth_failure() -> bool:
    """Tell whether failed authentications should be written to the security log.

    Returns:
        True when ``settings.security_logging_level`` is "all" or
        "failures_only"; False for any other value (e.g. "high_severity").
    """
    configured_level = settings.security_logging_level
    if configured_level == "all":
        return True
    # "failures_only" still records failures; anything else does not.
    return configured_level == "failures_only"
class AuthContextMiddleware(BaseHTTPMiddleware):
    """Middleware for extracting user authentication context early in request lifecycle.

    This middleware attempts to authenticate requests using JWT tokens from cookies
    or Authorization headers, and stores the user information in request.state.user
    for downstream middleware and handlers to use.

    Unlike route-level authentication dependencies, this runs for ALL requests
    (except paths excluded by ``should_skip_auth_context``), allowing middleware
    like ObservabilityMiddleware to access user context.

    Note:
        Most authentication failures are silent - the request continues as
        unauthenticated and route-level dependencies should still enforce
        authentication requirements. The one exception is a 401/403 whose
        detail is listed in ``_HARD_DENY_DETAILS``: non-browser requests then
        receive an immediate JSON error response, while browser/admin requests
        still continue without user context.
    """

    @staticmethod
    def _extract_token(request: Request) -> "str | None":
        """Return the bearer token carried by the request, if any.

        Cookies (``jwt_token`` first, then ``access_token``) take precedence
        over the ``Authorization: Bearer ...`` header.

        Args:
            request: Incoming HTTP request

        Returns:
            The token string, or None/empty string when no token is present.
        """
        token = None

        # 1. Try manual cookie reading
        if request.cookies:
            token = request.cookies.get("jwt_token") or request.cookies.get("access_token")

        # 2. Try Authorization header
        if not token:
            bearer_prefix = "Bearer "
            auth_header = request.headers.get("authorization")
            if auth_header and auth_header.startswith(bearer_prefix):
                # Strip only the leading scheme; str.replace() would also
                # remove any later "Bearer " occurrence inside the value.
                token = auth_header[len(bearer_prefix):]

        return token

    @staticmethod
    def _is_browser_request(request: Request) -> bool:
        """Detect browser/admin-UI requests from the Accept, HX-Request and Referer headers.

        Detection must match rbac.py's is_browser_request logic (Accept,
        HX-Request, and Referer: /admin) to avoid breaking admin UI flows.

        Args:
            request: Incoming HTTP request

        Returns:
            True when the request looks like it originates from a browser.
        """
        accept_header = request.headers.get("accept", "")
        is_htmx = request.headers.get("hx-request") == "true"
        referer = request.headers.get("referer", "")
        return "text/html" in accept_header or is_htmx or "/admin" in referer

    @staticmethod
    def _record_auth_attempt(request: Request, *, success: bool, user_id: str, user_email=None, failure_reason=None) -> None:
        """Write one authentication attempt to the security log (best effort).

        A DB session is opened only for the duration of this call. Every
        error - including a failure to create the session - is swallowed and
        logged at debug level so that audit logging can never change the
        outcome of the request.

        Args:
            request: Incoming HTTP request (source of client IP and user agent)
            success: Whether the authentication succeeded
            user_id: Identifier to record ("unknown" when not authenticated)
            user_email: Email to record, if known
            failure_reason: Reason text for failed attempts; omitted from the
                logger call when None
        """
        db = None
        try:
            db = SessionLocal()
            attempt = {
                "user_id": user_id,
                "user_email": user_email,
                "auth_method": "bearer_token",
                "success": success,
                "client_ip": request.client.host if request.client else "unknown",
                "user_agent": request.headers.get("user-agent"),
            }
            if failure_reason is not None:
                attempt["failure_reason"] = failure_reason
            security_logger.log_authentication_attempt(db=db, **attempt)
            db.commit()
        except Exception as log_error:
            if success:
                logger.debug(f"Failed to log successful auth: {log_error}")
            else:
                logger.debug(f"Failed to log auth failure: {log_error}")
        finally:
            if db is not None:
                try:
                    db.close()
                except Exception as close_error:
                    logger.debug(f"Failed to close database session: {close_error}")

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Process request and populate user context if authenticated.

        Args:
            request: Incoming HTTP request
            call_next: Next middleware/handler in chain

        Returns:
            HTTP response - either the downstream response, or a JSON 401/403
            when a non-browser request is rejected with a hard-deny detail.
        """
        # Skip for health checks and static files
        if should_skip_auth_context(request.url.path):
            return await call_next(request)

        # If no token found, continue without user context
        token = self._extract_token(request)
        if not token:
            return await call_next(request)

        # Check logging settings once upfront to avoid DB session when not needed
        log_success = _should_log_auth_success()
        log_failure = _should_log_auth_failure()

        # Try to authenticate and populate user context
        # Note: get_current_user manages its own DB sessions internally
        try:
            credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
            user = await get_current_user(credentials, request=request)

            # Note: EmailUser uses 'email' as primary key, not 'id'
            user_email = user.email

            # Store user in request state for downstream use
            request.state.user = user
            logger.info(f"✓ Authenticated user: {user_email}")
        except HTTPException as e:
            # detail may be a dict/list; guard with isinstance so the frozenset
            # membership test cannot raise TypeError on an unhashable value.
            is_hard_deny = e.status_code in (401, 403) and isinstance(e.detail, str) and e.detail in _HARD_DENY_DETAILS
            if is_hard_deny:
                logger.info(f"✗ Auth rejected ({e.status_code}): {e.detail}")

                if log_failure:
                    self._record_auth_attempt(
                        request,
                        success=False,
                        user_id="unknown",
                        failure_reason=str(e.detail),
                    )

                # Browser/admin requests with stale cookies: let the request continue
                # without user context so the RBAC layer can redirect to /admin/login.
                # API requests: return a hard JSON 401/403 deny.
                if self._is_browser_request(request):
                    logger.debug("Browser request with rejected auth — continuing without user for redirect")
                    return await call_next(request)

                # Include essential security headers since this response bypasses
                # SecurityHeadersMiddleware (it returns before call_next).
                resp_headers = dict(e.headers) if e.headers else {}
                resp_headers.setdefault("X-Content-Type-Options", "nosniff")
                resp_headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
                return JSONResponse(
                    status_code=e.status_code,
                    content={"detail": e.detail},
                    headers=resp_headers,
                )

            # Non-security HTTP errors (e.g. 500 from a downstream service) — continue as anonymous
            logger.info(f"✗ Auth context extraction failed (continuing as anonymous): {e}")
        except Exception as e:
            # Non-HTTP errors (network, decode, etc.) — continue as anonymous
            logger.info(f"✗ Auth context extraction failed (continuing as anonymous): {e}")

            # Log failed authentication attempt (based on logging level)
            if log_failure:
                self._record_auth_attempt(
                    request,
                    success=False,
                    user_id="unknown",
                    failure_reason=str(e),
                )
        else:
            # Log successful authentication (only if logging level is "all").
            # Done outside the try body so a logging problem can never be
            # mistaken for an authentication failure.
            if log_success:
                self._record_auth_attempt(
                    request,
                    success=True,
                    user_id=user_email,  # For EmailUser, email IS the ID
                    user_email=user_email,
                )

        # Continue with request
        return await call_next(request)