Coverage for mcpgateway / middleware / auth_middleware.py: 100%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/auth_middleware.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Authentication Middleware for early user context extraction. 

8 

9This middleware extracts user information from JWT tokens early in the request 

10lifecycle and stores it in request.state.user for use by other middleware 

11(like ObservabilityMiddleware) and route handlers. 

12 

13Examples: 

14 >>> from mcpgateway.middleware.auth_middleware import AuthContextMiddleware # doctest: +SKIP 

15 >>> app.add_middleware(AuthContextMiddleware) # doctest: +SKIP 

16""" 

17 

18# Standard 

19import logging 

20from typing import Callable 

21 

22# Third-Party 

23from fastapi import HTTPException 

24from fastapi.security import HTTPAuthorizationCredentials 

25from starlette.middleware.base import BaseHTTPMiddleware 

26from starlette.requests import Request 

27from starlette.responses import JSONResponse, Response 

28 

29# First-Party 

30from mcpgateway.auth import get_current_user 

31from mcpgateway.config import settings 

32from mcpgateway.db import SessionLocal 

33from mcpgateway.middleware.path_filter import should_skip_auth_context 

34from mcpgateway.services.security_logger import get_security_logger 

35 

# Module-level logger and the shared security logger used for auth audit records.
logger = logging.getLogger(__name__)
security_logger = get_security_logger()

# HTTPException ``detail`` values that mark a security-critical rejection
# (revoked token, disabled account, fail-secure validation error).
# The auth middleware answers with a hard JSON deny only for these; every
# other 401/403 is left to route-level auth for backwards compatibility.
_HARD_DENY_DETAILS = frozenset(
    (
        "Token has been revoked",
        "Account disabled",
        "Token validation failed",
    )
)

44 

45 

def _should_log_auth_success() -> bool:
    """Tell whether successful authentications should be written to the security log.

    Returns:
        True only when ``settings.security_logging_level`` equals "all".
    """
    configured_level = settings.security_logging_level
    return configured_level == "all"

53 

54 

def _should_log_auth_failure() -> bool:
    """Tell whether failed authentications should be written to the security log.

    Returns:
        True when ``settings.security_logging_level`` is "all" or "failures_only";
        False for any other level (e.g. "high_severity").
    """
    # Failures are recorded at the two more verbose levels only.
    configured_level = settings.security_logging_level
    return configured_level in ("all", "failures_only")

63 

64 

class AuthContextMiddleware(BaseHTTPMiddleware):
    """Middleware for extracting user authentication context early in request lifecycle.

    This middleware attempts to authenticate requests using JWT tokens from cookies
    or Authorization headers, and stores the user information in request.state.user
    for downstream middleware and handlers to use.

    Unlike route-level authentication dependencies, this runs for every request
    whose path is not excluded by ``should_skip_auth_context``, allowing middleware
    like ObservabilityMiddleware to access user context.

    Note:
        Most authentication failures are silent - the request continues as
        unauthenticated, and route-level dependencies should still enforce
        authentication requirements. The exception is a 401/403 whose detail is
        listed in ``_HARD_DENY_DETAILS``: non-browser requests get an immediate
        JSON error response, while browser/admin requests continue without user
        context.
    """

    @staticmethod
    def _is_browser_request(request: Request) -> bool:
        """Return True when the request looks like it comes from a browser / the admin UI.

        Detection must match rbac.py's is_browser_request logic (Accept,
        HX-Request, and Referer: /admin) to avoid breaking admin UI flows.

        Args:
            request: Incoming HTTP request

        Returns:
            True if the Accept header contains "text/html", the request is an
            HTMX request, or the Referer contains "/admin".
        """
        accept_header = request.headers.get("accept", "")
        is_htmx = request.headers.get("hx-request") == "true"
        referer = request.headers.get("referer", "")
        return "text/html" in accept_header or is_htmx or "/admin" in referer

    @staticmethod
    def _record_auth_attempt(request: Request, *, success: bool, user_id: str, user_email=None, failure_reason=None) -> None:
        """Write one authentication attempt to the security log (best effort).

        Any error - including failure to open the DB session - is logged at
        debug level and swallowed, so audit logging can never change the
        outcome of the request.

        Args:
            request: Incoming HTTP request (source of client IP and user agent)
            success: Whether the authentication attempt succeeded
            user_id: Identifier to record for the attempt
            user_email: Email to record, or None when unknown
            failure_reason: Reason text; only forwarded when ``success`` is False
        """
        attempt = {
            "user_id": user_id,
            "user_email": user_email,
            "auth_method": "bearer_token",
            "success": success,
        }
        if not success:
            attempt["failure_reason"] = failure_reason

        db = None
        try:
            # Session creation sits inside the guarded region on purpose: a DB
            # outage must not escape into the request path.
            db = SessionLocal()
            security_logger.log_authentication_attempt(
                client_ip=request.client.host if request.client else "unknown",
                user_agent=request.headers.get("user-agent"),
                db=db,
                **attempt,
            )
            db.commit()
        except Exception as log_error:
            if success:
                logger.debug(f"Failed to log successful auth: {log_error}")
            else:
                logger.debug(f"Failed to log auth failure: {log_error}")
        finally:
            if db is not None:
                try:
                    db.close()
                except Exception as close_error:
                    logger.debug(f"Failed to close database session: {close_error}")

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Process request and populate user context if authenticated.

        Args:
            request: Incoming HTTP request
            call_next: Next middleware/handler in chain

        Returns:
            HTTP response - normally the downstream response; a JSON 401/403
            when a non-browser request is rejected with a hard-deny detail.
        """
        # Skip for health checks and static files
        if should_skip_auth_context(request.url.path):
            return await call_next(request)

        # Try to extract token from multiple sources
        token = None

        # 1. Try manual cookie reading
        if request.cookies:
            token = request.cookies.get("jwt_token") or request.cookies.get("access_token")

        # 2. Try Authorization header
        if not token:
            bearer_prefix = "Bearer "
            auth_header = request.headers.get("authorization")
            if auth_header and auth_header.startswith(bearer_prefix):
                # Strip only the leading scheme; str.replace would also remove
                # any later occurrence of the same text inside the credential.
                token = auth_header[len(bearer_prefix):]

        # If no token found, continue without user context
        if not token:
            return await call_next(request)

        # Check logging settings once upfront to avoid DB session when not needed
        log_success = _should_log_auth_success()
        log_failure = _should_log_auth_failure()

        # Try to authenticate and populate user context
        # Note: get_current_user manages its own DB sessions internally
        # We only create a DB session here when security logging is enabled
        try:
            credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
            user = await get_current_user(credentials, request=request)

            # Note: EmailUser uses 'email' as primary key, not 'id',
            # so the email doubles as the user id below.
            user_email = user.email

            # Store user in request state for downstream use
            request.state.user = user
            logger.info(f"✓ Authenticated user: {user_email}")

            # Log successful authentication (only if logging level is "all")
            if log_success:
                self._record_auth_attempt(request, success=True, user_id=user_email, user_email=user_email)

        except HTTPException as e:
            # HTTPException.detail may be any JSON-able value; the isinstance
            # guard keeps an unhashable detail (dict/list) from raising
            # TypeError in the frozenset membership test.
            is_hard_deny = e.status_code in (401, 403) and isinstance(e.detail, str) and e.detail in _HARD_DENY_DETAILS
            if is_hard_deny:
                logger.info(f"✗ Auth rejected ({e.status_code}): {e.detail}")

                if log_failure:
                    self._record_auth_attempt(request, success=False, user_id="unknown", failure_reason=str(e.detail))

                # Browser/admin requests with stale cookies: let the request continue
                # without user context so the RBAC layer can redirect to /admin/login.
                # API requests: return a hard JSON 401/403 deny.
                if self._is_browser_request(request):
                    logger.debug("Browser request with rejected auth — continuing without user for redirect")
                    return await call_next(request)

                # Include essential security headers since this response bypasses
                # SecurityHeadersMiddleware (it returns before call_next).
                resp_headers = dict(e.headers) if e.headers else {}
                resp_headers.setdefault("X-Content-Type-Options", "nosniff")
                resp_headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
                return JSONResponse(
                    status_code=e.status_code,
                    content={"detail": e.detail},
                    headers=resp_headers,
                )

            # Other HTTP errors (e.g. 500 from a downstream service) — continue as anonymous.
            # NOTE(review): unlike the other failure paths, this one is not written to
            # the security logger — confirm that is intentional.
            logger.info(f"✗ Auth context extraction failed (continuing as anonymous): {e}")
        except Exception as e:
            # Non-HTTP errors (network, decode, etc.) — continue as anonymous
            logger.info(f"✗ Auth context extraction failed (continuing as anonymous): {e}")

            # Log failed authentication attempt (based on logging level)
            if log_failure:
                self._record_auth_attempt(request, success=False, user_id="unknown", failure_reason=str(e))

        # Continue with request
        return await call_next(request)