Coverage for mcpgateway / routers / auth.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/auth.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Main Authentication Router. 

8This module provides simplified authentication endpoints for both session and API key management. 

9It serves as the primary entry point for authentication workflows. 

10""" 

11 

12# Standard 

13from typing import Optional 

14 

15# Third-Party 

16from fastapi import APIRouter, Depends, HTTPException, Request, status 

17from pydantic import BaseModel, EmailStr 

18from sqlalchemy.orm import Session 

19 

20# First-Party 

21from mcpgateway.db import SessionLocal 

22from mcpgateway.routers.email_auth import create_access_token, get_client_ip, get_user_agent 

23from mcpgateway.schemas import AuthenticationResponse, EmailUserResponse 

24from mcpgateway.services.email_auth_service import EmailAuthService 

25from mcpgateway.services.logging_service import LoggingService 

26 

27# Initialize logging 

28logging_service = LoggingService() 

29logger = logging_service.get_logger(__name__) 

30 

31# Create router 

32auth_router = APIRouter(prefix="/auth", tags=["Authentication"]) 

33 

34 

35def get_db(): 

36 """Database dependency. 

37 

38 Commits the transaction on successful completion to avoid implicit rollbacks 

39 for read-only operations. Rolls back explicitly on exception. 

40 

41 Yields: 

42 Session: SQLAlchemy database session 

43 

44 Raises: 

45 Exception: Re-raises any exception after rolling back the transaction. 

46 

47 Examples: 

48 >>> db_gen = get_db() 

49 >>> db = next(db_gen) 

50 >>> hasattr(db, 'close') 

51 True 

52 """ 

53 db = SessionLocal() 

54 try: 

55 yield db 

56 db.commit() 

57 except Exception: 

58 try: 

59 db.rollback() 

60 except Exception: 

61 try: 

62 db.invalidate() 

63 except Exception: 

64 pass # nosec B110 - Best effort cleanup on connection failure 

65 raise 

66 finally: 

67 db.close() 

68 

69 

70class LoginRequest(BaseModel): 

71 """Login request supporting both email and username formats. 

72 

73 Attributes: 

74 email: User email address (can also accept 'username' field for compatibility) 

75 password: User password 

76 """ 

77 

78 email: Optional[EmailStr] = None 

79 username: Optional[str] = None # For compatibility 

80 password: str 

81 

82 def get_email(self) -> str: 

83 """Get email from either email or username field. 

84 

85 Returns: 

86 str: Email address to use for authentication 

87 

88 Raises: 

89 ValueError: If neither email nor username is provided 

90 

91 Examples: 

92 >>> req = LoginRequest(email="test@example.com", password="pass") 

93 >>> req.get_email() 

94 'test@example.com' 

95 >>> req = LoginRequest(username="user@domain.com", password="pass") 

96 >>> req.get_email() 

97 'user@domain.com' 

98 >>> req = LoginRequest(username="invaliduser", password="pass") 

99 >>> req.get_email() # doctest: +IGNORE_EXCEPTION_DETAIL 

100 Traceback (most recent call last): 

101 ValueError: Username format not supported. Please use email address. 

102 >>> req = LoginRequest(password="pass") 

103 >>> req.get_email() # doctest: +IGNORE_EXCEPTION_DETAIL 

104 Traceback (most recent call last): 

105 ValueError: Either email or username must be provided 

106 """ 

107 if self.email: 

108 return str(self.email) 

109 elif self.username: 

110 # Support both email format and plain username 

111 if "@" in self.username: 

112 return self.username 

113 else: 

114 # If it's a plain username, we can't authenticate 

115 # (since we're email-based system) 

116 raise ValueError("Username format not supported. Please use email address.") 

117 else: 

118 raise ValueError("Either email or username must be provided") 

119 

120 

121@auth_router.post("/login", response_model=AuthenticationResponse) 

122async def login(login_request: LoginRequest, request: Request, db: Session = Depends(get_db)): 

123 """Authenticate user and return session JWT token. 

124 

125 This endpoint provides Tier 1 authentication for session-based access. 

126 The returned JWT token should be used for UI access and API key management. 

127 

128 Args: 

129 login_request: Login credentials (email/username + password) 

130 request: FastAPI request object 

131 db: Database session 

132 

133 Returns: 

134 AuthenticationResponse: Session JWT token and user info 

135 

136 Raises: 

137 HTTPException: If authentication fails 

138 

139 Examples: 

140 Email format (recommended): 

141 { 

142 "email": "admin@example.com", 

143 "password": "ChangeMe_12345678$" 

144 } 

145 

146 Username format (compatibility): 

147 { 

148 "username": "admin@example.com", 

149 "password": "ChangeMe_12345678$" 

150 } 

151 """ 

152 auth_service = EmailAuthService(db) 

153 ip_address = get_client_ip(request) 

154 user_agent = get_user_agent(request) 

155 

156 try: 

157 # Extract email from request 

158 email = login_request.get_email() 

159 

160 # Authenticate user 

161 user = await auth_service.authenticate_user(email=email, password=login_request.password, ip_address=ip_address, user_agent=user_agent) 

162 

163 if not user: 

164 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password") 

165 

166 # Create session JWT token (Tier 1 authentication) 

167 access_token, expires_in = await create_access_token(user) 

168 

169 logger.info(f"User {email} authenticated successfully") 

170 

171 # Return session token for UI access and API key management 

172 return AuthenticationResponse( 

173 access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user) 

174 ) # nosec B106 - OAuth2 token type, not a password 

175 

176 except ValueError as e: 

177 logger.warning(f"Login validation error: {e}") 

178 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) 

179 except Exception as e: 

180 logger.error(f"Login error for {login_request.email or login_request.username}: {e}") 

181 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error")