Coverage for mcpgateway / routers / auth.py: 100%
58 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/auth.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Main Authentication Router.
8This module provides simplified authentication endpoints for both session and API key management.
9It serves as the primary entry point for authentication workflows.
10"""
12# Standard
13from typing import Optional
15# Third-Party
16from fastapi import APIRouter, Depends, HTTPException, Request, status
17from pydantic import BaseModel, EmailStr
18from sqlalchemy.orm import Session
20# First-Party
21from mcpgateway.db import SessionLocal
22from mcpgateway.routers.email_auth import create_access_token, get_client_ip, get_user_agent
23from mcpgateway.schemas import AuthenticationResponse, EmailUserResponse
24from mcpgateway.services.email_auth_service import EmailAuthService
25from mcpgateway.services.logging_service import LoggingService
27# Initialize logging
28logging_service = LoggingService()
29logger = logging_service.get_logger(__name__)
31# Create router
32auth_router = APIRouter(prefix="/auth", tags=["Authentication"])
def get_db():
    """Provide a database session for the duration of one unit of work.

    When the consumer finishes without raising, the transaction is committed
    so that read-only operations do not end in an implicit rollback. When
    anything raises, the transaction is rolled back explicitly and the
    original exception is propagated. The session is always closed.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.

    Examples:
        >>> db_gen = get_db()
        >>> db = next(db_gen)
        >>> hasattr(db, 'close')
        True
    """
    session = SessionLocal()
    try:
        try:
            yield session
            session.commit()
        except Exception:
            # Prefer a clean rollback; only if that itself fails, fall back
            # to invalidating the session.
            rollback_failed = False
            try:
                session.rollback()
            except Exception:
                rollback_failed = True
            if rollback_failed:
                try:
                    session.invalidate()
                except Exception:
                    pass  # nosec B110 - Best effort cleanup on connection failure
            # Cleanup never masks the error that brought us here.
            raise
    finally:
        session.close()
class LoginRequest(BaseModel):
    """Credentials payload for the login endpoint.

    The address may be supplied through either ``email`` or, for
    compatibility, ``username``; ``get_email`` resolves which one to use.

    Attributes:
        email: User email address (optional when ``username`` is given)
        username: Compatibility alias; only accepted when it contains "@"
        password: User password
    """

    email: Optional[EmailStr] = None
    username: Optional[str] = None  # For compatibility
    password: str

    def get_email(self) -> str:
        """Resolve the address to authenticate with.

        ``email`` takes precedence. Otherwise ``username`` is used, but only
        when it contains an "@"; plain usernames are rejected.

        Returns:
            str: Email address to use for authentication

        Raises:
            ValueError: If neither email nor username is provided, or the
                username does not contain "@"

        Examples:
            >>> req = LoginRequest(email="test@example.com", password="pass")
            >>> req.get_email()
            'test@example.com'
            >>> req = LoginRequest(username="user@domain.com", password="pass")
            >>> req.get_email()
            'user@domain.com'
            >>> req = LoginRequest(username="invaliduser", password="pass")
            >>> req.get_email()  # doctest: +IGNORE_EXCEPTION_DETAIL
            Traceback (most recent call last):
            ValueError: Username format not supported. Please use email address.
            >>> req = LoginRequest(password="pass")
            >>> req.get_email()  # doctest: +IGNORE_EXCEPTION_DETAIL
            Traceback (most recent call last):
            ValueError: Either email or username must be provided
        """
        if self.email:
            return str(self.email)

        if not self.username:
            raise ValueError("Either email or username must be provided")

        # Authentication is email-based, so a username is only usable when it
        # is itself written as an email address.
        if "@" not in self.username:
            raise ValueError("Username format not supported. Please use email address.")

        return self.username
@auth_router.post("/login", response_model=AuthenticationResponse)
async def login(login_request: LoginRequest, request: Request, db: Session = Depends(get_db)):
    """Authenticate user and return session JWT token.

    This endpoint provides Tier 1 authentication for session-based access.
    The returned JWT token should be used for UI access and API key management.

    Args:
        login_request: Login credentials (email/username + password)
        request: FastAPI request object
        db: Database session

    Returns:
        AuthenticationResponse: Session JWT token and user info

    Raises:
        HTTPException: 400 if a ValueError is raised while processing the
            request (e.g. no usable email was supplied), 401 if the
            credentials are rejected, 500 for any other failure.

    Examples:
        Email format (recommended):
        {
          "email": "admin@example.com",
          "password": "ChangeMe_12345678$"
        }

        Username format (compatibility):
        {
          "username": "admin@example.com",
          "password": "ChangeMe_12345678$"
        }
    """
    auth_service = EmailAuthService(db)
    ip_address = get_client_ip(request)
    user_agent = get_user_agent(request)

    try:
        # Extract email from request
        email = login_request.get_email()

        # Authenticate user
        user = await auth_service.authenticate_user(email=email, password=login_request.password, ip_address=ip_address, user_agent=user_agent)

        if not user:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password")

        # Create session JWT token (Tier 1 authentication)
        access_token, expires_in = await create_access_token(user)

        logger.info(f"User {email} authenticated successfully")

        # Return session token for UI access and API key management
        return AuthenticationResponse(
            access_token=access_token, token_type="bearer", expires_in=expires_in, user=EmailUserResponse.from_email_user(user)
        )  # nosec B106 - OAuth2 token type, not a password

    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the 401
        # raised above would be caught by the generic handler below and be
        # reported to the client as a 500.
        raise
    except ValueError as e:
        logger.warning(f"Login validation error: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Login error for {login_request.email or login_request.username}: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error") from e