Coverage for mcpgateway / utils / verify_credentials.py: 100%
186 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/verify_credentials.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Authentication verification utilities for MCP Gateway.
8This module provides JWT and Basic authentication verification functions
9for securing API endpoints. It supports authentication via Authorization
10headers and cookies.
11Examples:
12 >>> from mcpgateway.utils import verify_credentials as vc
13 >>> from mcpgateway.utils import jwt_config_helper as jch
14 >>> from pydantic import SecretStr
15 >>> class DummySettings:
16 ... jwt_secret_key = 'secret'
17 ... jwt_algorithm = 'HS256'
18 ... jwt_audience = 'mcpgateway-api'
19 ... jwt_issuer = 'mcpgateway'
20 ... jwt_issuer_verification = True
21 ... jwt_audience_verification = True
22 ... jwt_public_key_path = ''
23 ... jwt_private_key_path = ''
24 ... basic_auth_user = 'user'
25 ... basic_auth_password = SecretStr('pass')
26 ... auth_required = True
27 ... require_token_expiration = False
28 ... require_jti = False
29 ... validate_token_environment = False
30 ... docs_allow_basic_auth = False
31 >>> vc.settings = DummySettings()
32 >>> jch.settings = DummySettings()
33 >>> import jwt
34 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
35 >>> import asyncio
36 >>> asyncio.run(vc.verify_jwt_token(token))['sub'] == 'alice'
37 True
38 >>> payload = asyncio.run(vc.verify_credentials(token))
39 >>> payload['token'] == token
40 True
41 >>> from fastapi.security import HTTPBasicCredentials
42 >>> creds = HTTPBasicCredentials(username='user', password='pass')
43 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user'
44 True
45 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong')
46 >>> try:
47 ... asyncio.run(vc.verify_basic_credentials(creds_bad))
48 ... except Exception as e:
49 ... print('error')
50 error
51"""
53# Standard
54from base64 import b64decode
55import binascii
56from typing import Optional
58# Third-Party
59from fastapi import Cookie, Depends, HTTPException, Request, status
60from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer
61from fastapi.security.utils import get_authorization_scheme_param
62import jwt
64# First-Party
65from mcpgateway.config import settings
66from mcpgateway.services.logging_service import LoggingService
67from mcpgateway.utils.jwt_config_helper import validate_jwt_algo_and_keys
# Shared FastAPI security schemes used as dependencies below.
# auto_error=False is passed so that the dependants in this module can decide
# themselves how to react to absent credentials (they test for a falsy value
# and either raise 401 or fall back to "anonymous").
basic_security = HTTPBasic(auto_error=False)
security = HTTPBearer(auto_error=False)

# Initialize logging service first
logging_service = LoggingService()
# Module-level logger obtained from the project's logging service.
logger = logging_service.get_logger(__name__)
async def verify_jwt_token(token: str) -> dict:
    """Verify and decode a JWT token in a single pass.

    Decodes and validates a JWT token using the configured secret key
    and algorithm from settings. Uses PyJWT's require option for claim
    enforcement instead of a separate unverified decode.

    Note:
        With single-pass decoding, signature validation occurs before
        claim validation. An invalid signature will result in "Invalid token"
        error even if the token is also missing required claims.

    Args:
        token: The JWT token string to verify.

    Returns:
        dict: The decoded token payload containing claims (e.g., user info).

    Raises:
        HTTPException: 401 if the token is invalid, expired, missing a required
            claim (exp / aud / iss / jti, depending on settings), or carries an
            ``env`` claim that does not match the server environment.
    """
    try:
        validate_jwt_algo_and_keys()

        # Import the verification key helper
        # First-Party
        from mcpgateway.utils.jwt_config_helper import get_jwt_public_key_or_secret

        options = {
            "verify_aud": settings.jwt_audience_verification,
            "verify_iss": settings.jwt_issuer_verification,
        }

        if settings.require_token_expiration:
            options["require"] = ["exp"]

        decode_kwargs = {
            "key": get_jwt_public_key_or_secret(),
            "algorithms": [settings.jwt_algorithm],
            "options": options,
        }

        # Expected audience / issuer are only supplied when the matching check is enabled.
        if settings.jwt_audience_verification:
            decode_kwargs["audience"] = settings.jwt_audience

        if settings.jwt_issuer_verification:
            decode_kwargs["issuer"] = settings.jwt_issuer

        payload = jwt.decode(token, **decode_kwargs)

        # Log warning for tokens without expiration (when not required)
        if not settings.require_token_expiration and "exp" not in payload:
            logger.warning(
                "JWT token without expiration accepted. Consider enabling REQUIRE_TOKEN_EXPIRATION for better security. Token sub: %s",
                payload.get("sub", "unknown"),
            )

        if "jti" not in payload:
            # Require JTI if configured
            if settings.require_jti:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token is missing required JTI claim. Set REQUIRE_JTI=false to allow.",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            # Log warning for tokens without JTI (when not required)
            logger.warning(
                "JWT token without JTI accepted. Token cannot be revoked. Consider enabling REQUIRE_JTI for better security. Token sub: %s",
                payload.get("sub", "unknown"),
            )

        # Validate environment claim if configured (reject mismatched, allow missing for backward compatibility)
        if settings.validate_token_environment:
            token_env = payload.get("env")
            if token_env is not None and token_env != settings.environment:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail=f"Token environment mismatch: token is for '{token_env}', server is '{settings.environment}'",
                    headers={"WWW-Authenticate": "Bearer"},
                )

        return payload

    except jwt.MissingRequiredClaimError as exc:
        # PyJWT raises this for any claim it was asked to check, not only "exp":
        # a token lacking "aud" or "iss" also ends up here when those
        # verifications are enabled. Report the claim that is really missing.
        missing_claim = getattr(exc, "claim", None)
        if missing_claim == "exp":
            detail = "Token is missing required expiration claim. Set REQUIRE_TOKEN_EXPIRATION=false to allow."
        elif missing_claim:
            detail = f"Token is missing required claim: {missing_claim}"
        else:
            detail = "Invalid token"
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        )
    except jwt.PyJWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )
async def verify_jwt_token_cached(token: str, request: Optional[Request] = None) -> dict:
    """Verify a JWT token, reusing a result already stored on the request.

    When ``request`` exposes a ``state`` attribute, a previously verified
    ``(token, payload)`` pair stored there is returned as-is if it was
    produced for the same token. Otherwise the token is verified with
    ``verify_jwt_token`` and the pair is stored on ``request.state``.

    Args:
        token: JWT token string to verify
        request: Optional FastAPI/Starlette request for request-level caching.
            Must have a 'state' attribute to enable caching.

    Returns:
        dict: Decoded and verified JWT payload

    Raises:
        HTTPException: If token is invalid, expired, or missing required claims.
    """
    # Fast path: a well-formed cache entry for this exact token short-circuits verification.
    if request is not None and hasattr(request, "state"):
        entry = getattr(request.state, "_jwt_verified_payload", None)
        # Anything that is not a 2-tuple is ignored rather than unpacked.
        if isinstance(entry, tuple) and len(entry) == 2 and entry[0] == token:
            return entry[1]

    # Slow path: one full decode/verification.
    verified = await verify_jwt_token(token)

    # Remember the result so later middleware/dependencies on this request can reuse it.
    if request is not None and hasattr(request, "state"):
        request.state._jwt_verified_payload = (token, verified)

    return verified
async def verify_credentials(token: str) -> dict:
    """Verify a JWT token and return its payload with the raw token attached.

    Delegates to verify_jwt_token (whose exceptions propagate unchanged) and
    then stores the original token string in the decoded payload under the
    'token' key. The payload dict returned by verify_jwt_token is modified
    in place.

    Args:
        token: The JWT token string to verify.

    Returns:
        dict: The validated token payload with the original token added
        under the 'token' key.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
        >>> import asyncio
        >>> payload = asyncio.run(vc.verify_credentials(token))
        >>> payload['token'] == token
        True
    """
    decoded = await verify_jwt_token(token)
    # Attach the raw token so callers can forward it without re-reading the request.
    decoded.update({"token": token})
    return decoded
async def verify_credentials_cached(token: str, request: Optional[Request] = None) -> dict:
    """Verify a JWT token via the request-level cache and attach the raw token.

    Calls verify_jwt_token_cached and returns a new dict made of the verified
    payload plus a 'token' key holding the original token string.

    Args:
        token: The JWT token string to verify.
        request: Optional FastAPI/Starlette request for request-level caching.

    Returns:
        dict: The validated token payload with the original token added
        under the 'token' key. Returns a copy to avoid mutating cached payload.
    """
    verified = await verify_jwt_token_cached(token, request)
    # Build a shallow copy first: the verified dict may be the object held in request.state.
    enriched = dict(verified)
    enriched["token"] = token
    return enriched
async def require_auth(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)) -> str | dict:
    """Require authentication via JWT token or proxy headers.

    FastAPI dependency that resolves the caller's identity as follows:

    * When ``mcp_client_auth_enabled`` is false, JWTs are not consulted at all.
      With ``trust_proxy_auth`` enabled the user is read from the configured
      proxy header; otherwise no auth method is available.
    * Otherwise a JWT is looked up, in order, in the request's ``jwt_token``
      cookie, the Authorization header (Bearer scheme), and finally the
      ``jwt_token`` value injected by FastAPI's Cookie dependency.

    If authentication is required but no identity can be established, an
    HTTP 401 error is raised.

    Args:
        request: The FastAPI request object for accessing headers.
        credentials: HTTP Authorization credentials from the request header.
        jwt_token: JWT token from cookies.

    Returns:
        str | dict: The verified credentials payload if authenticated,
        a proxy-user dict if proxy auth is used, or "anonymous" if
        authentication is not required.

    Raises:
        HTTPException: 401 status if authentication is required but no valid
            token is provided.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     mcp_client_auth_enabled = True
        ...     trust_proxy_auth = False
        ...     proxy_user_header = 'X-Authenticated-User'
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> from fastapi.security import HTTPAuthorizationCredentials
        >>> from fastapi import Request
        >>> import asyncio

        Test with valid credentials in header:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
        >>> creds = HTTPAuthorizationCredentials(scheme='Bearer', credentials=token)
        >>> req = Request(scope={'type': 'http', 'headers': []})
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=creds, jwt_token=None))
        >>> result['sub'] == 'alice'
        True

        Test with valid token in cookie:
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=token))
        >>> result['sub'] == 'alice'
        True

        Test with auth required but no token:
        >>> try:
        ...     asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated

        Test with auth not required:
        >>> vc.settings.auth_required = False
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
        >>> result
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # --- MCP client auth disabled: identity can only come from a trusted proxy ---
    if not settings.mcp_client_auth_enabled:
        if settings.trust_proxy_auth:
            proxy_user = request.headers.get(settings.proxy_user_header)
            if proxy_user:
                return {"sub": proxy_user, "source": "proxy", "token": None}  # nosec B105 - None is not a password
            # No proxy header - check auth_required (matches RBAC/WebSocket behavior)
            rejection_detail = "Proxy authentication header required"
        else:
            # MCP auth disabled without proxy trust - security risk!
            # This case is already warned about in config validation
            rejection_detail = "Authentication required but no auth method configured"

        if settings.auth_required:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=rejection_detail,
                headers={"WWW-Authenticate": "Bearer"},
            )
        return "anonymous"

    # --- Standard JWT flow: first non-empty source wins ---
    token = None

    # 1. Cookie read straight off the request (takes priority over everything else)
    request_cookies = getattr(request, "cookies", None)
    if request_cookies:
        token = request_cookies.get("jwt_token") or None

    # 2. Authorization header
    if not token and credentials and credentials.credentials:
        token = credentials.credentials

    # 3. Cookie value injected by FastAPI's dependency system (fallback)
    if not token and jwt_token:
        token = jwt_token

    if settings.auth_required and not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not token:
        return "anonymous"
    return await verify_credentials_cached(token, request)
async def verify_basic_credentials(credentials: HTTPBasicCredentials) -> str:
    """Verify HTTP Basic authentication credentials.

    Validates the provided username and password against the configured
    basic auth credentials in settings. Both values are compared in
    constant time so that response timing does not reveal how much of a
    guessed username or password is correct.

    Args:
        credentials: HTTP Basic credentials containing username and password.

    Returns:
        str: The authenticated username if credentials are valid.

    Raises:
        HTTPException: 401 status if credentials are invalid.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> from fastapi.security import HTTPBasicCredentials
        >>> creds = HTTPBasicCredentials(username='user', password='pass')
        >>> import asyncio
        >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user'
        True
        >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong')
        >>> try:
        ...     asyncio.run(vc.verify_basic_credentials(creds_bad))
        ... except Exception as e:
        ...     print('error')
        error
    """
    # Standard
    import secrets

    def _constant_time_equal(supplied: object, expected: object) -> bool:
        """Compare two strings without short-circuiting on the first mismatch.

        Args:
            supplied: Value received from the client.
            expected: Value configured in settings.

        Returns:
            bool: True only when both are strings with identical content.
        """
        if not isinstance(supplied, str) or not isinstance(expected, str):
            return False
        # compare_digest only accepts non-ASCII text as bytes, so encode both sides.
        return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))

    # Evaluate both comparisons unconditionally so a wrong username costs the same as a wrong password.
    is_valid_user = _constant_time_equal(credentials.username, settings.basic_auth_user)
    is_valid_pass = _constant_time_equal(credentials.password, settings.basic_auth_password.get_secret_value())

    if not (is_valid_user and is_valid_pass):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username
async def require_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_security)) -> str:
    """Enforce HTTP Basic authentication when authentication is required.

    FastAPI dependency. When ``settings.auth_required`` is false the caller is
    treated as "anonymous" without looking at the credentials. Otherwise the
    credentials must be present and must pass verify_basic_credentials.

    Args:
        credentials: HTTP Basic credentials provided by the client.

    Returns:
        str: The authenticated username or "anonymous" if auth is not required.

    Raises:
        HTTPException: 401 status if authentication is required but no valid
            credentials are provided.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> from fastapi.security import HTTPBasicCredentials
        >>> import asyncio

        Test with valid credentials:
        >>> creds = HTTPBasicCredentials(username='user', password='pass')
        >>> asyncio.run(vc.require_basic_auth(creds))
        'user'

        Test with auth required but no credentials:
        >>> try:
        ...     asyncio.run(vc.require_basic_auth(None))
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated

        Test with auth not required:
        >>> vc.settings.auth_required = False
        >>> asyncio.run(vc.require_basic_auth(None))
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # Authentication switched off globally: nothing to check.
    if not settings.auth_required:
        return "anonymous"

    if credentials:
        return await verify_basic_credentials(credentials)

    # Required, but the client sent no Basic credentials at all.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
        headers={"WWW-Authenticate": "Basic"},
    )
async def require_docs_basic_auth(auth_header: str) -> str:
    """Dedicated handler for HTTP Basic Auth for documentation endpoints only.

    This function is ONLY intended for /docs, /redoc, or similar endpoints, and is enabled
    via the settings.docs_allow_basic_auth flag. It should NOT be used for general API authentication.

    The header is accepted only when its scheme is "Basic", it carries a
    non-empty parameter and docs_allow_basic_auth is enabled. The parameter is
    base64-decoded as ASCII, split on the first colon and handed to
    require_basic_auth. Decoding/format problems are reported as 401, never as
    ValueError.

    Args:
        auth_header: Raw Authorization header value (e.g. "Basic <base64 of username:password>").

    Returns:
        str: The authenticated username if credentials are valid.

    Raises:
        HTTPException: 401 if Basic auth is not allowed, or if the credentials
            are malformed or invalid.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = True
        >>> vc.settings = DummySettings()
        >>> import base64, asyncio

        Test with properly encoded credentials:
        >>> userpass = base64.b64encode(b'user:pass').decode()
        >>> auth_header = f'Basic {userpass}'
        >>> asyncio.run(vc.require_docs_basic_auth(auth_header))
        'user'

        Test with different valid credentials:
        >>> valid_creds = base64.b64encode(b'user:pass').decode()
        >>> valid_header = f'Basic {valid_creds}'
        >>> result = asyncio.run(vc.require_docs_basic_auth(valid_header))
        >>> result == 'user'
        True

        Test with invalid password:
        >>> badpass = base64.b64encode(b'user:wrong').decode()
        >>> bad_header = f'Basic {badpass}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(bad_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test with malformed base64 (no colon):
        >>> malformed = base64.b64encode(b'userpass').decode()
        >>> malformed_header = f'Basic {malformed}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(malformed_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test with invalid base64 encoding:
        >>> invalid_header = 'Basic invalid_base64!'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(invalid_header))
        ... except vc.HTTPException as e:
        ...     'Invalid basic auth credentials' in e.detail
        True

        Test when docs_allow_basic_auth is disabled:
        >>> vc.settings.docs_allow_basic_auth = False
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(auth_header))
        ... except vc.HTTPException as e:
        ...     'not allowed' in e.detail
        True
        >>> vc.settings.docs_allow_basic_auth = True

        Test with non-Basic auth scheme:
        >>> bearer_header = 'Bearer eyJhbGciOiJIUzI1NiJ9...'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(bearer_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test with empty credentials part:
        >>> empty_header = 'Basic '
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(empty_header))
        ... except vc.HTTPException as e:
        ...     'not allowed' in e.detail
        True

        Test with Unicode decode error:
        >>> from base64 import b64encode
        >>> bad_bytes = bytes([0xff, 0xfe])  # Invalid UTF-8 bytes
        >>> bad_unicode = b64encode(bad_bytes).decode()
        >>> unicode_header = f'Basic {bad_unicode}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(unicode_header))
        ... except vc.HTTPException as e:
        ...     'Invalid basic auth credentials' in e.detail
        True
    """
    scheme, param = get_authorization_scheme_param(auth_header)

    # Gate: right scheme, something to decode, and the feature flag switched on.
    basic_permitted = scheme.lower() == "basic" and param and settings.docs_allow_basic_auth
    if not basic_permitted:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Basic authentication not allowed or malformed",
            headers={"WWW-Authenticate": "Basic"},
        )

    try:
        decoded = b64decode(param).decode("ascii")
        # partition() splits on the first colon only, so the password may itself contain colons.
        username, colon, password = decoded.partition(":")
        if not colon:
            raise ValueError("Invalid basic auth format")
        return await require_basic_auth(credentials=HTTPBasicCredentials(username=username, password=password))
    except (ValueError, UnicodeDecodeError, binascii.Error):
        # Bad base64, non-ASCII bytes or a missing separator all look the same to the client.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid basic auth credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
async def require_docs_auth_override(
    auth_header: str | None = None,
    jwt_token: str | None = None,
) -> str | dict:
    """Authenticate a docs-endpoint request without consulting global auth switches.

    Used for documentation endpoints (/docs, /redoc, /openapi.json). Unlike
    require_auth, this function does not look at mcp_client_auth_enabled or
    auth_required: a token is always demanded. A Bearer token in the header
    overrides the cookie token; a Basic header is delegated to
    require_docs_basic_auth when settings.docs_allow_basic_auth is enabled.

    Args:
        auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
        jwt_token: JWT token from cookies.

    Returns:
        str | dict: The decoded JWT payload, or the username when Basic auth
        was used.

    Raises:
        HTTPException: If authentication fails or credentials are invalid.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     docs_allow_basic_auth = False
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> import asyncio

        Test with valid JWT:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
        >>> auth_header = f'Bearer {token}'
        >>> result = asyncio.run(vc.require_docs_auth_override(auth_header=auth_header))
        >>> result['sub'] == 'alice'
        True

        Test with no token:
        >>> try:
        ...     asyncio.run(vc.require_docs_auth_override())
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated
    """
    # Start from the cookie token; a usable Authorization header may replace it.
    token = jwt_token
    if auth_header:
        scheme, param = get_authorization_scheme_param(auth_header)
        scheme_name = scheme.lower()
        if scheme_name == "bearer" and param:
            token = param
        elif scheme_name == "basic" and param and settings.docs_allow_basic_auth:
            # Only allow Basic Auth for docs endpoints when explicitly enabled
            return await require_docs_basic_auth(auth_header)

    # Validate the JWT token when one was found
    if token:
        return await verify_credentials(token)

    # Always require a token for docs endpoints
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
        headers={"WWW-Authenticate": "Bearer"},
    )
async def require_auth_override(
    auth_header: str | None = None,
    jwt_token: str | None = None,
    request: Request | None = None,
) -> str | dict:
    """Call require_auth manually from middleware without FastAPI dependency injection.

    This wrapper allows manual authentication verification in contexts where
    FastAPI's dependency injection is not available (e.g., middleware).
    It parses the Authorization header, builds the Bearer credentials object
    that require_auth expects and then delegates to require_auth. A Basic
    header is delegated to require_docs_basic_auth instead, but only when
    settings.docs_allow_basic_auth is enabled; any other header is ignored.

    Args:
        auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
        jwt_token: JWT taken from a cookie. A Bearer token in ``auth_header``
            is preferred over this value; a ``jwt_token`` cookie present on
            ``request`` itself is preferred over both (see require_auth).
        request: Optional Request object for accessing headers (used for proxy auth).
            An empty HTTP request is substituted when omitted.

    Returns:
        str | dict: The decoded JWT payload or the string "anonymous",
        same as require_auth.

    Raises:
        HTTPException: If authentication fails or credentials are invalid.

    Note:
        This wrapper may propagate HTTPException raised by the functions it
        delegates to, but it does not raise anything on its own.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     mcp_client_auth_enabled = True
        ...     trust_proxy_auth = False
        ...     proxy_user_header = 'X-Authenticated-User'
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> import asyncio

        Test with Bearer token in auth header:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
        >>> auth_header = f'Bearer {token}'
        >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
        >>> result['sub'] == 'alice'
        True

        Test with invalid auth scheme:
        >>> auth_header = 'Basic dXNlcjpwYXNz'  # Base64 encoded user:pass
        >>> vc.settings.auth_required = False
        >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
        >>> result
        'anonymous'

        Test with only cookie token:
        >>> result = asyncio.run(vc.require_auth_override(jwt_token=token))
        >>> result['sub'] == 'alice'
        True

        Test with no auth:
        >>> result = asyncio.run(vc.require_auth_override())
        >>> result
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # Fall back to an empty HTTP request when the caller has none (backward compatibility).
    effective_request = request if request is not None else Request(scope={"type": "http", "headers": []})

    bearer_credentials = None
    if auth_header:
        scheme, param = get_authorization_scheme_param(auth_header)
        scheme_name = scheme.lower()
        if scheme_name == "bearer" and param:
            bearer_credentials = HTTPAuthorizationCredentials(scheme=scheme, credentials=param)
        elif scheme_name == "basic" and param and settings.docs_allow_basic_auth:
            # Only allow Basic Auth for docs endpoints when explicitly enabled
            return await require_docs_basic_auth(auth_header)

    return await require_auth(request=effective_request, credentials=bearer_credentials, jwt_token=jwt_token)
async def require_admin_auth(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    jwt_token: Optional[str] = Cookie(None, alias="jwt_token"),
    basic_credentials: Optional[HTTPBasicCredentials] = Depends(basic_security),
) -> str:
    """Require admin authentication supporting both email auth and basic auth.

    Authentication is attempted in this order:

    1. Email-based JWT authentication (only when ``settings.email_auth_enabled``
       is truthy). The token is taken from the cookie, else from the
       Authorization credentials. The user named by the token's ``sub`` (or
       legacy ``username``) claim must exist and have ``is_admin`` set.
    2. Basic authentication (legacy support), gated by
       ``settings.api_allow_basic_auth``.

    An authenticated non-admin user is rejected immediately (403, or a 302
    redirect to the login page for browser requests) and is NOT allowed to
    fall through to basic auth. A 401 from the JWT path falls through to basic
    auth for non-browser clients. Any non-HTTP error in the JWT path also
    falls through to basic auth.

    Args:
        request: FastAPI request object
        credentials: HTTP Authorization credentials
        jwt_token: JWT token from cookies
        basic_credentials: HTTP Basic auth credentials

    Returns:
        str: Username/email of authenticated admin user

    Raises:
        HTTPException: 401 if authentication fails, 403 if the user is not an
            admin, or 302 (with a ``Location`` header pointing at the admin
            login page) for browser/HTMX requests that need to log in.

    Examples:
        >>> # This function is typically used as a FastAPI dependency
        >>> callable(require_admin_auth)
        True
    """
    # First-Party
    from mcpgateway.config import settings

    # Try email authentication first if enabled
    if getattr(settings, "email_auth_enabled", False):
        try:
            # First-Party
            from mcpgateway.db import get_db
            from mcpgateway.services.email_auth_service import EmailAuthService

            token = jwt_token
            if not token and credentials:
                token = credentials.credentials

            if token:
                db_session = next(get_db())
                # No inner ``except`` here: HTTPExceptions raised below must reach the
                # ``except HTTPException`` handler unchanged so that 403s and redirects
                # propagate. Re-wrapping them in a bare Exception made that handler
                # unreachable and silently downgraded every failure to the basic-auth
                # fallback.
                try:
                    # Decode and verify JWT token (use cached version for performance)
                    payload = await verify_jwt_token_cached(token, request)
                    username = payload.get("sub") or payload.get("username")  # Support both new and legacy formats

                    if username:
                        # Get user from database
                        auth_service = EmailAuthService(db_session)
                        current_user = await auth_service.get_user_by_email(username)

                        if current_user and current_user.is_admin:
                            return current_user.email
                        elif current_user:
                            # User is authenticated but not admin - check if this is a browser request
                            accept_header = request.headers.get("accept", "")
                            if "text/html" in accept_header:
                                # Redirect browser to login page with error
                                root_path = request.scope.get("root_path", "")
                                raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Admin privileges required", headers={"Location": f"{root_path}/admin/login?error=admin_required"})
                            else:
                                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required")
                        else:
                            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
                finally:
                    # Always release the session, whether we returned, raised or fell through.
                    db_session.close()
        except HTTPException as e:
            # Re-raise HTTP exceptions (403, redirects, etc.)
            if e.status_code != status.HTTP_401_UNAUTHORIZED:
                raise
            # For 401, check if we should redirect browser users
            accept_header = request.headers.get("accept", "")
            if "text/html" in accept_header:
                root_path = request.scope.get("root_path", "")
                raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
            # If JWT auth fails, fall back to basic auth for backward compatibility
        except Exception:
            # Intentional fallback to basic auth on any other email auth error;
            # keep a trace at debug level so the failure is not completely invisible.
            logger.debug("Email authentication failed; falling back to basic auth", exc_info=True)

    # Fall back to basic authentication (gated by API_ALLOW_BASIC_AUTH)
    try:
        if basic_credentials:
            # SECURITY: Basic auth for API endpoints is disabled by default
            if not settings.api_allow_basic_auth:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Basic authentication is disabled for API endpoints. Use JWT or API tokens instead.",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            return await verify_basic_credentials(basic_credentials)
        else:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required",
                headers={"WWW-Authenticate": "Bearer"},
            )
    except HTTPException:
        # If both methods fail, check if we should redirect browser users to login page
        if getattr(settings, "email_auth_enabled", False):
            accept_header = request.headers.get("accept", "")
            is_htmx = request.headers.get("hx-request") == "true"
            if "text/html" in accept_header or is_htmx:
                root_path = request.scope.get("root_path", "")
                raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
            else:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. Please login with email/password or use basic auth.", headers={"WWW-Authenticate": "Bearer"}
                )
        else:
            # Re-raise the basic auth error
            raise