Coverage for mcpgateway / routers / auth.py: 100%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/auth.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Main Authentication Router. 

8This module provides simplified authentication endpoints for both session and API key management. 

9It serves as the primary entry point for authentication workflows. 

10""" 

11 

12# Standard 

13from typing import Optional 

14 

15# Third-Party 

16from fastapi import APIRouter, Depends, HTTPException, Request, status 

17from pydantic import BaseModel, EmailStr 

18from sqlalchemy.orm import Session 

19 

20# First-Party 

21from mcpgateway.config import settings 

22from mcpgateway.db import SessionLocal 

23from mcpgateway.routers.email_auth import create_access_token, get_client_ip, get_user_agent 

24from mcpgateway.schemas import AuthenticationResponse, EmailUserResponse 

25from mcpgateway.services.email_auth_service import EmailAuthService 

26from mcpgateway.services.logging_service import LoggingService 

27 

28# Initialize logging 

29logging_service = LoggingService() 

30logger = logging_service.get_logger(__name__) 

31 

32# Create router 

33auth_router = APIRouter(prefix="/auth", tags=["Authentication"]) 

34 

35 

36def get_db(): 

37 """Database dependency. 

38 

39 Commits the transaction on successful completion to avoid implicit rollbacks 

40 for read-only operations. Rolls back explicitly on exception. 

41 

42 Yields: 

43 Session: SQLAlchemy database session 

44 

45 Raises: 

46 Exception: Re-raises any exception after rolling back the transaction. 

47 

48 Examples: 

49 >>> db_gen = get_db() 

50 >>> db = next(db_gen) 

51 >>> hasattr(db, 'close') 

52 True 

53 """ 

54 db = SessionLocal() 

55 try: 

56 yield db 

57 db.commit() 

58 except Exception: 

59 try: 

60 db.rollback() 

61 except Exception: 

62 try: 

63 db.invalidate() 

64 except Exception: 

65 pass # nosec B110 - Best effort cleanup on connection failure 

66 raise 

67 finally: 

68 db.close() 

69 

70 

71class LoginRequest(BaseModel): 

72 """Login request supporting both email and username formats. 

73 

74 Attributes: 

75 email: User email address (can also accept 'username' field for compatibility) 

76 password: User password 

77 """ 

78 

79 email: Optional[EmailStr] = None 

80 username: Optional[str] = None # For compatibility 

81 password: str 

82 

83 def get_email(self) -> str: 

84 """Get email from either email or username field. 

85 

86 Returns: 

87 str: Email address to use for authentication 

88 

89 Raises: 

90 ValueError: If neither email nor username is provided 

91 

92 Examples: 

93 >>> req = LoginRequest(email="test@example.com", password="pass") 

94 >>> req.get_email() 

95 'test@example.com' 

96 >>> req = LoginRequest(username="user@domain.com", password="pass") 

97 >>> req.get_email() 

98 'user@domain.com' 

99 >>> req = LoginRequest(username="invaliduser", password="pass") 

100 >>> req.get_email() # doctest: +IGNORE_EXCEPTION_DETAIL 

101 Traceback (most recent call last): 

102 ValueError: Username format not supported. Please use email address. 

103 >>> req = LoginRequest(password="pass") 

104 >>> req.get_email() # doctest: +IGNORE_EXCEPTION_DETAIL 

105 Traceback (most recent call last): 

106 ValueError: Either email or username must be provided 

107 """ 

108 if self.email: 

109 return str(self.email) 

110 elif self.username: 

111 # Support both email format and plain username 

112 if "@" in self.username: 

113 return self.username 

114 else: 

115 # If it's a plain username, we can't authenticate 

116 # (since we're email-based system) 

117 raise ValueError("Username format not supported. Please use email address.") 

118 else: 

119 raise ValueError("Either email or username must be provided") 

120 

121 

122@auth_router.post("/login", response_model=AuthenticationResponse) 

123async def login(login_request: LoginRequest, request: Request, db: Session = Depends(get_db)): 

124 """Authenticate user and return session JWT token. 

125 

126 This endpoint provides Tier 1 authentication for session-based access. 

127 The returned JWT token should be used for UI access and API key management. 

128 

129 Args: 

130 login_request: Login credentials (email/username + password) 

131 request: FastAPI request object 

132 db: Database session 

133 

134 Returns: 

135 AuthenticationResponse: Session JWT token and user info 

136 

137 Raises: 

138 HTTPException: If authentication fails 

139 

140 Examples: 

141 Email format (recommended): 

142 { 

143 "email": "admin@example.com", 

144 "password": "ChangeMe_12345678$" 

145 } 

146 

147 Username format (compatibility): 

148 { 

149 "username": "admin@example.com", 

150 "password": "ChangeMe_12345678$" 

151 } 

152 """ 

153 auth_service = EmailAuthService(db) 

154 ip_address = get_client_ip(request) 

155 user_agent = get_user_agent(request) 

156 

157 try: 

158 # Extract email from request 

159 email = login_request.get_email() 

160 

161 # Authenticate user 

162 user = await auth_service.authenticate_user(email=email, password=login_request.password, ip_address=ip_address, user_agent=user_agent) 

163 

164 if not user: 

165 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password") 

166 

167 if settings.sso_enabled and settings.sso_preserve_admin_auth and not bool(getattr(user, "is_admin", False)): 

168 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password authentication is restricted to admin accounts while SSO is enabled.") 

169 

170 # Create session JWT token (Tier 1 authentication) 

171 access_token, expires_in = await create_access_token(user) 

172 

173 logger.info(f"User {email} authenticated successfully") 

174 

175 # Return session token for UI access and API key management 

176 return AuthenticationResponse( 

177 access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user) 

178 ) # nosec B106 - OAuth2 token type, not a password 

179 

180 except HTTPException: 

181 raise 

182 except ValueError as e: 

183 logger.warning(f"Login validation error: {e}") 

184 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

185 except Exception as e: 

186 logger.error(f"Login error for {login_request.email or login_request.username}: {e}") 

187 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error")