Coverage for mcpgateway / routers / auth.py: 100%
63 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/auth.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Main Authentication Router.
8This module provides simplified authentication endpoints for both session and API key management.
9It serves as the primary entry point for authentication workflows.
10"""
12# Standard
13from typing import Optional
15# Third-Party
16from fastapi import APIRouter, Depends, HTTPException, Request, status
17from pydantic import BaseModel, EmailStr
18from sqlalchemy.orm import Session
20# First-Party
21from mcpgateway.config import settings
22from mcpgateway.db import SessionLocal
23from mcpgateway.routers.email_auth import create_access_token, get_client_ip, get_user_agent
24from mcpgateway.schemas import AuthenticationResponse, EmailUserResponse
25from mcpgateway.services.email_auth_service import EmailAuthService
26from mcpgateway.services.logging_service import LoggingService
# Module-level logger, obtained through the gateway logging service
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Router collecting the authentication endpoints under the "/auth" prefix
auth_router = APIRouter(
    prefix="/auth",
    tags=["Authentication"],
)
def _abort_transaction(session):
    """Abandon the pending transaction on ``session`` on a best-effort basis.

    A rollback is attempted first. If the rollback itself raises, the session
    is invalidated instead, and a failure of that fallback is ignored.

    Args:
        session: Database session whose transaction is being abandoned.
    """
    try:
        session.rollback()
    except Exception:
        try:
            session.invalidate()
        except Exception:
            pass  # nosec B110 - Best effort cleanup on connection failure


def get_db():
    """Database dependency.

    Opens a session from ``SessionLocal`` and hands it to the consumer. When
    the consumer finishes without error the transaction is committed, which
    avoids implicit rollbacks for read-only operations. When an exception
    propagates, the transaction is rolled back (falling back to invalidating
    the session if the rollback fails) and the exception is re-raised. The
    session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.

    Examples:
        >>> db_gen = get_db()
        >>> db = next(db_gen)
        >>> hasattr(db, 'close')
        True
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        # Undo whatever was pending, then let the caller see the original error.
        _abort_transaction(session)
        raise
    finally:
        session.close()
class LoginRequest(BaseModel):
    """Credentials payload for login, accepting an email or a username field.

    The account can be identified through ``email`` or, for compatibility,
    through ``username``. A ``username`` is only usable when it contains "@".

    Attributes:
        email: User email address
        username: Alternative identifier field, accepted for compatibility
        password: User password
    """

    email: Optional[EmailStr] = None
    username: Optional[str] = None  # For compatibility
    password: str

    def get_email(self) -> str:
        """Resolve the email address to authenticate with.

        The ``email`` field takes precedence. Otherwise ``username`` is used,
        provided it contains "@".

        Returns:
            str: Email address to use for authentication

        Raises:
            ValueError: If neither email nor username is provided, or if the
                username does not contain "@"

        Examples:
            >>> req = LoginRequest(email="test@example.com", password="pass")
            >>> req.get_email()
            'test@example.com'
            >>> req = LoginRequest(username="user@domain.com", password="pass")
            >>> req.get_email()
            'user@domain.com'
            >>> req = LoginRequest(username="invaliduser", password="pass")
            >>> req.get_email()  # doctest: +IGNORE_EXCEPTION_DETAIL
            Traceback (most recent call last):
            ValueError: Username format not supported. Please use email address.
            >>> req = LoginRequest(password="pass")
            >>> req.get_email()  # doctest: +IGNORE_EXCEPTION_DETAIL
            Traceback (most recent call last):
            ValueError: Either email or username must be provided
        """
        # A populated email field always wins over username.
        if self.email:
            return str(self.email)

        if not self.username:
            raise ValueError("Either email or username must be provided")

        # The system is email-based, so a plain username cannot be authenticated.
        if "@" not in self.username:
            raise ValueError("Username format not supported. Please use email address.")

        return self.username
@auth_router.post("/login", response_model=AuthenticationResponse)
async def login(login_request: LoginRequest, request: Request, db: Session = Depends(get_db)):
    """Authenticate user and return session JWT token.

    This endpoint provides Tier 1 authentication for session-based access.
    The returned JWT token should be used for UI access and API key management.

    Args:
        login_request: Login credentials (email/username + password)
        request: FastAPI request object
        db: Database session

    Returns:
        AuthenticationResponse: Session JWT token and user info

    Raises:
        HTTPException: 401 if the authentication service returns no user;
            400 if a ValueError is raised while handling the request (for
            example by ``LoginRequest.get_email``) or if password login is
            restricted to admin accounts while SSO is enabled; 500 for any
            other unexpected error.

    Examples:
        Email format (recommended):
        {
          "email": "admin@example.com",
          "password": "ChangeMe_12345678$"
        }

        Username format (compatibility):
        {
          "username": "admin@example.com",
          "password": "ChangeMe_12345678$"
        }
    """
    auth_service = EmailAuthService(db)
    ip_address = get_client_ip(request)
    user_agent = get_user_agent(request)

    try:
        # Extract email from request
        email = login_request.get_email()

        # Authenticate user
        user = await auth_service.authenticate_user(email=email, password=login_request.password, ip_address=ip_address, user_agent=user_agent)

        if not user:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password")

        # With SSO enabled and admin auth preserved, only admin accounts may use password login
        if settings.sso_enabled and settings.sso_preserve_admin_auth and not bool(getattr(user, "is_admin", False)):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password authentication is restricted to admin accounts while SSO is enabled.")

        # Create session JWT token (Tier 1 authentication)
        access_token, expires_in = await create_access_token(user)

        logger.info(f"User {email} authenticated successfully")

        # Return session token for UI access and API key management
        return AuthenticationResponse(
            access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
        )  # nosec B106 - OAuth2 token type, not a password

    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched
        raise
    except ValueError as e:
        logger.warning(f"Login validation error: {e}")
        # Chain explicitly so the original error is recorded as the cause
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Login error for {login_request.email or login_request.username}: {e}")
        # Generic detail keeps internal failure information out of the response;
        # the cause stays attached to the exception for debugging
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error") from e