Coverage for mcpgateway / utils / verify_credentials.py: 99%
277 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/verify_credentials.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Authentication verification utilities for ContextForge.
8This module provides JWT and Basic authentication verification functions
9for securing API endpoints. It supports authentication via Authorization
10headers and cookies.
11Examples:
12 >>> from mcpgateway.utils import verify_credentials as vc
13 >>> from mcpgateway.utils import jwt_config_helper as jch
14 >>> from pydantic import SecretStr
15 >>> class DummySettings:
16 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
17 ... jwt_algorithm = 'HS256'
18 ... jwt_audience = 'mcpgateway-api'
19 ... jwt_issuer = 'mcpgateway'
20 ... jwt_issuer_verification = True
21 ... jwt_audience_verification = True
22 ... jwt_public_key_path = ''
23 ... jwt_private_key_path = ''
24 ... basic_auth_user = 'user'
25 ... basic_auth_password = SecretStr('pass')
26 ... auth_required = True
27 ... require_token_expiration = False
28 ... require_jti = False
29 ... validate_token_environment = False
30 ... docs_allow_basic_auth = False
31 >>> vc.settings = DummySettings()
32 >>> jch.settings = DummySettings()
33 >>> import jwt
34 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
35 >>> import asyncio
36 >>> asyncio.run(vc.verify_jwt_token(token))['sub'] == 'alice'
37 True
38 >>> payload = asyncio.run(vc.verify_credentials(token))
39 >>> payload['token'] == token
40 True
41 >>> from fastapi.security import HTTPBasicCredentials
42 >>> creds = HTTPBasicCredentials(username='user', password='pass')
43 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user'
44 True
45 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong')
46 >>> try:
47 ... asyncio.run(vc.verify_basic_credentials(creds_bad))
48 ... except Exception as e:
49 ... print('error')
50 error
51"""
53# Standard
54import asyncio
55from base64 import b64decode
56import binascii
57from typing import Any, Optional
59# Third-Party
60from fastapi import Cookie, Depends, HTTPException, Request, status
61from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer
62from fastapi.security.utils import get_authorization_scheme_param
63import jwt
65# First-Party
66from mcpgateway.config import settings
67from mcpgateway.services.logging_service import LoggingService
68from mcpgateway.utils.jwt_config_helper import validate_jwt_algo_and_keys
70basic_security = HTTPBasic(auto_error=False)
71security = HTTPBearer(auto_error=False)
73# Initialize logging service first
74logging_service = LoggingService()
75logger = logging_service.get_logger(__name__)
78def is_proxy_auth_trust_active(settings_obj: Any | None = None) -> bool:
79 """Return whether proxy-header trust mode is explicitly active.
81 Args:
82 settings_obj: Optional settings object override (defaults to global settings).
84 Returns:
85 ``True`` when proxy-header trust is explicitly enabled and acknowledged;
86 otherwise ``False``.
87 """
88 current_settings = settings_obj or settings
90 if current_settings.mcp_client_auth_enabled or not current_settings.trust_proxy_auth:
91 return False
93 if getattr(current_settings, "trust_proxy_auth_dangerously", False) is True:
94 return True
96 if not getattr(is_proxy_auth_trust_active, "_warned", False):
97 logger.warning("Ignoring trusted proxy auth because TRUST_PROXY_AUTH_DANGEROUSLY is false while MCP client auth is disabled.")
98 is_proxy_auth_trust_active._warned = True # type: ignore[attr-defined]
99 return False
102def extract_websocket_bearer_token(query_params: Any, headers: Any, *, query_param_warning: Optional[str] = None) -> Optional[str]:
103 """Extract bearer token from WebSocket Authorization headers.
105 Args:
106 query_params: WebSocket query parameters mapping-like object.
107 headers: WebSocket headers mapping-like object.
108 query_param_warning: Optional warning message when legacy query token is detected.
110 Returns:
111 Bearer token value when present, otherwise None.
112 """
113 # Do not accept tokens from query parameters. This avoids leaking bearer
114 # secrets through URL logs/history/proxy telemetry.
115 query = query_params or {}
116 legacy_token = query.get("token") if hasattr(query, "get") else None
117 if legacy_token and query_param_warning:
118 logger.warning(f"{query_param_warning}; token ignored")
120 header_values = headers or {}
121 auth_header = header_values.get("authorization") if hasattr(header_values, "get") else None
122 if not auth_header and hasattr(header_values, "get"):
123 auth_header = header_values.get("Authorization")
124 if auth_header:
125 scheme, _, credentials = auth_header.partition(" ")
126 if scheme.lower() == "bearer" and credentials:
127 return credentials.strip()
128 return None
131async def verify_jwt_token(token: str) -> dict:
132 """Verify and decode a JWT token in a single pass.
134 Decodes and validates a JWT token using the configured secret key
135 and algorithm from settings. Uses PyJWT's require option for claim
136 enforcement instead of a separate unverified decode.
138 Note:
139 With single-pass decoding, signature validation occurs before
140 claim validation. An invalid signature will result in "Invalid token"
141 error even if the token is also missing required claims.
143 Args:
144 token: The JWT token string to verify.
146 Returns:
147 dict: The decoded token payload containing claims (e.g., user info).
149 Raises:
150 HTTPException: If token is invalid, expired, or missing required claims.
151 """
152 try:
153 validate_jwt_algo_and_keys()
155 # Import the verification key helper
156 # First-Party
157 from mcpgateway.utils.jwt_config_helper import get_jwt_public_key_or_secret
159 options = {
160 "verify_aud": settings.jwt_audience_verification,
161 "verify_iss": settings.jwt_issuer_verification,
162 }
164 if settings.require_token_expiration:
165 options["require"] = ["exp"]
167 decode_kwargs = {
168 "key": get_jwt_public_key_or_secret(),
169 "algorithms": [settings.jwt_algorithm],
170 "options": options,
171 }
173 if settings.jwt_audience_verification:
174 decode_kwargs["audience"] = settings.jwt_audience
176 if settings.jwt_issuer_verification:
177 decode_kwargs["issuer"] = settings.jwt_issuer
179 payload = jwt.decode(token, **decode_kwargs)
181 # Log warning for tokens without expiration (when not required)
182 if not settings.require_token_expiration and "exp" not in payload:
183 logger.warning(f"JWT token without expiration accepted. Consider enabling REQUIRE_TOKEN_EXPIRATION for better security. Token sub: {payload.get('sub', 'unknown')}")
185 # Require JTI if configured
186 if settings.require_jti and "jti" not in payload:
187 raise HTTPException(
188 status_code=status.HTTP_401_UNAUTHORIZED,
189 detail="Token is missing required JTI claim. Set REQUIRE_JTI=false to allow.",
190 headers={"WWW-Authenticate": "Bearer"},
191 )
193 # Log warning for tokens without JTI (when not required)
194 if not settings.require_jti and "jti" not in payload:
195 logger.warning(f"JWT token without JTI accepted. Token cannot be revoked. Consider enabling REQUIRE_JTI for better security. Token sub: {payload.get('sub', 'unknown')}")
197 # Validate environment claim if configured (reject mismatched, allow missing for backward compatibility)
198 if settings.validate_token_environment:
199 token_env = payload.get("env")
200 if token_env is not None and token_env != settings.environment:
201 raise HTTPException(
202 status_code=status.HTTP_401_UNAUTHORIZED,
203 detail=f"Token environment mismatch: token is for '{token_env}', server is '{settings.environment}'",
204 headers={"WWW-Authenticate": "Bearer"},
205 )
207 return payload
209 except jwt.MissingRequiredClaimError:
210 raise HTTPException(
211 status_code=status.HTTP_401_UNAUTHORIZED,
212 detail="Token is missing required expiration claim. Set REQUIRE_TOKEN_EXPIRATION=false to allow.",
213 headers={"WWW-Authenticate": "Bearer"},
214 )
215 except jwt.ExpiredSignatureError:
216 raise HTTPException(
217 status_code=status.HTTP_401_UNAUTHORIZED,
218 detail="Token has expired",
219 headers={"WWW-Authenticate": "Bearer"},
220 )
221 except jwt.PyJWTError:
222 raise HTTPException(
223 status_code=status.HTTP_401_UNAUTHORIZED,
224 detail="Invalid token",
225 headers={"WWW-Authenticate": "Bearer"},
226 )
229async def verify_jwt_token_cached(token: str, request: Optional[Request] = None) -> dict:
230 """Verify JWT token with request-level caching.
232 If a request object is provided and the token has already been verified
233 for this request, returns the cached payload. Otherwise, performs
234 verification and caches the result in request.state.
236 Args:
237 token: JWT token string to verify
238 request: Optional FastAPI/Starlette request for request-level caching.
239 Must have a 'state' attribute to enable caching.
241 Returns:
242 dict: Decoded and verified JWT payload
244 Raises:
245 HTTPException: If token is invalid, expired, or missing required claims.
246 """
247 # Check request.state cache first (safely handle non-Request objects)
248 if request is not None and hasattr(request, "state"):
249 cached = getattr(request.state, "_jwt_verified_payload", None)
250 # Verify cache is a valid tuple of (token, payload) before unpacking
251 if cached is not None and isinstance(cached, tuple) and len(cached) == 2:
252 cached_token, cached_payload = cached
253 if cached_token == token:
254 return cached_payload
256 # Verify token (single decode)
257 payload = await verify_jwt_token(token)
259 # Cache in request.state for reuse across middleware
260 if request is not None and hasattr(request, "state"):
261 request.state._jwt_verified_payload = (token, payload)
263 return payload
266async def verify_credentials(token: str) -> dict:
267 """Verify credentials using a JWT token.
269 A wrapper around verify_jwt_token that adds the original token
270 to the decoded payload for reference.
272 This function uses verify_jwt_token internally which may raise exceptions.
274 Args:
275 token: The JWT token string to verify.
277 Returns:
278 dict: The validated token payload with the original token added
279 under the 'token' key.
281 Examples:
282 >>> from mcpgateway.utils import verify_credentials as vc
283 >>> from mcpgateway.utils import jwt_config_helper as jch
284 >>> from pydantic import SecretStr
285 >>> class DummySettings:
286 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
287 ... jwt_algorithm = 'HS256'
288 ... jwt_audience = 'mcpgateway-api'
289 ... jwt_issuer = 'mcpgateway'
290 ... jwt_audience_verification = True
291 ... jwt_issuer_verification = True
292 ... jwt_public_key_path = ''
293 ... jwt_private_key_path = ''
294 ... basic_auth_user = 'user'
295 ... basic_auth_password = SecretStr('pass')
296 ... auth_required = True
297 ... require_token_expiration = False
298 ... require_jti = False
299 ... validate_token_environment = False
300 ... docs_allow_basic_auth = False
301 >>> vc.settings = DummySettings()
302 >>> jch.settings = DummySettings()
303 >>> import jwt
304 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
305 >>> import asyncio
306 >>> payload = asyncio.run(vc.verify_credentials(token))
307 >>> payload['token'] == token
308 True
309 """
310 payload = await verify_jwt_token(token)
311 payload["token"] = token
312 return payload
315async def verify_credentials_cached(token: str, request: Optional[Request] = None) -> dict:
316 """Verify credentials using a JWT token with request-level caching.
318 A wrapper around verify_jwt_token_cached that adds the original token
319 to the decoded payload for reference.
321 Args:
322 token: The JWT token string to verify.
323 request: Optional FastAPI/Starlette request for request-level caching.
325 Returns:
326 dict: The validated token payload with the original token added
327 under the 'token' key. Returns a copy to avoid mutating cached payload.
328 """
329 payload = await verify_jwt_token_cached(token, request)
330 # Return a copy with token added to avoid mutating the cached payload
331 return {**payload, "token": token}
334def _raise_auth_401(detail: str) -> None:
335 """Raise a standardized bearer-auth 401 error.
337 Args:
338 detail: Error detail message for the response body.
340 Raises:
341 HTTPException: Always raises 401 Unauthorized with Bearer auth header.
342 """
343 raise HTTPException(
344 status_code=status.HTTP_401_UNAUTHORIZED,
345 detail=detail,
346 headers={"WWW-Authenticate": "Bearer"},
347 )
350async def _enforce_revocation_and_active_user(payload: dict) -> None:
351 """Enforce token revocation and active-user checks for JWT-authenticated flows.
353 Args:
354 payload: Verified JWT payload used to derive revocation and user status checks.
356 Raises:
357 HTTPException: 401 when the token is revoked, the account is disabled,
358 or strict user-in-db mode rejects a missing user.
359 """
360 # First-Party
361 from mcpgateway.auth import _check_token_revoked_sync, _get_user_by_email_sync
363 jti = payload.get("jti")
364 if jti:
365 try:
366 if await asyncio.to_thread(_check_token_revoked_sync, jti):
367 _raise_auth_401("Token has been revoked")
368 except HTTPException:
369 raise
370 except Exception as exc:
371 logger.warning("Token revocation check failed for JTI %s: %s", jti, exc)
373 username = payload.get("sub") or payload.get("email") or payload.get("username")
374 if not username:
375 return
377 try:
378 user = await asyncio.to_thread(_get_user_by_email_sync, username)
379 except Exception as exc:
380 logger.warning("User status check failed for %s: %s", username, exc)
381 return
383 if user is None:
384 if settings.require_user_in_db and username != getattr(settings, "platform_admin_email", "admin@example.com"):
385 _raise_auth_401("User not found in database")
386 return
388 if not user.is_active:
389 _raise_auth_401("Account disabled")
392async def require_auth(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)) -> str | dict:
393 """Require authentication via JWT token or proxy headers.
395 FastAPI dependency that checks for authentication via:
396 1. Proxy headers (if mcp_client_auth_enabled=false and trust_proxy_auth=true)
397 2. JWT token in Authorization header (Bearer scheme)
398 3. JWT token in cookies
400 If authentication is required but no token is provided, raises an HTTP 401 error.
402 Args:
403 request: The FastAPI request object for accessing headers.
404 credentials: HTTP Authorization credentials from the request header.
405 jwt_token: JWT token from cookies.
407 Returns:
408 str | dict: The verified credentials payload if authenticated,
409 proxy user if proxy auth enabled, or "anonymous" if authentication is not required.
411 Raises:
412 HTTPException: 401 status if authentication is required but no valid
413 token is provided.
415 Examples:
416 >>> from mcpgateway.utils import verify_credentials as vc
417 >>> from mcpgateway.utils import jwt_config_helper as jch
418 >>> from pydantic import SecretStr
419 >>> class DummySettings:
420 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
421 ... jwt_algorithm = 'HS256'
422 ... jwt_audience = 'mcpgateway-api'
423 ... jwt_issuer = 'mcpgateway'
424 ... jwt_audience_verification = True
425 ... jwt_issuer_verification = True
426 ... jwt_public_key_path = ''
427 ... jwt_private_key_path = ''
428 ... basic_auth_user = 'user'
429 ... basic_auth_password = SecretStr('pass')
430 ... auth_required = True
431 ... mcp_client_auth_enabled = True
432 ... trust_proxy_auth = False
433 ... proxy_user_header = 'X-Authenticated-User'
434 ... require_token_expiration = False
435 ... require_jti = False
436 ... validate_token_environment = False
437 ... docs_allow_basic_auth = False
438 >>> vc.settings = DummySettings()
439 >>> jch.settings = DummySettings()
440 >>> import jwt
441 >>> from fastapi.security import HTTPAuthorizationCredentials
442 >>> from fastapi import Request
443 >>> import asyncio
445 Test with valid credentials in header:
446 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
447 >>> creds = HTTPAuthorizationCredentials(scheme='Bearer', credentials=token)
448 >>> req = Request(scope={'type': 'http', 'headers': []})
449 >>> result = asyncio.run(vc.require_auth(request=req, credentials=creds, jwt_token=None))
450 >>> result['sub'] == 'alice'
451 True
453 Test with valid token in cookie:
454 >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=token))
455 >>> result['sub'] == 'alice'
456 True
458 Test with auth required but no token:
459 >>> try:
460 ... asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
461 ... except vc.HTTPException as e:
462 ... print(e.status_code, e.detail)
463 401 Not authenticated
465 Test with auth not required:
466 >>> vc.settings.auth_required = False
467 >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
468 >>> result
469 'anonymous'
470 >>> vc.settings.auth_required = True
471 """
472 # If MCP client auth is disabled and proxy auth is trusted, use proxy headers
473 if not settings.mcp_client_auth_enabled:
474 if is_proxy_auth_trust_active():
475 # Extract user from proxy header
476 proxy_user = request.headers.get(settings.proxy_user_header)
477 if proxy_user:
478 return {"sub": proxy_user, "source": "proxy", "token": None} # nosec B105 - None is not a password
479 # No proxy header - check auth_required (matches RBAC/WebSocket behavior)
480 if settings.auth_required:
481 raise HTTPException(
482 status_code=status.HTTP_401_UNAUTHORIZED,
483 detail="Proxy authentication header required",
484 headers={"WWW-Authenticate": "Bearer"},
485 )
486 return "anonymous"
487 else:
488 # Warning: MCP auth disabled without proxy trust - security risk!
489 # This case is already warned about in config validation
490 if settings.auth_required:
491 raise HTTPException(
492 status_code=status.HTTP_401_UNAUTHORIZED,
493 detail="Authentication required but no auth method configured",
494 headers={"WWW-Authenticate": "Bearer"},
495 )
496 return "anonymous"
498 # Standard JWT authentication flow - prioritize manual cookie reading
499 token = None
501 # 1. First try manual cookie reading (most reliable)
502 if hasattr(request, "cookies") and request.cookies:
503 manual_token = request.cookies.get("jwt_token")
504 if manual_token:
505 token = manual_token
507 # 2. Then try Authorization header
508 if not token and credentials and credentials.credentials:
509 token = credentials.credentials
511 # 3. Finally try FastAPI Cookie dependency (fallback)
512 if not token and jwt_token:
513 token = jwt_token
515 if settings.auth_required and not token:
516 _raise_auth_401("Not authenticated")
518 if not token:
519 return "anonymous"
521 payload = await verify_credentials_cached(token, request)
522 await _enforce_revocation_and_active_user(payload)
523 return payload
526async def verify_basic_credentials(credentials: HTTPBasicCredentials) -> str:
527 """Verify HTTP Basic authentication credentials.
529 Validates the provided username and password against the configured
530 basic auth credentials in settings.
532 Args:
533 credentials: HTTP Basic credentials containing username and password.
535 Returns:
536 str: The authenticated username if credentials are valid.
538 Raises:
539 HTTPException: 401 status if credentials are invalid.
541 Examples:
542 >>> from mcpgateway.utils import verify_credentials as vc
543 >>> from pydantic import SecretStr
544 >>> class DummySettings:
545 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
546 ... jwt_algorithm = 'HS256'
547 ... jwt_audience = 'mcpgateway-api'
548 ... jwt_issuer = 'mcpgateway'
549 ... jwt_audience_verification = True
550 ... jwt_issuer_verification = True
551 ... basic_auth_user = 'user'
552 ... basic_auth_password = SecretStr('pass')
553 ... auth_required = True
554 ... docs_allow_basic_auth = False
555 >>> vc.settings = DummySettings()
556 >>> from fastapi.security import HTTPBasicCredentials
557 >>> creds = HTTPBasicCredentials(username='user', password='pass')
558 >>> import asyncio
559 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user'
560 True
561 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong')
562 >>> try:
563 ... asyncio.run(vc.verify_basic_credentials(creds_bad))
564 ... except Exception as e:
565 ... print('error')
566 error
567 """
568 is_valid_user = credentials.username == settings.basic_auth_user
569 is_valid_pass = credentials.password == settings.basic_auth_password.get_secret_value()
571 if not (is_valid_user and is_valid_pass):
572 raise HTTPException(
573 status_code=status.HTTP_401_UNAUTHORIZED,
574 detail="Invalid credentials",
575 headers={"WWW-Authenticate": "Basic"},
576 )
577 return credentials.username
580async def require_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_security)) -> str:
581 """Require valid HTTP Basic authentication.
583 FastAPI dependency that enforces Basic authentication when enabled.
584 Returns the authenticated username or "anonymous" if auth is not required.
586 Args:
587 credentials: HTTP Basic credentials provided by the client.
589 Returns:
590 str: The authenticated username or "anonymous" if auth is not required.
592 Raises:
593 HTTPException: 401 status if authentication is required but no valid
594 credentials are provided.
596 Examples:
597 >>> from mcpgateway.utils import verify_credentials as vc
598 >>> from pydantic import SecretStr
599 >>> class DummySettings:
600 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
601 ... jwt_algorithm = 'HS256'
602 ... jwt_audience = 'mcpgateway-api'
603 ... jwt_issuer = 'mcpgateway'
604 ... jwt_audience_verification = True
605 ... jwt_issuer_verification = True
606 ... basic_auth_user = 'user'
607 ... basic_auth_password = SecretStr('pass')
608 ... auth_required = True
609 ... docs_allow_basic_auth = False
610 >>> vc.settings = DummySettings()
611 >>> from fastapi.security import HTTPBasicCredentials
612 >>> import asyncio
614 Test with valid credentials:
615 >>> creds = HTTPBasicCredentials(username='user', password='pass')
616 >>> asyncio.run(vc.require_basic_auth(creds))
617 'user'
619 Test with auth required but no credentials:
620 >>> try:
621 ... asyncio.run(vc.require_basic_auth(None))
622 ... except vc.HTTPException as e:
623 ... print(e.status_code, e.detail)
624 401 Not authenticated
626 Test with auth not required:
627 >>> vc.settings.auth_required = False
628 >>> asyncio.run(vc.require_basic_auth(None))
629 'anonymous'
630 >>> vc.settings.auth_required = True
631 """
632 if settings.auth_required:
633 if not credentials:
634 raise HTTPException(
635 status_code=status.HTTP_401_UNAUTHORIZED,
636 detail="Not authenticated",
637 headers={"WWW-Authenticate": "Basic"},
638 )
639 return await verify_basic_credentials(credentials)
640 return "anonymous"
643async def require_docs_basic_auth(auth_header: str) -> str:
644 """Dedicated handler for HTTP Basic Auth for documentation endpoints only.
646 This function is ONLY intended for /docs, /redoc, or similar endpoints, and is enabled
647 via the settings.docs_allow_basic_auth flag. It should NOT be used for general API authentication.
649 Args:
650 auth_header: Raw Authorization header value (e.g. "Basic username:password").
652 Returns:
653 str: The authenticated username if credentials are valid.
655 Raises:
656 HTTPException: If credentials are invalid or malformed.
657 ValueError: If the basic auth format is invalid (missing colon).
659 Examples:
660 >>> from mcpgateway.utils import verify_credentials as vc
661 >>> from pydantic import SecretStr
662 >>> class DummySettings:
663 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
664 ... jwt_algorithm = 'HS256'
665 ... jwt_audience = 'mcpgateway-api'
666 ... jwt_issuer = 'mcpgateway'
667 ... jwt_audience_verification = True
668 ... jwt_issuer_verification = True
669 ... basic_auth_user = 'user'
670 ... basic_auth_password = SecretStr('pass')
671 ... auth_required = True
672 ... require_token_expiration = False
673 ... require_jti = False
674 ... validate_token_environment = False
675 ... docs_allow_basic_auth = True
676 >>> vc.settings = DummySettings()
677 >>> import base64, asyncio
679 Test with properly encoded credentials:
680 >>> userpass = base64.b64encode(b'user:pass').decode()
681 >>> auth_header = f'Basic {userpass}'
682 >>> asyncio.run(vc.require_docs_basic_auth(auth_header))
683 'user'
685 Test with different valid credentials:
686 >>> valid_creds = base64.b64encode(b'user:pass').decode()
687 >>> valid_header = f'Basic {valid_creds}'
688 >>> result = asyncio.run(vc.require_docs_basic_auth(valid_header))
689 >>> result == 'user'
690 True
692 Test with invalid password:
693 >>> badpass = base64.b64encode(b'user:wrong').decode()
694 >>> bad_header = f'Basic {badpass}'
695 >>> try:
696 ... asyncio.run(vc.require_docs_basic_auth(bad_header))
697 ... except vc.HTTPException as e:
698 ... e.status_code == 401
699 True
701 Test with malformed base64 (no colon):
702 >>> malformed = base64.b64encode(b'userpass').decode()
703 >>> malformed_header = f'Basic {malformed}'
704 >>> try:
705 ... asyncio.run(vc.require_docs_basic_auth(malformed_header))
706 ... except vc.HTTPException as e:
707 ... e.status_code == 401
708 True
710 Test with invalid base64 encoding:
711 >>> invalid_header = 'Basic invalid_base64!'
712 >>> try:
713 ... asyncio.run(vc.require_docs_basic_auth(invalid_header))
714 ... except vc.HTTPException as e:
715 ... 'Invalid basic auth credentials' in e.detail
716 True
718 Test when docs_allow_basic_auth is disabled:
719 >>> vc.settings.docs_allow_basic_auth = False
720 >>> try:
721 ... asyncio.run(vc.require_docs_basic_auth(auth_header))
722 ... except vc.HTTPException as e:
723 ... 'not allowed' in e.detail
724 True
725 >>> vc.settings.docs_allow_basic_auth = True
727 Test with non-Basic auth scheme:
728 >>> bearer_header = 'Bearer eyJhbGciOiJIUzI1NiJ9...'
729 >>> try:
730 ... asyncio.run(vc.require_docs_basic_auth(bearer_header))
731 ... except vc.HTTPException as e:
732 ... e.status_code == 401
733 True
735 Test with empty credentials part:
736 >>> empty_header = 'Basic '
737 >>> try:
738 ... asyncio.run(vc.require_docs_basic_auth(empty_header))
739 ... except vc.HTTPException as e:
740 ... 'not allowed' in e.detail
741 True
743 Test with Unicode decode error:
744 >>> from base64 import b64encode
745 >>> bad_bytes = bytes([0xff, 0xfe]) # Invalid UTF-8 bytes
746 >>> bad_unicode = b64encode(bad_bytes).decode()
747 >>> unicode_header = f'Basic {bad_unicode}'
748 >>> try:
749 ... asyncio.run(vc.require_docs_basic_auth(unicode_header))
750 ... except vc.HTTPException as e:
751 ... 'Invalid basic auth credentials' in e.detail
752 True
753 """
754 scheme, param = get_authorization_scheme_param(auth_header)
755 if scheme.lower() == "basic" and param and settings.docs_allow_basic_auth:
756 try:
757 data = b64decode(param).decode("ascii")
758 username, separator, password = data.partition(":")
759 if not separator:
760 raise ValueError("Invalid basic auth format")
761 credentials = HTTPBasicCredentials(username=username, password=password)
762 return await require_basic_auth(credentials=credentials)
763 except (ValueError, UnicodeDecodeError, binascii.Error):
764 raise HTTPException(
765 status_code=status.HTTP_401_UNAUTHORIZED,
766 detail="Invalid basic auth credentials",
767 headers={"WWW-Authenticate": "Basic"},
768 )
769 raise HTTPException(
770 status_code=status.HTTP_401_UNAUTHORIZED,
771 detail="Basic authentication not allowed or malformed",
772 headers={"WWW-Authenticate": "Basic"},
773 )
776async def require_docs_auth_override(
777 auth_header: str | None = None,
778 jwt_token: str | None = None,
779) -> str | dict:
780 """Require authentication for docs endpoints, bypassing global auth settings.
782 This function specifically validates JWT tokens for documentation endpoints
783 (/docs, /redoc, /openapi.json) regardless of global authentication settings
784 like mcp_client_auth_enabled or auth_required.
786 Args:
787 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
788 jwt_token: JWT token from cookies.
790 Returns:
791 str | dict: The decoded JWT payload.
793 Raises:
794 HTTPException: If authentication fails or credentials are invalid.
796 Examples:
797 >>> from mcpgateway.utils import verify_credentials as vc
798 >>> from mcpgateway.utils import jwt_config_helper as jch
799 >>> class DummySettings:
800 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
801 ... jwt_algorithm = 'HS256'
802 ... jwt_audience = 'mcpgateway-api'
803 ... jwt_issuer = 'mcpgateway'
804 ... jwt_audience_verification = True
805 ... jwt_issuer_verification = True
806 ... jwt_public_key_path = ''
807 ... jwt_private_key_path = ''
808 ... docs_allow_basic_auth = False
809 ... require_token_expiration = False
810 ... require_jti = False
811 ... validate_token_environment = False
812 >>> vc.settings = DummySettings()
813 >>> jch.settings = DummySettings()
814 >>> import jwt
815 >>> import asyncio
817 Test with valid JWT:
818 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
819 >>> auth_header = f'Bearer {token}'
820 >>> result = asyncio.run(vc.require_docs_auth_override(auth_header=auth_header))
821 >>> result['sub'] == 'alice'
822 True
824 Test with no token:
825 >>> try:
826 ... asyncio.run(vc.require_docs_auth_override())
827 ... except vc.HTTPException as e:
828 ... print(e.status_code, e.detail)
829 401 Not authenticated
830 """
831 # Extract token from header or cookie
832 token = jwt_token
833 if auth_header:
834 scheme, param = get_authorization_scheme_param(auth_header)
835 if scheme.lower() == "bearer" and param:
836 token = param
837 elif scheme.lower() == "basic" and param and settings.docs_allow_basic_auth:
838 # Only allow Basic Auth for docs endpoints when explicitly enabled
839 return await require_docs_basic_auth(auth_header)
841 # Always require a token for docs endpoints
842 if not token:
843 raise HTTPException(
844 status_code=status.HTTP_401_UNAUTHORIZED,
845 detail="Not authenticated",
846 headers={"WWW-Authenticate": "Bearer"},
847 )
849 # Validate JWT and enforce standard token/account status checks.
850 payload = await verify_credentials(token)
851 if isinstance(payload, dict):
852 await _enforce_revocation_and_active_user(payload)
853 return payload
856async def require_auth_override(
857 auth_header: str | None = None,
858 jwt_token: str | None = None,
859 request: Request | None = None,
860) -> str | dict:
861 """Call require_auth manually from middleware without FastAPI dependency injection.
863 This wrapper allows manual authentication verification in contexts where
864 FastAPI's dependency injection is not available (e.g., middleware).
865 It parses the Authorization header and creates the appropriate credentials
866 object before calling require_auth.
868 Args:
869 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
870 jwt_token: JWT taken from a cookie. If both header and cookie are
871 supplied, the header takes precedence.
872 request: Optional Request object for accessing headers (used for proxy auth).
874 Returns:
875 str | dict: The decoded JWT payload or the string "anonymous",
876 same as require_auth.
878 Raises:
879 HTTPException: If authentication fails or credentials are invalid.
880 ValueError: If basic auth credentials are malformed.
882 Note:
883 This wrapper may propagate HTTPException raised by require_auth,
884 but it does not raise anything on its own.
886 Examples:
887 >>> from mcpgateway.utils import verify_credentials as vc
888 >>> from mcpgateway.utils import jwt_config_helper as jch
889 >>> from pydantic import SecretStr
890 >>> class DummySettings:
891 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
892 ... jwt_algorithm = 'HS256'
893 ... jwt_audience = 'mcpgateway-api'
894 ... jwt_issuer = 'mcpgateway'
895 ... jwt_audience_verification = True
896 ... jwt_issuer_verification = True
897 ... jwt_public_key_path = ''
898 ... jwt_private_key_path = ''
899 ... basic_auth_user = 'user'
900 ... basic_auth_password = SecretStr('pass')
901 ... auth_required = True
902 ... mcp_client_auth_enabled = True
903 ... trust_proxy_auth = False
904 ... proxy_user_header = 'X-Authenticated-User'
905 ... require_token_expiration = False
906 ... require_jti = False
907 ... validate_token_environment = False
908 ... docs_allow_basic_auth = False
909 >>> vc.settings = DummySettings()
910 >>> jch.settings = DummySettings()
911 >>> import jwt
912 >>> import asyncio
914 Test with Bearer token in auth header:
915 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
916 >>> auth_header = f'Bearer {token}'
917 >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
918 >>> result['sub'] == 'alice'
919 True
921 Test with invalid auth scheme:
922 >>> auth_header = 'Basic dXNlcjpwYXNz' # Base64 encoded user:pass
923 >>> vc.settings.auth_required = False
924 >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
925 >>> result
926 'anonymous'
928 Test with only cookie token:
929 >>> result = asyncio.run(vc.require_auth_override(jwt_token=token))
930 >>> result['sub'] == 'alice'
931 True
933 Test with no auth:
934 >>> result = asyncio.run(vc.require_auth_override())
935 >>> result
936 'anonymous'
937 >>> vc.settings.auth_required = True
938 """
939 # Create a mock request if not provided (for backward compatibility)
940 if request is None:
941 request = Request(scope={"type": "http", "headers": []})
943 credentials = None
944 if auth_header:
945 scheme, param = get_authorization_scheme_param(auth_header)
946 if scheme.lower() == "bearer" and param:
947 credentials = HTTPAuthorizationCredentials(scheme=scheme, credentials=param)
948 elif scheme.lower() == "basic" and param and settings.docs_allow_basic_auth:
949 # Only allow Basic Auth for docs endpoints when explicitly enabled
950 return await require_docs_basic_auth(auth_header)
951 return await require_auth(request=request, credentials=credentials, jwt_token=jwt_token)
954async def require_auth_header_first(
955 auth_header: str | None = None,
956 jwt_token: str | None = None,
957 request: Request | None = None,
958) -> str | dict:
959 """Like require_auth_override but Authorization header takes precedence over cookies.
961 Token resolution order (matches streamable_http_auth middleware):
962 1. Authorization Bearer header (highest priority)
963 2. Cookie ``jwt_token`` from ``request.cookies``
964 3. ``jwt_token`` keyword argument
966 Use this in the stateful-session fallback (``_get_request_context_or_default``)
967 so that identity is consistent with the ASGI middleware that already
968 authenticated the primary request.
970 Args:
971 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
972 jwt_token: JWT taken from a cookie. Used only when no header token and no
973 request cookie are present.
974 request: Optional Request object. A bare empty request is created when
975 *None* is supplied (backward-compatible default).
977 Returns:
978 str | dict: The decoded JWT payload or the string "anonymous".
980 Raises:
981 HTTPException: If authentication fails or credentials are invalid.
983 Examples:
984 >>> from mcpgateway.utils import verify_credentials as vc
985 >>> from mcpgateway.utils import jwt_config_helper as jch
986 >>> from pydantic import SecretStr
987 >>> class DummySettings:
988 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
989 ... jwt_algorithm = 'HS256'
990 ... jwt_audience = 'mcpgateway-api'
991 ... jwt_issuer = 'mcpgateway'
992 ... jwt_audience_verification = True
993 ... jwt_issuer_verification = True
994 ... jwt_public_key_path = ''
995 ... jwt_private_key_path = ''
996 ... basic_auth_user = 'user'
997 ... basic_auth_password = SecretStr('pass')
998 ... auth_required = True
999 ... mcp_client_auth_enabled = True
1000 ... trust_proxy_auth = False
1001 ... proxy_user_header = 'X-Authenticated-User'
1002 ... require_token_expiration = False
1003 ... require_jti = False
1004 ... validate_token_environment = False
1005 ... docs_allow_basic_auth = False
1006 >>> vc.settings = DummySettings()
1007 >>> jch.settings = DummySettings()
1008 >>> import jwt
1009 >>> import asyncio
1011 Test header wins over cookie (the core fix):
1012 >>> header_tok = jwt.encode({'sub': 'header-user', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
1013 >>> result = asyncio.run(vc.require_auth_header_first(auth_header=f'Bearer {header_tok}'))
1014 >>> result['sub'] == 'header-user'
1015 True
1017 Test cookie fallback when no header:
1018 >>> cookie_tok = jwt.encode({'sub': 'cookie-user', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
1019 >>> result = asyncio.run(vc.require_auth_header_first(jwt_token=cookie_tok))
1020 >>> result['sub'] == 'cookie-user'
1021 True
1023 Test no auth when not required:
1024 >>> vc.settings.auth_required = False
1025 >>> result = asyncio.run(vc.require_auth_header_first())
1026 >>> result
1027 'anonymous'
1028 >>> vc.settings.auth_required = True
1029 """
1030 if request is None:
1031 request = Request(scope={"type": "http", "headers": []})
1033 # Proxy auth path — identical to require_auth
1034 if not settings.mcp_client_auth_enabled:
1035 if is_proxy_auth_trust_active():
1036 proxy_user = request.headers.get(settings.proxy_user_header)
1037 if proxy_user:
1038 return {"sub": proxy_user, "source": "proxy", "token": None} # nosec B105 - None is not a password
1039 if settings.auth_required:
1040 raise HTTPException(
1041 status_code=status.HTTP_401_UNAUTHORIZED,
1042 detail="Proxy authentication header required",
1043 headers={"WWW-Authenticate": "Bearer"},
1044 )
1045 return "anonymous"
1046 if settings.auth_required:
1047 raise HTTPException(
1048 status_code=status.HTTP_401_UNAUTHORIZED,
1049 detail="Authentication required but no auth method configured",
1050 headers={"WWW-Authenticate": "Bearer"},
1051 )
1052 return "anonymous"
1054 # Parse auth header once
1055 scheme = param = ""
1056 if auth_header:
1057 scheme, param = get_authorization_scheme_param(auth_header)
1058 if scheme.lower() == "basic" and param and settings.docs_allow_basic_auth:
1059 return await require_docs_basic_auth(auth_header)
1061 # Header-first JWT token resolution
1062 token: str | None = None
1064 # 1. Authorization Bearer header (highest priority — matches middleware)
1065 if scheme.lower() == "bearer" and param:
1066 token = param
1068 # 2. Cookie from request.cookies
1069 if not token and hasattr(request, "cookies") and request.cookies:
1070 token = request.cookies.get("jwt_token") or None
1072 # 3. jwt_token keyword argument
1073 if not token and jwt_token:
1074 token = jwt_token
1076 if settings.auth_required and not token:
1077 raise HTTPException(
1078 status_code=status.HTTP_401_UNAUTHORIZED,
1079 detail="Not authenticated",
1080 headers={"WWW-Authenticate": "Bearer"},
1081 )
1082 return await verify_credentials_cached(token, request) if token else "anonymous"
1085async def require_admin_auth(
1086 request: Request,
1087 credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
1088 jwt_token: Optional[str] = Cookie(None, alias="jwt_token"),
1089 basic_credentials: Optional[HTTPBasicCredentials] = Depends(basic_security),
1090) -> str:
1091 """Require admin authentication supporting both email auth and basic auth.
1093 This dependency supports multiple authentication methods:
1094 1. Email-based JWT authentication (when EMAIL_AUTH_ENABLED=true)
1095 2. Basic authentication (legacy support)
1096 3. Proxy headers (if configured)
1098 For email auth, the user must have is_admin=true.
1099 For basic auth, uses the configured BASIC_AUTH_USER/PASSWORD.
1101 Args:
1102 request: FastAPI request object
1103 credentials: HTTP Authorization credentials
1104 jwt_token: JWT token from cookies
1105 basic_credentials: HTTP Basic auth credentials
1107 Returns:
1108 str: Username/email of authenticated admin user
1110 Raises:
1111 HTTPException: 401 if authentication fails, 403 if user is not admin
1112 RedirectResponse: Redirect to login page for browser requests
1114 Examples:
1115 >>> # This function is typically used as a FastAPI dependency
1116 >>> callable(require_admin_auth)
1117 True
1118 """
1119 # First-Party
1120 from mcpgateway.config import settings
1122 # Try email authentication first if enabled
1123 if getattr(settings, "email_auth_enabled", False):
1124 try:
1125 # First-Party
1126 from mcpgateway.db import get_db
1127 from mcpgateway.services.email_auth_service import EmailAuthService
1129 token = jwt_token
1130 if not token and credentials:
1131 token = credentials.credentials
1133 if token:
1134 db_session = next(get_db())
1135 try:
1136 # Decode and verify JWT token (use cached version for performance)
1137 payload = await verify_jwt_token_cached(token, request)
1138 await _enforce_revocation_and_active_user(payload)
1139 username = payload.get("sub") or payload.get("username") # Support both new and legacy formats
1141 if username:
1142 # Get user from database
1143 auth_service = EmailAuthService(db_session)
1144 current_user = await auth_service.get_user_by_email(username)
1146 if current_user and not getattr(current_user, "is_active", True):
1147 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account disabled")
1149 if current_user and current_user.is_admin:
1150 return current_user.email
1151 elif current_user:
1152 # User is authenticated but not admin - check if this is a browser request
1153 accept_header = request.headers.get("accept", "")
1154 if "text/html" in accept_header:
1155 # Redirect browser to login page with error
1156 root_path = request.scope.get("root_path", "")
1157 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Admin privileges required", headers={"Location": f"{root_path}/admin/login?error=admin_required"})
1158 else:
1159 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required")
1160 else:
1161 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
1162 except Exception:
1163 raise
1164 finally:
1165 db_session.close()
1166 except HTTPException as e:
1167 # Re-raise HTTP exceptions (403, redirects, etc.)
1168 if e.status_code != status.HTTP_401_UNAUTHORIZED:
1169 raise
1170 # For 401, check if we should redirect browser users
1171 accept_header = request.headers.get("accept", "")
1172 if "text/html" in accept_header:
1173 root_path = request.scope.get("root_path", "")
1174 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
1175 # If JWT auth fails, fall back to basic auth for backward compatibility
1176 except Exception:
1177 # If there's any other error with email auth, fall back to basic auth
1178 pass # nosec B110 - Intentional fallback to basic auth on any email auth error
1180 # Fall back to basic authentication (gated by API_ALLOW_BASIC_AUTH)
1181 try:
1182 if basic_credentials:
1183 # SECURITY: Basic auth for API endpoints is disabled by default
1184 if not settings.api_allow_basic_auth:
1185 raise HTTPException(
1186 status_code=status.HTTP_401_UNAUTHORIZED,
1187 detail="Basic authentication is disabled for API endpoints. Use JWT or API tokens instead.",
1188 headers={"WWW-Authenticate": "Bearer"},
1189 )
1190 return await verify_basic_credentials(basic_credentials)
1191 else:
1192 raise HTTPException(
1193 status_code=status.HTTP_401_UNAUTHORIZED,
1194 detail="Authentication required",
1195 headers={"WWW-Authenticate": "Bearer"},
1196 )
1197 except HTTPException:
1198 # If both methods fail, check if we should redirect browser users to login page
1199 if getattr(settings, "email_auth_enabled", False):
1200 accept_header = request.headers.get("accept", "")
1201 is_htmx = request.headers.get("hx-request") == "true"
1202 if "text/html" in accept_header or is_htmx:
1203 root_path = request.scope.get("root_path", "")
1204 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
1205 else:
1206 raise HTTPException(
1207 status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. Please login with email/password or use basic auth.", headers={"WWW-Authenticate": "Bearer"}
1208 )
1209 else:
1210 # Re-raise the basic auth error
1211 raise