Coverage for mcpgateway / middleware / auth_middleware.py: 100%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/middleware/auth_middleware.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Authentication Middleware for early user context extraction. 

8 

9This middleware extracts user information from JWT tokens early in the request 

10lifecycle and stores it in request.state.user for use by other middleware 

11(like ObservabilityMiddleware) and route handlers. 

12 

13Examples: 

14 >>> from mcpgateway.middleware.auth_middleware import AuthContextMiddleware # doctest: +SKIP 

15 >>> app.add_middleware(AuthContextMiddleware) # doctest: +SKIP 

16""" 

17 

18# Standard 

19import logging 

20from typing import Callable 

21 

22# Third-Party 

23from fastapi.security import HTTPAuthorizationCredentials 

24from starlette.middleware.base import BaseHTTPMiddleware 

25from starlette.requests import Request 

26from starlette.responses import Response 

27 

28# First-Party 

29from mcpgateway.auth import get_current_user 

30from mcpgateway.config import settings 

31from mcpgateway.db import SessionLocal 

32from mcpgateway.middleware.path_filter import should_skip_auth_context 

33from mcpgateway.services.security_logger import get_security_logger 

34 

# Module-wide loggers, created once when the module is imported:
# - ``logger`` carries this module's diagnostic messages.
# - ``security_logger`` is the object whose ``log_authentication_attempt``
#   method records authentication attempts.
logger = logging.getLogger(__name__)
security_logger = get_security_logger()

37 

38 

def _should_log_auth_success() -> bool:
    """Tell whether successful authentications should be recorded.

    Returns:
        True only when ``settings.security_logging_level`` equals "all";
        False for every other value.
    """
    configured_level = settings.security_logging_level
    return configured_level == "all"

46 

47 

def _should_log_auth_failure() -> bool:
    """Tell whether failed authentications should be recorded.

    Returns:
        True when ``settings.security_logging_level`` is "all" or
        "failures_only"; False otherwise (e.g. "high_severity").
    """
    configured_level = settings.security_logging_level
    # "all" records everything, so it records failures too.
    if configured_level == "all":
        return True
    # Otherwise only the dedicated failures level qualifies.
    return configured_level == "failures_only"

56 

57 

class AuthContextMiddleware(BaseHTTPMiddleware):
    """Middleware for extracting user authentication context early in request lifecycle.

    This middleware attempts to authenticate requests using JWT tokens from cookies
    or Authorization headers, and stores the user information in request.state.user
    for downstream middleware and handlers to use.

    Unlike route-level authentication dependencies, this runs for ALL requests,
    allowing middleware like ObservabilityMiddleware to access user context.

    Note:
        Authentication failures are silent - requests continue as unauthenticated.
        Route-level dependencies should still enforce authentication requirements.
        Security-log persistence is best-effort as well: a failure to open, write
        or close the logging DB session never affects the request.
    """

    # Scheme prefix that introduces a bearer credential in the Authorization header.
    _BEARER_PREFIX = "Bearer "

    @staticmethod
    def _record_auth_attempt(request: Request, *, success: bool, user_id: str, user_email, failure_reason=None) -> None:
        """Persist one authentication attempt through the security logger (best-effort).

        Every step, including creation of the DB session, runs inside the
        protected region so that a logging problem can neither be mistaken for
        an authentication failure nor abort the request.

        Args:
            request: Incoming HTTP request (source of client IP and user agent)
            success: Whether the authentication attempt succeeded
            user_id: Identifier recorded for the attempt
            user_email: Email recorded for the attempt, or None when unknown
            failure_reason: Failure description; only forwarded when success is False
        """
        db = None
        try:
            # Create the session inside the try: a failing session factory must
            # be swallowed like any other logging error.
            db = SessionLocal()
            attempt = {
                "user_id": user_id,
                "user_email": user_email,
                "auth_method": "bearer_token",
                "success": success,
                "client_ip": request.client.host if request.client else "unknown",
                "user_agent": request.headers.get("user-agent"),
            }
            if not success:
                attempt["failure_reason"] = failure_reason
            security_logger.log_authentication_attempt(db=db, **attempt)
            db.commit()
        except Exception as log_error:
            outcome = "successful auth" if success else "auth failure"
            logger.debug(f"Failed to log {outcome}: {log_error}")
        finally:
            # db stays None when SessionLocal() itself failed - nothing to close.
            if db is not None:
                try:
                    db.close()
                except Exception as close_error:
                    logger.debug(f"Failed to close database session: {close_error}")

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Process request and populate user context if authenticated.

        Args:
            request: Incoming HTTP request
            call_next: Next middleware/handler in chain

        Returns:
            HTTP response
        """
        # Skip for health checks and static files
        if should_skip_auth_context(request.url.path):
            return await call_next(request)

        # Try to extract token from multiple sources
        token = None

        # 1. Try manual cookie reading
        if request.cookies:
            token = request.cookies.get("jwt_token") or request.cookies.get("access_token")

        # 2. Try Authorization header
        if not token:
            auth_header = request.headers.get("authorization")
            if auth_header and auth_header.startswith(self._BEARER_PREFIX):
                # Strip only the leading scheme prefix; str.replace() would also
                # delete any later "Bearer " occurrence inside the credential.
                token = auth_header[len(self._BEARER_PREFIX) :]

        # If no token found, continue without user context
        if not token:
            return await call_next(request)

        # Check logging settings once upfront to avoid DB session when not needed
        log_success = _should_log_auth_success()
        log_failure = _should_log_auth_failure()

        # Try to authenticate and populate user context
        # Note: get_current_user manages its own DB sessions internally
        # We only create a DB session here when security logging is enabled
        try:
            credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
            user = await get_current_user(credentials, request=request)

            # Note: EmailUser uses 'email' as primary key, not 'id',
            # so the email doubles as the user id below.
            user_email = user.email

            # Store user in request state for downstream use
            request.state.user = user
        except Exception as e:
            # Silently fail - let route handlers enforce auth if needed
            logger.info(f"✗ Auth context extraction failed (continuing as anonymous): {e}")

            # Log failed authentication attempt (based on logging level)
            if log_failure:
                self._record_auth_attempt(
                    request,
                    success=False,
                    user_id="unknown",
                    user_email=None,
                    failure_reason=str(e),
                )
        else:
            logger.info(f"✓ Authenticated user: {user_email}")

            # Log successful authentication (only if logging level is "all")
            if log_success:
                self._record_auth_attempt(
                    request,
                    success=True,
                    user_id=user_email,
                    user_email=user_email,
                )

        # Continue with request
        return await call_next(request)