Coverage for mcpgateway / utils / verify_credentials.py: 99%
279 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/verify_credentials.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Authentication verification utilities for ContextForge.
8This module provides JWT and Basic authentication verification functions
9for securing API endpoints. It supports authentication via Authorization
10headers and cookies.
11Examples:
12 >>> from mcpgateway.utils import verify_credentials as vc
13 >>> from mcpgateway.utils import jwt_config_helper as jch
14 >>> from pydantic import SecretStr
15 >>> class DummySettings:
16 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
17 ... jwt_algorithm = 'HS256'
18 ... jwt_audience = 'mcpgateway-api'
19 ... jwt_issuer = 'mcpgateway'
20 ... jwt_issuer_verification = True
21 ... jwt_audience_verification = True
22 ... jwt_public_key_path = ''
23 ... jwt_private_key_path = ''
24 ... basic_auth_user = 'user'
25 ... basic_auth_password = SecretStr('pass')
26 ... auth_required = True
27 ... require_token_expiration = False
28 ... require_jti = False
29 ... validate_token_environment = False
30 ... docs_allow_basic_auth = False
31 >>> vc.settings = DummySettings()
32 >>> jch.settings = DummySettings()
33 >>> jch.clear_jwt_caches()
34 >>> import jwt
35 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
36 >>> import asyncio
37 >>> asyncio.run(vc.verify_jwt_token(token))['sub'] == 'alice'
38 True
39 >>> payload = asyncio.run(vc.verify_credentials(token))
40 >>> payload['token'] == token
41 True
42 >>> from fastapi.security import HTTPBasicCredentials
43 >>> creds = HTTPBasicCredentials(username='user', password='pass')
44 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user'
45 True
46 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong')
47 >>> try:
48 ... asyncio.run(vc.verify_basic_credentials(creds_bad))
49 ... except Exception as e:
50 ... print('error')
51 error
52"""
54# Standard
55import asyncio
56from base64 import b64decode
57import binascii
58from typing import Any, Optional
60# Third-Party
61from fastapi import Cookie, Depends, HTTPException, Request, status
62from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer
63from fastapi.security.utils import get_authorization_scheme_param
64import jwt
66# First-Party
67from mcpgateway.config import settings
68from mcpgateway.services.logging_service import LoggingService
69from mcpgateway.utils.jwt_config_helper import validate_jwt_algo_and_keys
70from mcpgateway.utils.time_restrictions import validate_time_restrictions
72basic_security = HTTPBasic(auto_error=False)
73security = HTTPBearer(auto_error=False)
75# Initialize logging service first
76logging_service = LoggingService()
77logger = logging_service.get_logger(__name__)
80def is_proxy_auth_trust_active(settings_obj: Any | None = None) -> bool:
81 """Return whether proxy-header trust mode is explicitly active.
83 Args:
84 settings_obj: Optional settings object override (defaults to global settings).
86 Returns:
87 ``True`` when proxy-header trust is explicitly enabled and acknowledged;
88 otherwise ``False``.
89 """
90 current_settings = settings_obj or settings
92 if current_settings.mcp_client_auth_enabled or not current_settings.trust_proxy_auth:
93 return False
95 if getattr(current_settings, "trust_proxy_auth_dangerously", False) is True:
96 return True
98 if not getattr(is_proxy_auth_trust_active, "_warned", False):
99 logger.warning("Ignoring trusted proxy auth because TRUST_PROXY_AUTH_DANGEROUSLY is false while MCP client auth is disabled.")
100 is_proxy_auth_trust_active._warned = True # type: ignore[attr-defined]
101 return False
104def extract_websocket_bearer_token(query_params: Any, headers: Any, *, query_param_warning: Optional[str] = None) -> Optional[str]:
105 """Extract bearer token from WebSocket Authorization headers.
107 Args:
108 query_params: WebSocket query parameters mapping-like object.
109 headers: WebSocket headers mapping-like object.
110 query_param_warning: Optional warning message when legacy query token is detected.
112 Returns:
113 Bearer token value when present, otherwise None.
114 """
115 # Do not accept tokens from query parameters. This avoids leaking bearer
116 # secrets through URL logs/history/proxy telemetry.
117 query = query_params or {}
118 legacy_token = query.get("token") if hasattr(query, "get") else None
119 if legacy_token and query_param_warning:
120 logger.warning(f"{query_param_warning}; token ignored")
122 header_values = headers or {}
123 auth_header = header_values.get("authorization") if hasattr(header_values, "get") else None
124 if not auth_header and hasattr(header_values, "get"):
125 auth_header = header_values.get("Authorization")
126 if auth_header:
127 scheme, _, credentials = auth_header.partition(" ")
128 if scheme.lower() == "bearer" and credentials:
129 return credentials.strip()
130 return None
133async def verify_jwt_token(token: str) -> dict:
134 """Verify and decode a JWT token in a single pass.
136 Decodes and validates a JWT token using the configured secret key
137 and algorithm from settings. Uses PyJWT's require option for claim
138 enforcement instead of a separate unverified decode.
140 Note:
141 With single-pass decoding, signature validation occurs before
142 claim validation. An invalid signature will result in "Invalid token"
143 error even if the token is also missing required claims.
145 Args:
146 token: The JWT token string to verify.
148 Returns:
149 dict: The decoded token payload containing claims (e.g., user info).
151 Raises:
152 HTTPException: If token is invalid, expired, or missing required claims.
153 """
154 try:
155 validate_jwt_algo_and_keys()
157 # Import the verification key helper
158 # First-Party
159 from mcpgateway.utils.jwt_config_helper import get_jwt_public_key_or_secret
161 options = {
162 "verify_aud": settings.jwt_audience_verification,
163 "verify_iss": settings.jwt_issuer_verification,
164 }
166 if settings.require_token_expiration:
167 options["require"] = ["exp"]
169 decode_kwargs = {
170 "key": get_jwt_public_key_or_secret(),
171 "algorithms": [settings.jwt_algorithm],
172 "options": options,
173 }
175 if settings.jwt_audience_verification:
176 decode_kwargs["audience"] = settings.jwt_audience
178 if settings.jwt_issuer_verification:
179 decode_kwargs["issuer"] = settings.jwt_issuer
181 payload = jwt.decode(token, **decode_kwargs)
183 # Log warning for tokens without expiration (when not required)
184 if not settings.require_token_expiration and "exp" not in payload:
185 logger.warning(f"JWT token without expiration accepted. Consider enabling REQUIRE_TOKEN_EXPIRATION for better security. Token sub: {payload.get('sub', 'unknown')}")
187 # Require JTI if configured
188 if settings.require_jti and "jti" not in payload:
189 raise HTTPException(
190 status_code=status.HTTP_401_UNAUTHORIZED,
191 detail="Token is missing required JTI claim. Set REQUIRE_JTI=false to allow.",
192 headers={"WWW-Authenticate": "Bearer"},
193 )
195 # Log warning for tokens without JTI (when not required)
196 if not settings.require_jti and "jti" not in payload:
197 logger.warning(f"JWT token without JTI accepted. Token cannot be revoked. Consider enabling REQUIRE_JTI for better security. Token sub: {payload.get('sub', 'unknown')}")
199 # Validate environment claim if configured (reject mismatched, allow missing for backward compatibility)
200 if settings.validate_token_environment:
201 token_env = payload.get("env")
202 if token_env is not None and token_env != settings.environment:
203 raise HTTPException(
204 status_code=status.HTTP_401_UNAUTHORIZED,
205 detail=f"Token environment mismatch: token is for '{token_env}', server is '{settings.environment}'",
206 headers={"WWW-Authenticate": "Bearer"},
207 )
209 # Validate time restrictions if present in token scopes
210 validate_time_restrictions(payload)
212 return payload
214 except jwt.MissingRequiredClaimError:
215 raise HTTPException(
216 status_code=status.HTTP_401_UNAUTHORIZED,
217 detail="Token is missing required expiration claim. Set REQUIRE_TOKEN_EXPIRATION=false to allow.",
218 headers={"WWW-Authenticate": "Bearer"},
219 )
220 except jwt.ExpiredSignatureError:
221 raise HTTPException(
222 status_code=status.HTTP_401_UNAUTHORIZED,
223 detail="Token has expired",
224 headers={"WWW-Authenticate": "Bearer"},
225 )
226 except jwt.PyJWTError:
227 raise HTTPException(
228 status_code=status.HTTP_401_UNAUTHORIZED,
229 detail="Invalid token",
230 headers={"WWW-Authenticate": "Bearer"},
231 )
234async def verify_jwt_token_cached(token: str, request: Optional[Request] = None) -> dict:
235 """Verify JWT token with request-level caching.
237 If a request object is provided and the token has already been verified
238 for this request, returns the cached payload. Otherwise, performs
239 verification and caches the result in request.state.
241 Args:
242 token: JWT token string to verify
243 request: Optional FastAPI/Starlette request for request-level caching.
244 Must have a 'state' attribute to enable caching.
246 Returns:
247 dict: Decoded and verified JWT payload
249 Raises:
250 HTTPException: If token is invalid, expired, or missing required claims.
251 """
252 # Check request.state cache first (safely handle non-Request objects)
253 if request is not None and hasattr(request, "state"):
254 cached = getattr(request.state, "_jwt_verified_payload", None)
255 # Verify cache is a valid tuple of (token, payload) before unpacking
256 if cached is not None and isinstance(cached, tuple) and len(cached) == 2:
257 cached_token, cached_payload = cached
258 if cached_token == token:
259 return cached_payload
261 # Verify token (single decode)
262 payload = await verify_jwt_token(token)
264 # Cache in request.state for reuse across middleware
265 if request is not None and hasattr(request, "state"):
266 request.state._jwt_verified_payload = (token, payload)
268 return payload
271async def verify_credentials(token: str) -> dict:
272 """Verify credentials using a JWT token.
274 A wrapper around verify_jwt_token that adds the original token
275 to the decoded payload for reference.
277 This function uses verify_jwt_token internally which may raise exceptions.
279 Args:
280 token: The JWT token string to verify.
282 Returns:
283 dict: The validated token payload with the original token added
284 under the 'token' key.
286 Examples:
287 >>> from mcpgateway.utils import verify_credentials as vc
288 >>> from mcpgateway.utils import jwt_config_helper as jch
289 >>> from pydantic import SecretStr
290 >>> class DummySettings:
291 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
292 ... jwt_algorithm = 'HS256'
293 ... jwt_audience = 'mcpgateway-api'
294 ... jwt_issuer = 'mcpgateway'
295 ... jwt_audience_verification = True
296 ... jwt_issuer_verification = True
297 ... jwt_public_key_path = ''
298 ... jwt_private_key_path = ''
299 ... basic_auth_user = 'user'
300 ... basic_auth_password = SecretStr('pass')
301 ... auth_required = True
302 ... require_token_expiration = False
303 ... require_jti = False
304 ... validate_token_environment = False
305 ... docs_allow_basic_auth = False
306 >>> vc.settings = DummySettings()
307 >>> jch.settings = DummySettings()
308 >>> jch.clear_jwt_caches()
309 >>> import jwt
310 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
311 >>> import asyncio
312 >>> payload = asyncio.run(vc.verify_credentials(token))
313 >>> payload['token'] == token
314 True
315 """
316 payload = await verify_jwt_token(token)
317 payload["token"] = token
318 return payload
321async def verify_credentials_cached(token: str, request: Optional[Request] = None) -> dict:
322 """Verify credentials using a JWT token with request-level caching.
324 A wrapper around verify_jwt_token_cached that adds the original token
325 to the decoded payload for reference.
327 Args:
328 token: The JWT token string to verify.
329 request: Optional FastAPI/Starlette request for request-level caching.
331 Returns:
332 dict: The validated token payload with the original token added
333 under the 'token' key. Returns a copy to avoid mutating cached payload.
334 """
335 payload = await verify_jwt_token_cached(token, request)
336 # Return a copy with token added to avoid mutating the cached payload
337 return {**payload, "token": token}
340def _raise_auth_401(detail: str) -> None:
341 """Raise a standardized bearer-auth 401 error.
343 Args:
344 detail: Error detail message for the response body.
346 Raises:
347 HTTPException: Always raises 401 Unauthorized with Bearer auth header.
348 """
349 raise HTTPException(
350 status_code=status.HTTP_401_UNAUTHORIZED,
351 detail=detail,
352 headers={"WWW-Authenticate": "Bearer"},
353 )
356async def _enforce_revocation_and_active_user(payload: dict) -> None:
357 """Enforce token revocation and active-user checks for JWT-authenticated flows.
359 Args:
360 payload: Verified JWT payload used to derive revocation and user status checks.
362 Raises:
363 HTTPException: 401 when the token is revoked, the account is disabled,
364 or strict user-in-db mode rejects a missing user.
365 """
366 # First-Party
367 from mcpgateway.auth import _check_token_revoked_sync, _get_user_by_email_sync
369 jti = payload.get("jti")
370 if jti:
371 try:
372 if await asyncio.to_thread(_check_token_revoked_sync, jti):
373 _raise_auth_401("Token has been revoked")
374 except HTTPException:
375 raise
376 except Exception as exc:
377 logger.warning("Token revocation check failed for JTI %s: %s", jti, exc)
379 username = payload.get("sub") or payload.get("email") or payload.get("username")
380 if not username:
381 return
383 try:
384 user = await asyncio.to_thread(_get_user_by_email_sync, username)
385 except Exception as exc:
386 logger.warning("User status check failed for %s: %s", username, exc)
387 return
389 if user is None:
390 if settings.require_user_in_db and username != getattr(settings, "platform_admin_email", "admin@example.com"):
391 _raise_auth_401("User not found in database")
392 return
394 if not user.is_active:
395 _raise_auth_401("Account disabled")
398async def require_auth(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)) -> str | dict:
399 """Require authentication via JWT token or proxy headers.
401 FastAPI dependency that checks for authentication via:
402 1. Proxy headers (if mcp_client_auth_enabled=false and trust_proxy_auth=true)
403 2. JWT token in Authorization header (Bearer scheme)
404 3. JWT token in cookies
406 If authentication is required but no token is provided, raises an HTTP 401 error.
408 Args:
409 request: The FastAPI request object for accessing headers.
410 credentials: HTTP Authorization credentials from the request header.
411 jwt_token: JWT token from cookies.
413 Returns:
414 str | dict: The verified credentials payload if authenticated,
415 proxy user if proxy auth enabled, or "anonymous" if authentication is not required.
417 Raises:
418 HTTPException: 401 status if authentication is required but no valid
419 token is provided.
421 Examples:
422 >>> from mcpgateway.utils import verify_credentials as vc
423 >>> from mcpgateway.utils import jwt_config_helper as jch
424 >>> from pydantic import SecretStr
425 >>> class DummySettings:
426 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
427 ... jwt_algorithm = 'HS256'
428 ... jwt_audience = 'mcpgateway-api'
429 ... jwt_issuer = 'mcpgateway'
430 ... jwt_audience_verification = True
431 ... jwt_issuer_verification = True
432 ... jwt_public_key_path = ''
433 ... jwt_private_key_path = ''
434 ... basic_auth_user = 'user'
435 ... basic_auth_password = SecretStr('pass')
436 ... auth_required = True
437 ... mcp_client_auth_enabled = True
438 ... trust_proxy_auth = False
439 ... proxy_user_header = 'X-Authenticated-User'
440 ... require_token_expiration = False
441 ... require_jti = False
442 ... validate_token_environment = False
443 ... docs_allow_basic_auth = False
444 >>> vc.settings = DummySettings()
445 >>> jch.settings = DummySettings()
446 >>> jch.clear_jwt_caches()
447 >>> import jwt
448 >>> from fastapi.security import HTTPAuthorizationCredentials
449 >>> from fastapi import Request
450 >>> import asyncio
452 Test with valid credentials in header:
453 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
454 >>> creds = HTTPAuthorizationCredentials(scheme='Bearer', credentials=token)
455 >>> req = Request(scope={'type': 'http', 'headers': []})
456 >>> result = asyncio.run(vc.require_auth(request=req, credentials=creds, jwt_token=None))
457 >>> result['sub'] == 'alice'
458 True
460 Test with valid token in cookie:
461 >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=token))
462 >>> result['sub'] == 'alice'
463 True
465 Test with auth required but no token:
466 >>> try:
467 ... asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
468 ... except vc.HTTPException as e:
469 ... print(e.status_code, e.detail)
470 401 Not authenticated
472 Test with auth not required:
473 >>> vc.settings.auth_required = False
474 >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
475 >>> result
476 'anonymous'
477 >>> vc.settings.auth_required = True
478 """
479 # If MCP client auth is disabled and proxy auth is trusted, use proxy headers
480 if not settings.mcp_client_auth_enabled:
481 if is_proxy_auth_trust_active():
482 # Extract user from proxy header
483 proxy_user = request.headers.get(settings.proxy_user_header)
484 if proxy_user:
485 return {"sub": proxy_user, "source": "proxy", "token": None} # nosec B105 - None is not a password
486 # No proxy header - check auth_required (matches RBAC/WebSocket behavior)
487 if settings.auth_required:
488 raise HTTPException(
489 status_code=status.HTTP_401_UNAUTHORIZED,
490 detail="Proxy authentication header required",
491 headers={"WWW-Authenticate": "Bearer"},
492 )
493 return "anonymous"
494 else:
495 # Warning: MCP auth disabled without proxy trust - security risk!
496 # This case is already warned about in config validation
497 if settings.auth_required:
498 raise HTTPException(
499 status_code=status.HTTP_401_UNAUTHORIZED,
500 detail="Authentication required but no auth method configured",
501 headers={"WWW-Authenticate": "Bearer"},
502 )
503 return "anonymous"
505 # Standard JWT authentication flow - prioritize manual cookie reading
506 token = None
508 # 1. First try manual cookie reading (most reliable)
509 if hasattr(request, "cookies") and request.cookies:
510 manual_token = request.cookies.get("jwt_token")
511 if manual_token:
512 token = manual_token
514 # 2. Then try Authorization header
515 if not token and credentials and credentials.credentials:
516 token = credentials.credentials
518 # 3. Finally try FastAPI Cookie dependency (fallback)
519 if not token and jwt_token:
520 token = jwt_token
522 if settings.auth_required and not token:
523 _raise_auth_401("Not authenticated")
525 if not token:
526 return "anonymous"
528 payload = await verify_credentials_cached(token, request)
529 await _enforce_revocation_and_active_user(payload)
530 return payload
533async def verify_basic_credentials(credentials: HTTPBasicCredentials) -> str:
534 """Verify HTTP Basic authentication credentials.
536 Validates the provided username and password against the configured
537 basic auth credentials in settings.
539 Args:
540 credentials: HTTP Basic credentials containing username and password.
542 Returns:
543 str: The authenticated username if credentials are valid.
545 Raises:
546 HTTPException: 401 status if credentials are invalid.
548 Examples:
549 >>> from mcpgateway.utils import verify_credentials as vc
550 >>> from pydantic import SecretStr
551 >>> class DummySettings:
552 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
553 ... jwt_algorithm = 'HS256'
554 ... jwt_audience = 'mcpgateway-api'
555 ... jwt_issuer = 'mcpgateway'
556 ... jwt_audience_verification = True
557 ... jwt_issuer_verification = True
558 ... basic_auth_user = 'user'
559 ... basic_auth_password = SecretStr('pass')
560 ... auth_required = True
561 ... docs_allow_basic_auth = False
562 >>> vc.settings = DummySettings()
563 >>> from fastapi.security import HTTPBasicCredentials
564 >>> creds = HTTPBasicCredentials(username='user', password='pass')
565 >>> import asyncio
566 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user'
567 True
568 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong')
569 >>> try:
570 ... asyncio.run(vc.verify_basic_credentials(creds_bad))
571 ... except Exception as e:
572 ... print('error')
573 error
574 """
575 is_valid_user = credentials.username == settings.basic_auth_user
576 is_valid_pass = credentials.password == settings.basic_auth_password.get_secret_value()
578 if not (is_valid_user and is_valid_pass):
579 raise HTTPException(
580 status_code=status.HTTP_401_UNAUTHORIZED,
581 detail="Invalid credentials",
582 headers={"WWW-Authenticate": "Basic"},
583 )
584 return credentials.username
587async def require_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_security)) -> str:
588 """Require valid HTTP Basic authentication.
590 FastAPI dependency that enforces Basic authentication when enabled.
591 Returns the authenticated username or "anonymous" if auth is not required.
593 Args:
594 credentials: HTTP Basic credentials provided by the client.
596 Returns:
597 str: The authenticated username or "anonymous" if auth is not required.
599 Raises:
600 HTTPException: 401 status if authentication is required but no valid
601 credentials are provided.
603 Examples:
604 >>> from mcpgateway.utils import verify_credentials as vc
605 >>> from pydantic import SecretStr
606 >>> class DummySettings:
607 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
608 ... jwt_algorithm = 'HS256'
609 ... jwt_audience = 'mcpgateway-api'
610 ... jwt_issuer = 'mcpgateway'
611 ... jwt_audience_verification = True
612 ... jwt_issuer_verification = True
613 ... basic_auth_user = 'user'
614 ... basic_auth_password = SecretStr('pass')
615 ... auth_required = True
616 ... docs_allow_basic_auth = False
617 >>> vc.settings = DummySettings()
618 >>> from fastapi.security import HTTPBasicCredentials
619 >>> import asyncio
621 Test with valid credentials:
622 >>> creds = HTTPBasicCredentials(username='user', password='pass')
623 >>> asyncio.run(vc.require_basic_auth(creds))
624 'user'
626 Test with auth required but no credentials:
627 >>> try:
628 ... asyncio.run(vc.require_basic_auth(None))
629 ... except vc.HTTPException as e:
630 ... print(e.status_code, e.detail)
631 401 Not authenticated
633 Test with auth not required:
634 >>> vc.settings.auth_required = False
635 >>> asyncio.run(vc.require_basic_auth(None))
636 'anonymous'
637 >>> vc.settings.auth_required = True
638 """
639 if settings.auth_required:
640 if not credentials:
641 raise HTTPException(
642 status_code=status.HTTP_401_UNAUTHORIZED,
643 detail="Not authenticated",
644 headers={"WWW-Authenticate": "Basic"},
645 )
646 return await verify_basic_credentials(credentials)
647 return "anonymous"
650async def require_docs_basic_auth(auth_header: str) -> str:
651 """Dedicated handler for HTTP Basic Auth for documentation endpoints only.
653 This function is ONLY intended for /docs, /redoc, or similar endpoints, and is enabled
654 via the settings.docs_allow_basic_auth flag. It should NOT be used for general API authentication.
656 Args:
657 auth_header: Raw Authorization header value (e.g. "Basic username:password").
659 Returns:
660 str: The authenticated username if credentials are valid.
662 Raises:
663 HTTPException: If credentials are invalid or malformed.
664 ValueError: If the basic auth format is invalid (missing colon).
666 Examples:
667 >>> from mcpgateway.utils import verify_credentials as vc
668 >>> from pydantic import SecretStr
669 >>> class DummySettings:
670 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
671 ... jwt_algorithm = 'HS256'
672 ... jwt_audience = 'mcpgateway-api'
673 ... jwt_issuer = 'mcpgateway'
674 ... jwt_audience_verification = True
675 ... jwt_issuer_verification = True
676 ... basic_auth_user = 'user'
677 ... basic_auth_password = SecretStr('pass')
678 ... auth_required = True
679 ... require_token_expiration = False
680 ... require_jti = False
681 ... validate_token_environment = False
682 ... docs_allow_basic_auth = True
683 >>> vc.settings = DummySettings()
684 >>> import base64, asyncio
686 Test with properly encoded credentials:
687 >>> userpass = base64.b64encode(b'user:pass').decode()
688 >>> auth_header = f'Basic {userpass}'
689 >>> asyncio.run(vc.require_docs_basic_auth(auth_header))
690 'user'
692 Test with different valid credentials:
693 >>> valid_creds = base64.b64encode(b'user:pass').decode()
694 >>> valid_header = f'Basic {valid_creds}'
695 >>> result = asyncio.run(vc.require_docs_basic_auth(valid_header))
696 >>> result == 'user'
697 True
699 Test with invalid password:
700 >>> badpass = base64.b64encode(b'user:wrong').decode()
701 >>> bad_header = f'Basic {badpass}'
702 >>> try:
703 ... asyncio.run(vc.require_docs_basic_auth(bad_header))
704 ... except vc.HTTPException as e:
705 ... e.status_code == 401
706 True
708 Test with malformed base64 (no colon):
709 >>> malformed = base64.b64encode(b'userpass').decode()
710 >>> malformed_header = f'Basic {malformed}'
711 >>> try:
712 ... asyncio.run(vc.require_docs_basic_auth(malformed_header))
713 ... except vc.HTTPException as e:
714 ... e.status_code == 401
715 True
717 Test with invalid base64 encoding:
718 >>> invalid_header = 'Basic invalid_base64!'
719 >>> try:
720 ... asyncio.run(vc.require_docs_basic_auth(invalid_header))
721 ... except vc.HTTPException as e:
722 ... 'Invalid basic auth credentials' in e.detail
723 True
725 Test when docs_allow_basic_auth is disabled:
726 >>> vc.settings.docs_allow_basic_auth = False
727 >>> try:
728 ... asyncio.run(vc.require_docs_basic_auth(auth_header))
729 ... except vc.HTTPException as e:
730 ... 'not allowed' in e.detail
731 True
732 >>> vc.settings.docs_allow_basic_auth = True
734 Test with non-Basic auth scheme:
735 >>> bearer_header = 'Bearer eyJhbGciOiJIUzI1NiJ9...'
736 >>> try:
737 ... asyncio.run(vc.require_docs_basic_auth(bearer_header))
738 ... except vc.HTTPException as e:
739 ... e.status_code == 401
740 True
742 Test with empty credentials part:
743 >>> empty_header = 'Basic '
744 >>> try:
745 ... asyncio.run(vc.require_docs_basic_auth(empty_header))
746 ... except vc.HTTPException as e:
747 ... 'not allowed' in e.detail
748 True
750 Test with Unicode decode error:
751 >>> from base64 import b64encode
752 >>> bad_bytes = bytes([0xff, 0xfe]) # Invalid UTF-8 bytes
753 >>> bad_unicode = b64encode(bad_bytes).decode()
754 >>> unicode_header = f'Basic {bad_unicode}'
755 >>> try:
756 ... asyncio.run(vc.require_docs_basic_auth(unicode_header))
757 ... except vc.HTTPException as e:
758 ... 'Invalid basic auth credentials' in e.detail
759 True
760 """
761 scheme, param = get_authorization_scheme_param(auth_header)
762 if scheme.lower() == "basic" and param and settings.docs_allow_basic_auth:
763 try:
764 data = b64decode(param).decode("ascii")
765 username, separator, password = data.partition(":")
766 if not separator:
767 raise ValueError("Invalid basic auth format")
768 credentials = HTTPBasicCredentials(username=username, password=password)
769 return await require_basic_auth(credentials=credentials)
770 except (ValueError, UnicodeDecodeError, binascii.Error):
771 raise HTTPException(
772 status_code=status.HTTP_401_UNAUTHORIZED,
773 detail="Invalid basic auth credentials",
774 headers={"WWW-Authenticate": "Basic"},
775 )
776 raise HTTPException(
777 status_code=status.HTTP_401_UNAUTHORIZED,
778 detail="Basic authentication not allowed or malformed",
779 headers={"WWW-Authenticate": "Basic"},
780 )
783async def require_docs_auth_override(
784 auth_header: str | None = None,
785 jwt_token: str | None = None,
786) -> str | dict:
787 """Require authentication for docs endpoints, bypassing global auth settings.
789 This function specifically validates JWT tokens for documentation endpoints
790 (/docs, /redoc, /openapi.json) regardless of global authentication settings
791 like mcp_client_auth_enabled or auth_required.
793 Args:
794 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
795 jwt_token: JWT token from cookies.
797 Returns:
798 str | dict: The decoded JWT payload.
800 Raises:
801 HTTPException: If authentication fails or credentials are invalid.
803 Examples:
804 >>> from mcpgateway.utils import verify_credentials as vc
805 >>> from mcpgateway.utils import jwt_config_helper as jch
806 >>> class DummySettings:
807 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
808 ... jwt_algorithm = 'HS256'
809 ... jwt_audience = 'mcpgateway-api'
810 ... jwt_issuer = 'mcpgateway'
811 ... jwt_audience_verification = True
812 ... jwt_issuer_verification = True
813 ... jwt_public_key_path = ''
814 ... jwt_private_key_path = ''
815 ... docs_allow_basic_auth = False
816 ... require_token_expiration = False
817 ... require_jti = False
818 ... validate_token_environment = False
819 >>> vc.settings = DummySettings()
820 >>> jch.settings = DummySettings()
821 >>> jch.clear_jwt_caches()
822 >>> import jwt
823 >>> import asyncio
825 Test with valid JWT:
826 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
827 >>> auth_header = f'Bearer {token}'
828 >>> result = asyncio.run(vc.require_docs_auth_override(auth_header=auth_header))
829 >>> result['sub'] == 'alice'
830 True
832 Test with no token:
833 >>> try:
834 ... asyncio.run(vc.require_docs_auth_override())
835 ... except vc.HTTPException as e:
836 ... print(e.status_code, e.detail)
837 401 Not authenticated
838 """
839 # Extract token from header or cookie
840 token = jwt_token
841 if auth_header:
842 scheme, param = get_authorization_scheme_param(auth_header)
843 if scheme.lower() == "bearer" and param:
844 token = param
845 elif scheme.lower() == "basic" and param and settings.docs_allow_basic_auth:
846 # Only allow Basic Auth for docs endpoints when explicitly enabled
847 return await require_docs_basic_auth(auth_header)
849 # Always require a token for docs endpoints
850 if not token:
851 raise HTTPException(
852 status_code=status.HTTP_401_UNAUTHORIZED,
853 detail="Not authenticated",
854 headers={"WWW-Authenticate": "Bearer"},
855 )
857 # Validate JWT and enforce standard token/account status checks.
858 payload = await verify_credentials(token)
859 if isinstance(payload, dict):
860 await _enforce_revocation_and_active_user(payload)
861 return payload
864async def require_auth_override(
865 auth_header: str | None = None,
866 jwt_token: str | None = None,
867 request: Request | None = None,
868) -> str | dict:
869 """Call require_auth manually from middleware without FastAPI dependency injection.
871 This wrapper allows manual authentication verification in contexts where
872 FastAPI's dependency injection is not available (e.g., middleware).
873 It parses the Authorization header and creates the appropriate credentials
874 object before calling require_auth.
876 Args:
877 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
878 jwt_token: JWT taken from a cookie. If both header and cookie are
879 supplied, the header takes precedence.
880 request: Optional Request object for accessing headers (used for proxy auth).
882 Returns:
883 str | dict: The decoded JWT payload or the string "anonymous",
884 same as require_auth.
886 Raises:
887 HTTPException: If authentication fails or credentials are invalid.
888 ValueError: If basic auth credentials are malformed.
890 Note:
891 This wrapper may propagate HTTPException raised by require_auth,
892 but it does not raise anything on its own.
894 Examples:
895 >>> from mcpgateway.utils import verify_credentials as vc
896 >>> from mcpgateway.utils import jwt_config_helper as jch
897 >>> from pydantic import SecretStr
898 >>> class DummySettings:
899 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
900 ... jwt_algorithm = 'HS256'
901 ... jwt_audience = 'mcpgateway-api'
902 ... jwt_issuer = 'mcpgateway'
903 ... jwt_audience_verification = True
904 ... jwt_issuer_verification = True
905 ... jwt_public_key_path = ''
906 ... jwt_private_key_path = ''
907 ... basic_auth_user = 'user'
908 ... basic_auth_password = SecretStr('pass')
909 ... auth_required = True
910 ... mcp_client_auth_enabled = True
911 ... trust_proxy_auth = False
912 ... proxy_user_header = 'X-Authenticated-User'
913 ... require_token_expiration = False
914 ... require_jti = False
915 ... validate_token_environment = False
916 ... docs_allow_basic_auth = False
917 >>> vc.settings = DummySettings()
918 >>> jch.settings = DummySettings()
919 >>> jch.clear_jwt_caches()
920 >>> import jwt
921 >>> import asyncio
923 Test with Bearer token in auth header:
924 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
925 >>> auth_header = f'Bearer {token}'
926 >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
927 >>> result['sub'] == 'alice'
928 True
930 Test with invalid auth scheme:
931 >>> auth_header = 'Basic dXNlcjpwYXNz' # Base64 encoded user:pass
932 >>> vc.settings.auth_required = False
933 >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
934 >>> result
935 'anonymous'
937 Test with only cookie token:
938 >>> result = asyncio.run(vc.require_auth_override(jwt_token=token))
939 >>> result['sub'] == 'alice'
940 True
942 Test with no auth:
943 >>> result = asyncio.run(vc.require_auth_override())
944 >>> result
945 'anonymous'
946 >>> vc.settings.auth_required = True
947 """
948 # Create a mock request if not provided (for backward compatibility)
949 if request is None:
950 request = Request(scope={"type": "http", "headers": []})
952 credentials = None
953 if auth_header:
954 scheme, param = get_authorization_scheme_param(auth_header)
955 if scheme.lower() == "bearer" and param:
956 credentials = HTTPAuthorizationCredentials(scheme=scheme, credentials=param)
957 elif scheme.lower() == "basic" and param and settings.docs_allow_basic_auth:
958 # Only allow Basic Auth for docs endpoints when explicitly enabled
959 return await require_docs_basic_auth(auth_header)
960 return await require_auth(request=request, credentials=credentials, jwt_token=jwt_token)
963async def require_auth_header_first(
964 auth_header: str | None = None,
965 jwt_token: str | None = None,
966 request: Request | None = None,
967) -> str | dict:
968 """Like require_auth_override but Authorization header takes precedence over cookies.
970 Token resolution order (matches streamable_http_auth middleware):
971 1. Authorization Bearer header (highest priority)
972 2. Cookie ``jwt_token`` from ``request.cookies``
973 3. ``jwt_token`` keyword argument
975 Use this in the stateful-session fallback (``_get_request_context_or_default``)
976 so that identity is consistent with the ASGI middleware that already
977 authenticated the primary request.
979 Args:
980 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
981 jwt_token: JWT taken from a cookie. Used only when no header token and no
982 request cookie are present.
983 request: Optional Request object. A bare empty request is created when
984 *None* is supplied (backward-compatible default).
986 Returns:
987 str | dict: The decoded JWT payload or the string "anonymous".
989 Raises:
990 HTTPException: If authentication fails or credentials are invalid.
992 Examples:
993 >>> from mcpgateway.utils import verify_credentials as vc
994 >>> from mcpgateway.utils import jwt_config_helper as jch
995 >>> from pydantic import SecretStr
996 >>> class DummySettings:
997 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
998 ... jwt_algorithm = 'HS256'
999 ... jwt_audience = 'mcpgateway-api'
1000 ... jwt_issuer = 'mcpgateway'
1001 ... jwt_audience_verification = True
1002 ... jwt_issuer_verification = True
1003 ... jwt_public_key_path = ''
1004 ... jwt_private_key_path = ''
1005 ... basic_auth_user = 'user'
1006 ... basic_auth_password = SecretStr('pass')
1007 ... auth_required = True
1008 ... mcp_client_auth_enabled = True
1009 ... trust_proxy_auth = False
1010 ... proxy_user_header = 'X-Authenticated-User'
1011 ... require_token_expiration = False
1012 ... require_jti = False
1013 ... validate_token_environment = False
1014 ... docs_allow_basic_auth = False
1015 >>> vc.settings = DummySettings()
1016 >>> jch.settings = DummySettings()
1017 >>> jch.clear_jwt_caches()
1018 >>> import jwt
1019 >>> import asyncio
1021 Test header wins over cookie (the core fix):
1022 >>> header_tok = jwt.encode({'sub': 'header-user', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
1023 >>> result = asyncio.run(vc.require_auth_header_first(auth_header=f'Bearer {header_tok}'))
1024 >>> result['sub'] == 'header-user'
1025 True
1027 Test cookie fallback when no header:
1028 >>> cookie_tok = jwt.encode({'sub': 'cookie-user', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
1029 >>> result = asyncio.run(vc.require_auth_header_first(jwt_token=cookie_tok))
1030 >>> result['sub'] == 'cookie-user'
1031 True
1033 Test no auth when not required:
1034 >>> vc.settings.auth_required = False
1035 >>> result = asyncio.run(vc.require_auth_header_first())
1036 >>> result
1037 'anonymous'
1038 >>> vc.settings.auth_required = True
1039 """
1040 if request is None:
1041 request = Request(scope={"type": "http", "headers": []})
1043 # Proxy auth path — identical to require_auth
1044 if not settings.mcp_client_auth_enabled:
1045 if is_proxy_auth_trust_active():
1046 proxy_user = request.headers.get(settings.proxy_user_header)
1047 if proxy_user:
1048 return {"sub": proxy_user, "source": "proxy", "token": None} # nosec B105 - None is not a password
1049 if settings.auth_required:
1050 raise HTTPException(
1051 status_code=status.HTTP_401_UNAUTHORIZED,
1052 detail="Proxy authentication header required",
1053 headers={"WWW-Authenticate": "Bearer"},
1054 )
1055 return "anonymous"
1056 if settings.auth_required:
1057 raise HTTPException(
1058 status_code=status.HTTP_401_UNAUTHORIZED,
1059 detail="Authentication required but no auth method configured",
1060 headers={"WWW-Authenticate": "Bearer"},
1061 )
1062 return "anonymous"
1064 # Parse auth header once
1065 scheme = param = ""
1066 if auth_header:
1067 scheme, param = get_authorization_scheme_param(auth_header)
1068 if scheme.lower() == "basic" and param and settings.docs_allow_basic_auth:
1069 return await require_docs_basic_auth(auth_header)
1071 # Header-first JWT token resolution
1072 token: str | None = None
1074 # 1. Authorization Bearer header (highest priority — matches middleware)
1075 if scheme.lower() == "bearer" and param:
1076 token = param
1078 # 2. Cookie from request.cookies
1079 if not token and hasattr(request, "cookies") and request.cookies:
1080 token = request.cookies.get("jwt_token") or None
1082 # 3. jwt_token keyword argument
1083 if not token and jwt_token:
1084 token = jwt_token
1086 if settings.auth_required and not token:
1087 raise HTTPException(
1088 status_code=status.HTTP_401_UNAUTHORIZED,
1089 detail="Not authenticated",
1090 headers={"WWW-Authenticate": "Bearer"},
1091 )
1092 return await verify_credentials_cached(token, request) if token else "anonymous"
1095async def require_admin_auth(
1096 request: Request,
1097 credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
1098 jwt_token: Optional[str] = Cookie(None, alias="jwt_token"),
1099 basic_credentials: Optional[HTTPBasicCredentials] = Depends(basic_security),
1100) -> str:
1101 """Require admin authentication supporting both email auth and basic auth.
1103 This dependency supports multiple authentication methods:
1104 1. Email-based JWT authentication (when EMAIL_AUTH_ENABLED=true)
1105 2. Basic authentication (legacy support)
1106 3. Proxy headers (if configured)
1108 For email auth, the user must have is_admin=true.
1109 For basic auth, uses the configured BASIC_AUTH_USER/PASSWORD.
1111 Args:
1112 request: FastAPI request object
1113 credentials: HTTP Authorization credentials
1114 jwt_token: JWT token from cookies
1115 basic_credentials: HTTP Basic auth credentials
1117 Returns:
1118 str: Username/email of authenticated admin user
1120 Raises:
1121 HTTPException: 401 if authentication fails, 403 if user is not admin
1122 RedirectResponse: Redirect to login page for browser requests
1124 Examples:
1125 >>> # This function is typically used as a FastAPI dependency
1126 >>> callable(require_admin_auth)
1127 True
1128 """
1129 # First-Party
1130 from mcpgateway.config import settings
1132 # Try email authentication first if enabled
1133 if getattr(settings, "email_auth_enabled", False):
1134 try:
1135 # First-Party
1136 from mcpgateway.db import get_db
1137 from mcpgateway.services.email_auth_service import EmailAuthService
1139 token = jwt_token
1140 if not token and credentials:
1141 token = credentials.credentials
1143 if token:
1144 db_session = next(get_db())
1145 try:
1146 # Decode and verify JWT token (use cached version for performance)
1147 payload = await verify_jwt_token_cached(token, request)
1148 await _enforce_revocation_and_active_user(payload)
1149 username = payload.get("sub") or payload.get("username") # Support both new and legacy formats
1151 if username:
1152 # Get user from database
1153 auth_service = EmailAuthService(db_session)
1154 current_user = await auth_service.get_user_by_email(username)
1156 if current_user and not getattr(current_user, "is_active", True):
1157 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account disabled")
1159 if current_user and current_user.is_admin:
1160 return current_user.email
1161 elif current_user:
1162 # User is authenticated but not admin - check if this is a browser request
1163 accept_header = request.headers.get("accept", "")
1164 if "text/html" in accept_header:
1165 # Redirect browser to login page with error
1166 root_path = request.scope.get("root_path", "")
1167 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Admin privileges required", headers={"Location": f"{root_path}/admin/login?error=admin_required"})
1168 else:
1169 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required")
1170 else:
1171 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
1172 except Exception:
1173 raise
1174 finally:
1175 db_session.close()
1176 except HTTPException as e:
1177 # Re-raise HTTP exceptions (403, redirects, etc.)
1178 if e.status_code != status.HTTP_401_UNAUTHORIZED:
1179 raise
1180 # For 401, check if we should redirect browser users
1181 accept_header = request.headers.get("accept", "")
1182 if "text/html" in accept_header:
1183 root_path = request.scope.get("root_path", "")
1184 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
1185 # If JWT auth fails, fall back to basic auth for backward compatibility
1186 except Exception:
1187 # If there's any other error with email auth, fall back to basic auth
1188 pass # nosec B110 - Intentional fallback to basic auth on any email auth error
1190 # Fall back to basic authentication (gated by API_ALLOW_BASIC_AUTH)
1191 try:
1192 if basic_credentials:
1193 # SECURITY: Basic auth for API endpoints is disabled by default
1194 if not settings.api_allow_basic_auth:
1195 raise HTTPException(
1196 status_code=status.HTTP_401_UNAUTHORIZED,
1197 detail="Basic authentication is disabled for API endpoints. Use JWT or API tokens instead.",
1198 headers={"WWW-Authenticate": "Bearer"},
1199 )
1200 return await verify_basic_credentials(basic_credentials)
1201 else:
1202 raise HTTPException(
1203 status_code=status.HTTP_401_UNAUTHORIZED,
1204 detail="Authentication required",
1205 headers={"WWW-Authenticate": "Bearer"},
1206 )
1207 except HTTPException:
1208 # If both methods fail, check if we should redirect browser users to login page
1209 if getattr(settings, "email_auth_enabled", False):
1210 accept_header = request.headers.get("accept", "")
1211 is_htmx = request.headers.get("hx-request") == "true"
1212 if "text/html" in accept_header or is_htmx:
1213 root_path = request.scope.get("root_path", "")
1214 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
1215 else:
1216 raise HTTPException(
1217 status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. Please login with email/password or use basic auth.", headers={"WWW-Authenticate": "Bearer"}
1218 )
1219 else:
1220 # Re-raise the basic auth error
1221 raise