Coverage for mcpgateway / utils / verify_credentials.py: 99%

277 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/verify_credentials.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Authentication verification utilities for ContextForge. 

8This module provides JWT and Basic authentication verification functions 

9for securing API endpoints. It supports authentication via Authorization 

10headers and cookies. 

11Examples: 

12 >>> from mcpgateway.utils import verify_credentials as vc 

13 >>> from mcpgateway.utils import jwt_config_helper as jch 

14 >>> from pydantic import SecretStr 

15 >>> class DummySettings: 

16 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

17 ... jwt_algorithm = 'HS256' 

18 ... jwt_audience = 'mcpgateway-api' 

19 ... jwt_issuer = 'mcpgateway' 

20 ... jwt_issuer_verification = True 

21 ... jwt_audience_verification = True 

22 ... jwt_public_key_path = '' 

23 ... jwt_private_key_path = '' 

24 ... basic_auth_user = 'user' 

25 ... basic_auth_password = SecretStr('pass') 

26 ... auth_required = True 

27 ... require_token_expiration = False 

28 ... require_jti = False 

29 ... validate_token_environment = False 

30 ... docs_allow_basic_auth = False 

31 >>> vc.settings = DummySettings() 

32 >>> jch.settings = DummySettings() 

33 >>> import jwt 

34 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

35 >>> import asyncio 

36 >>> asyncio.run(vc.verify_jwt_token(token))['sub'] == 'alice' 

37 True 

38 >>> payload = asyncio.run(vc.verify_credentials(token)) 

39 >>> payload['token'] == token 

40 True 

41 >>> from fastapi.security import HTTPBasicCredentials 

42 >>> creds = HTTPBasicCredentials(username='user', password='pass') 

43 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user' 

44 True 

45 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong') 

46 >>> try: 

47 ... asyncio.run(vc.verify_basic_credentials(creds_bad)) 

48 ... except Exception as e: 

49 ... print('error') 

50 error 

51""" 

52 

53# Standard 

54import asyncio 

55from base64 import b64decode 

56import binascii 

57from typing import Any, Optional 

58 

59# Third-Party 

60from fastapi import Cookie, Depends, HTTPException, Request, status 

61from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer 

62from fastapi.security.utils import get_authorization_scheme_param 

63import jwt 

64 

65# First-Party 

66from mcpgateway.config import settings 

67from mcpgateway.services.logging_service import LoggingService 

68from mcpgateway.utils.jwt_config_helper import validate_jwt_algo_and_keys 

69 

# Security scheme objects used as FastAPI dependencies by the require_* functions
# in this module. auto_error=False is passed so that the dependency functions
# themselves decide how to respond when credentials are absent (they check for
# missing credentials explicitly and consult settings.auth_required).
basic_security = HTTPBasic(auto_error=False)
security = HTTPBearer(auto_error=False)

# Module-level logger obtained through the project's LoggingService.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

76 

77 

78def is_proxy_auth_trust_active(settings_obj: Any | None = None) -> bool: 

79 """Return whether proxy-header trust mode is explicitly active. 

80 

81 Args: 

82 settings_obj: Optional settings object override (defaults to global settings). 

83 

84 Returns: 

85 ``True`` when proxy-header trust is explicitly enabled and acknowledged; 

86 otherwise ``False``. 

87 """ 

88 current_settings = settings_obj or settings 

89 

90 if current_settings.mcp_client_auth_enabled or not current_settings.trust_proxy_auth: 

91 return False 

92 

93 if getattr(current_settings, "trust_proxy_auth_dangerously", False) is True: 

94 return True 

95 

96 if not getattr(is_proxy_auth_trust_active, "_warned", False): 

97 logger.warning("Ignoring trusted proxy auth because TRUST_PROXY_AUTH_DANGEROUSLY is false while MCP client auth is disabled.") 

98 is_proxy_auth_trust_active._warned = True # type: ignore[attr-defined] 

99 return False 

100 

101 

def extract_websocket_bearer_token(query_params: Any, headers: Any, *, query_param_warning: Optional[str] = None) -> Optional[str]:
    """Extract bearer token from WebSocket Authorization headers.

    Tokens supplied through the ``token`` query parameter are never accepted;
    when one is present and ``query_param_warning`` is given, a warning is
    logged and the query token is ignored.

    Args:
        query_params: WebSocket query parameters mapping-like object.
        headers: WebSocket headers mapping-like object.
        query_param_warning: Optional warning message when legacy query token is detected.

    Returns:
        Bearer token value when present, otherwise None. A Bearer header whose
        credentials part is empty or whitespace-only yields None.
    """
    # Do not accept tokens from query parameters. This avoids leaking bearer
    # secrets through URL logs/history/proxy telemetry.
    query = query_params or {}
    legacy_token = query.get("token") if hasattr(query, "get") else None
    if legacy_token and query_param_warning:
        logger.warning(f"{query_param_warning}; token ignored")

    header_values = headers or {}
    if not hasattr(header_values, "get"):
        # Not mapping-like: there is nothing we can look up.
        return None

    # Try the lowercase key first, then the canonical capitalisation for
    # plain (case-sensitive) dicts.
    auth_header = header_values.get("authorization") or header_values.get("Authorization")
    if not auth_header:
        return None

    scheme, _, credentials = auth_header.partition(" ")
    # Strip BEFORE the truthiness test so "Bearer   " is treated as "no token"
    # instead of returning an empty string.
    token = credentials.strip()
    if scheme.lower() == "bearer" and token:
        return token
    return None

129 

130 

async def verify_jwt_token(token: str) -> dict:
    """Verify and decode a JWT token in a single pass.

    Decodes and validates a JWT token using the configured secret key
    and algorithm from settings. Uses PyJWT's require option for claim
    enforcement instead of a separate unverified decode.

    Note:
        With single-pass decoding, signature validation occurs before
        claim validation. An invalid signature will result in "Invalid token"
        error even if the token is also missing required claims.

    Args:
        token: The JWT token string to verify.

    Returns:
        dict: The decoded token payload containing claims (e.g., user info).

    Raises:
        HTTPException: 401 if the token is invalid, expired, missing a required
            claim (``exp`` when REQUIRE_TOKEN_EXPIRATION is set, ``jti`` when
            REQUIRE_JTI is set, or a claim PyJWT requires for audience/issuer
            verification), or carries an ``env`` claim that does not match the
            server environment while environment validation is enabled.
    """
    try:
        validate_jwt_algo_and_keys()

        # Import the verification key helper
        # First-Party
        from mcpgateway.utils.jwt_config_helper import get_jwt_public_key_or_secret

        options = {
            "verify_aud": settings.jwt_audience_verification,
            "verify_iss": settings.jwt_issuer_verification,
        }

        if settings.require_token_expiration:
            options["require"] = ["exp"]

        decode_kwargs = {
            "key": get_jwt_public_key_or_secret(),
            "algorithms": [settings.jwt_algorithm],
            "options": options,
        }

        if settings.jwt_audience_verification:
            decode_kwargs["audience"] = settings.jwt_audience

        if settings.jwt_issuer_verification:
            decode_kwargs["issuer"] = settings.jwt_issuer

        payload = jwt.decode(token, **decode_kwargs)

        # Log warning for tokens without expiration (when not required)
        if not settings.require_token_expiration and "exp" not in payload:
            logger.warning(f"JWT token without expiration accepted. Consider enabling REQUIRE_TOKEN_EXPIRATION for better security. Token sub: {payload.get('sub', 'unknown')}")

        if "jti" not in payload:
            # Require JTI if configured
            if settings.require_jti:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token is missing required JTI claim. Set REQUIRE_JTI=false to allow.",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            # Otherwise accept, but warn: such tokens cannot be revoked.
            logger.warning(f"JWT token without JTI accepted. Token cannot be revoked. Consider enabling REQUIRE_JTI for better security. Token sub: {payload.get('sub', 'unknown')}")

        # Validate environment claim if configured (reject mismatched, allow missing for backward compatibility)
        if settings.validate_token_environment:
            token_env = payload.get("env")
            if token_env is not None and token_env != settings.environment:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail=f"Token environment mismatch: token is for '{token_env}', server is '{settings.environment}'",
                    headers={"WWW-Authenticate": "Bearer"},
                )

        return payload

    except jwt.MissingRequiredClaimError as exc:
        # PyJWT raises this error not only for claims listed in options["require"]
        # but also for a missing "aud"/"iss" when audience/issuer verification is
        # active. Only blame the expiration setting when "exp" is really the
        # missing claim; otherwise name the claim that is actually absent.
        missing_claim = getattr(exc, "claim", None)
        if missing_claim in (None, "exp"):
            detail = "Token is missing required expiration claim. Set REQUIRE_TOKEN_EXPIRATION=false to allow."
        else:
            detail = f"Token is missing required claim: {missing_claim}"
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except jwt.PyJWTError as exc:
        # Catch-all for every other PyJWT failure (bad signature, wrong
        # audience/issuer, malformed token, ...).
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

227 

228 

async def verify_jwt_token_cached(token: str, request: Optional[Request] = None) -> dict:
    """Verify a JWT, reusing a result already computed for the same request.

    The verified payload is memoised on ``request.state._jwt_verified_payload``
    as a ``(token, payload)`` tuple. A cache entry is only used when it has
    that exact shape and its token equals the one being verified; anything
    else triggers a normal verification whose result then replaces the entry.

    Args:
        token: JWT token string to verify
        request: Optional FastAPI/Starlette request for request-level caching.
            Must have a 'state' attribute to enable caching.

    Returns:
        dict: Decoded and verified JWT payload

    Raises:
        HTTPException: If token is invalid, expired, or missing required claims.
    """
    # Caching is only possible for objects that actually expose ``state``.
    can_cache = request is not None and hasattr(request, "state")

    if can_cache:
        entry = getattr(request.state, "_jwt_verified_payload", None)
        # isinstance() also rejects None, so no separate None test is needed.
        if isinstance(entry, tuple) and len(entry) == 2 and entry[0] == token:
            return entry[1]

    # Cache miss (or caching unavailable): do the single real decode.
    verified = await verify_jwt_token(token)

    if can_cache:
        # Store for reuse by later middleware/dependencies in this request.
        request.state._jwt_verified_payload = (token, verified)

    return verified

264 

265 

async def verify_credentials(token: str) -> dict:
    """Validate a JWT and return its payload together with the raw token.

    Delegates validation to verify_jwt_token (whose exceptions propagate
    unchanged) and then stores the original token string in the returned
    payload under the ``token`` key.

    Args:
        token: The JWT token string to verify.

    Returns:
        dict: The validated token payload with the original token added
        under the 'token' key.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
        >>> import asyncio
        >>> payload = asyncio.run(vc.verify_credentials(token))
        >>> payload['token'] == token
        True
    """
    verified = await verify_jwt_token(token)
    # The payload dict returned by the verifier is updated in place.
    verified.update(token=token)
    return verified

313 

314 

async def verify_credentials_cached(token: str, request: Optional[Request] = None) -> dict:
    """Validate a JWT with request-level caching and attach the raw token.

    Uses verify_jwt_token_cached for validation and returns a new dict that
    contains every claim of the verified payload plus the original token
    string under the ``token`` key.

    Args:
        token: The JWT token string to verify.
        request: Optional FastAPI/Starlette request for request-level caching.

    Returns:
        dict: The validated token payload with the original token added
        under the 'token' key. Returns a copy to avoid mutating cached payload.
    """
    cached_or_fresh = await verify_jwt_token_cached(token, request)
    # Build a shallow copy so the dict stored in the request cache stays untouched.
    enriched = dict(cached_or_fresh)
    enriched["token"] = token
    return enriched

332 

333 

def _raise_auth_401(detail: str) -> None:
    """Abort the current request with a Bearer-style 401 response.

    This helper never returns normally.

    Args:
        detail: Error detail message for the response body.

    Raises:
        HTTPException: Always raises 401 Unauthorized with Bearer auth header.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )
    raise unauthorized

348 

349 

async def _enforce_revocation_and_active_user(payload: dict) -> None:
    """Reject revoked tokens and disabled or unknown accounts.

    Two checks are performed, each via a synchronous helper run in a worker
    thread:

    1. If the payload has a ``jti``, the token is looked up in the revocation
       store. A lookup failure is logged and treated as "not revoked".
    2. The user identified by ``sub`` / ``email`` / ``username`` (first
       non-empty wins) is loaded. A lookup failure is logged and the check is
       skipped. A missing user is only rejected when ``require_user_in_db`` is
       set and the identity is not the platform admin email.

    Args:
        payload: Verified JWT payload used to derive revocation and user status checks.

    Raises:
        HTTPException: 401 when the token is revoked, the account is disabled,
            or strict user-in-db mode rejects a missing user.
    """
    # First-Party
    from mcpgateway.auth import _check_token_revoked_sync, _get_user_by_email_sync

    token_id = payload.get("jti")
    if token_id:
        try:
            revoked = await asyncio.to_thread(_check_token_revoked_sync, token_id)
        except HTTPException:
            # Never downgrade an HTTP error coming out of the check to a warning.
            raise
        except Exception as exc:
            logger.warning("Token revocation check failed for JTI %s: %s", token_id, exc)
        else:
            if revoked:
                _raise_auth_401("Token has been revoked")

    identity = payload.get("sub") or payload.get("email") or payload.get("username")
    if not identity:
        # Nothing identifies a user, so there is no account status to check.
        return

    try:
        account = await asyncio.to_thread(_get_user_by_email_sync, identity)
    except Exception as exc:
        logger.warning("User status check failed for %s: %s", identity, exc)
        return

    if account is not None:
        if not account.is_active:
            _raise_auth_401("Account disabled")
        return

    # Unknown user: only an error in strict mode, and never for the platform admin.
    if settings.require_user_in_db and identity != getattr(settings, "platform_admin_email", "admin@example.com"):
        _raise_auth_401("User not found in database")

390 

391 

async def require_auth(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)) -> str | dict:
    """Require authentication via JWT token or proxy headers.

    FastAPI dependency. When ``mcp_client_auth_enabled`` is false, only the
    proxy-header path is considered (and only if proxy trust is active).
    Otherwise a JWT is looked up in this order, first non-empty value wins:

    1. ``jwt_token`` cookie read directly from ``request.cookies``
    2. Authorization header (Bearer scheme) supplied via ``credentials``
    3. ``jwt_token`` cookie supplied by FastAPI's Cookie dependency

    If authentication is required but no token is provided, raises an HTTP 401 error.
    A token that is found is verified and then checked for revocation and
    account status.

    Args:
        request: The FastAPI request object for accessing headers.
        credentials: HTTP Authorization credentials from the request header.
        jwt_token: JWT token from cookies.

    Returns:
        str | dict: The verified credentials payload if authenticated,
        proxy user if proxy auth enabled, or "anonymous" if authentication is not required.

    Raises:
        HTTPException: 401 status if authentication is required but no valid
            token is provided.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     mcp_client_auth_enabled = True
        ...     trust_proxy_auth = False
        ...     proxy_user_header = 'X-Authenticated-User'
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> from fastapi.security import HTTPAuthorizationCredentials
        >>> from fastapi import Request
        >>> import asyncio

        Test with valid credentials in header:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
        >>> creds = HTTPAuthorizationCredentials(scheme='Bearer', credentials=token)
        >>> req = Request(scope={'type': 'http', 'headers': []})
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=creds, jwt_token=None))
        >>> result['sub'] == 'alice'
        True

        Test with valid token in cookie:
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=token))
        >>> result['sub'] == 'alice'
        True

        Test with auth required but no token:
        >>> try:
        ...     asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated

        Test with auth not required:
        >>> vc.settings.auth_required = False
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
        >>> result
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # ---- Proxy / no-client-auth mode -------------------------------------
    if not settings.mcp_client_auth_enabled:
        if not is_proxy_auth_trust_active():
            # Warning: MCP auth disabled without proxy trust - security risk!
            # This case is already warned about in config validation
            if settings.auth_required:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required but no auth method configured",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            return "anonymous"

        # Proxy trust is active: identity comes from the configured proxy header.
        proxy_user = request.headers.get(settings.proxy_user_header)
        if proxy_user:
            return {"sub": proxy_user, "source": "proxy", "token": None}  # nosec B105 - None is not a password

        # No proxy header - check auth_required (matches RBAC/WebSocket behavior)
        if settings.auth_required:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Proxy authentication header required",
                headers={"WWW-Authenticate": "Bearer"},
            )
        return "anonymous"

    # ---- Standard JWT mode -------------------------------------------------
    token = None

    # 1. Cookie read straight from the request (takes priority).
    cookie_jar = request.cookies if hasattr(request, "cookies") else None
    if cookie_jar:
        token = cookie_jar.get("jwt_token") or None

    # 2. Authorization header.
    if not token and credentials and credentials.credentials:
        token = credentials.credentials

    # 3. Cookie value injected by FastAPI (fallback).
    if not token and jwt_token:
        token = jwt_token

    if not token:
        if settings.auth_required:
            _raise_auth_401("Not authenticated")
        return "anonymous"

    verified = await verify_credentials_cached(token, request)
    await _enforce_revocation_and_active_user(verified)
    return verified

524 

525 

async def verify_basic_credentials(credentials: HTTPBasicCredentials) -> str:
    """Verify HTTP Basic authentication credentials.

    Validates the provided username and password against the configured
    basic auth credentials in settings. Both values are compared with a
    constant-time comparison so that response timing does not reveal how
    much of a guessed username or password was correct.

    Args:
        credentials: HTTP Basic credentials containing username and password.

    Returns:
        str: The authenticated username if credentials are valid.

    Raises:
        HTTPException: 401 status if credentials are invalid.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> from fastapi.security import HTTPBasicCredentials
        >>> creds = HTTPBasicCredentials(username='user', password='pass')
        >>> import asyncio
        >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user'
        True
        >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong')
        >>> try:
        ...     asyncio.run(vc.verify_basic_credentials(creds_bad))
        ... except Exception as e:
        ...     print('error')
        error
    """
    # Standard
    import secrets

    # compare_digest() only accepts ASCII-only str, so compare UTF-8 bytes to
    # support non-ASCII usernames/passwords as well.
    expected_user = settings.basic_auth_user.encode("utf-8")
    expected_pass = settings.basic_auth_password.get_secret_value().encode("utf-8")

    # Always evaluate BOTH comparisons (no short-circuit) so the timing is the
    # same whether the username or the password is the part that is wrong.
    is_valid_user = secrets.compare_digest(credentials.username.encode("utf-8"), expected_user)
    is_valid_pass = secrets.compare_digest(credentials.password.encode("utf-8"), expected_pass)

    if not (is_valid_user and is_valid_pass):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username

578 

579 

async def require_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_security)) -> str:
    """Enforce HTTP Basic authentication when global auth is required.

    FastAPI dependency. With ``settings.auth_required`` disabled the caller is
    treated as "anonymous" without inspecting any credentials; otherwise
    credentials must be present and are validated by verify_basic_credentials.

    Args:
        credentials: HTTP Basic credentials provided by the client.

    Returns:
        str: The authenticated username or "anonymous" if auth is not required.

    Raises:
        HTTPException: 401 status if authentication is required but no valid
            credentials are provided.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> from fastapi.security import HTTPBasicCredentials
        >>> import asyncio

        Test with valid credentials:
        >>> creds = HTTPBasicCredentials(username='user', password='pass')
        >>> asyncio.run(vc.require_basic_auth(creds))
        'user'

        Test with auth required but no credentials:
        >>> try:
        ...     asyncio.run(vc.require_basic_auth(None))
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated

        Test with auth not required:
        >>> vc.settings.auth_required = False
        >>> asyncio.run(vc.require_basic_auth(None))
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # Auth switched off globally: nothing to check.
    if not settings.auth_required:
        return "anonymous"

    if credentials:
        return await verify_basic_credentials(credentials)

    # Auth is required but the client sent no Basic credentials at all.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
        headers={"WWW-Authenticate": "Basic"},
    )

641 

642 

async def require_docs_basic_auth(auth_header: str) -> str:
    """Dedicated handler for HTTP Basic Auth for documentation endpoints only.

    This function is ONLY intended for /docs, /redoc, or similar endpoints, and is enabled
    via the settings.docs_allow_basic_auth flag. It should NOT be used for general API authentication.

    The supplied credentials are always checked against the configured basic
    auth user/password, independent of ``settings.auth_required``: a client
    that presents Basic credentials to a docs endpoint must present the right
    ones.

    Args:
        auth_header: Raw Authorization header value, i.e. ``"Basic "`` followed by
            the base64 encoding of ``username:password``.

    Returns:
        str: The authenticated username if credentials are valid.

    Raises:
        HTTPException: 401 if Basic auth for docs is disabled, the header is not a
            Basic header or has no credentials part, the credentials cannot be
            decoded (invalid base64, non-ASCII content, missing colon), or the
            username/password do not match.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = True
        >>> vc.settings = DummySettings()
        >>> import base64, asyncio

        Test with properly encoded credentials:
        >>> userpass = base64.b64encode(b'user:pass').decode()
        >>> auth_header = f'Basic {userpass}'
        >>> asyncio.run(vc.require_docs_basic_auth(auth_header))
        'user'

        Test with different valid credentials:
        >>> valid_creds = base64.b64encode(b'user:pass').decode()
        >>> valid_header = f'Basic {valid_creds}'
        >>> result = asyncio.run(vc.require_docs_basic_auth(valid_header))
        >>> result == 'user'
        True

        Test with invalid password:
        >>> badpass = base64.b64encode(b'user:wrong').decode()
        >>> bad_header = f'Basic {badpass}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(bad_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test that wrong credentials are rejected even when global auth is not required:
        >>> vc.settings.auth_required = False
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(bad_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True
        >>> vc.settings.auth_required = True

        Test with malformed base64 (no colon):
        >>> malformed = base64.b64encode(b'userpass').decode()
        >>> malformed_header = f'Basic {malformed}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(malformed_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test with invalid base64 encoding:
        >>> invalid_header = 'Basic invalid_base64!'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(invalid_header))
        ... except vc.HTTPException as e:
        ...     'Invalid basic auth credentials' in e.detail
        True

        Test when docs_allow_basic_auth is disabled:
        >>> vc.settings.docs_allow_basic_auth = False
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(auth_header))
        ... except vc.HTTPException as e:
        ...     'not allowed' in e.detail
        True
        >>> vc.settings.docs_allow_basic_auth = True

        Test with non-Basic auth scheme:
        >>> bearer_header = 'Bearer eyJhbGciOiJIUzI1NiJ9...'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(bearer_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test with empty credentials part:
        >>> empty_header = 'Basic '
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(empty_header))
        ... except vc.HTTPException as e:
        ...     'not allowed' in e.detail
        True

        Test with Unicode decode error:
        >>> from base64 import b64encode
        >>> bad_bytes = bytes([0xff, 0xfe])  # Invalid UTF-8 bytes
        >>> bad_unicode = b64encode(bad_bytes).decode()
        >>> unicode_header = f'Basic {bad_unicode}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(unicode_header))
        ... except vc.HTTPException as e:
        ...     'Invalid basic auth credentials' in e.detail
        True
    """
    scheme, param = get_authorization_scheme_param(auth_header)
    if not (scheme.lower() == "basic" and param and settings.docs_allow_basic_auth):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Basic authentication not allowed or malformed",
            headers={"WWW-Authenticate": "Basic"},
        )

    # Only the decoding/parsing step is guarded: every failure here means the
    # header itself is malformed, which is reported with a dedicated message.
    try:
        data = b64decode(param).decode("ascii")
        username, separator, password = data.partition(":")
        if not separator:
            raise ValueError("Invalid basic auth format")
        credentials = HTTPBasicCredentials(username=username, password=password)
    except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid basic auth credentials",
            headers={"WWW-Authenticate": "Basic"},
        ) from exc

    # Verify directly instead of going through require_basic_auth():
    # that dependency returns "anonymous" for ANY credentials when
    # settings.auth_required is false, which would let arbitrary Basic
    # credentials unlock the docs endpoints.
    return await verify_basic_credentials(credentials)

774 

775 

async def require_docs_auth_override(
    auth_header: str | None = None,
    jwt_token: str | None = None,
) -> str | dict:
    """Authenticate a request to a documentation endpoint.

    Used for /docs, /redoc and /openapi.json. Unlike the general dependency,
    this function does not consult mcp_client_auth_enabled or auth_required:
    a credential is always demanded.

    Credential selection:

    * A ``Bearer`` Authorization header with a non-empty value wins over the cookie.
    * A ``Basic`` Authorization header is handed to require_docs_basic_auth,
      but only when settings.docs_allow_basic_auth is enabled.
    * Otherwise the cookie token (``jwt_token``) is used.

    Args:
        auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
        jwt_token: JWT token from cookies.

    Returns:
        str | dict: The decoded JWT payload, or the value returned by
        require_docs_basic_auth when the Basic path is taken.

    Raises:
        HTTPException: If authentication fails or credentials are invalid.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> class DummySettings:
        ...     jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     docs_allow_basic_auth = False
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> import asyncio

        Test with valid JWT:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
        >>> auth_header = f'Bearer {token}'
        >>> result = asyncio.run(vc.require_docs_auth_override(auth_header=auth_header))
        >>> result['sub'] == 'alice'
        True

        Test with no token:
        >>> try:
        ...     asyncio.run(vc.require_docs_auth_override())
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated
    """
    # Start from the cookie; a usable Authorization header may override it.
    token = jwt_token
    if auth_header:
        scheme, param = get_authorization_scheme_param(auth_header)
        scheme_name = scheme.lower()
        if param:
            if scheme_name == "bearer":
                token = param
            elif scheme_name == "basic" and settings.docs_allow_basic_auth:
                # Only allow Basic Auth for docs endpoints when explicitly enabled
                return await require_docs_basic_auth(auth_header)

    # Always require a token for docs endpoints
    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Validate JWT and enforce standard token/account status checks.
    verified = await verify_credentials(token)
    if isinstance(verified, dict):
        await _enforce_revocation_and_active_user(verified)
    return verified

854 

855 

async def require_auth_override(
    auth_header: str | None = None,
    jwt_token: str | None = None,
    request: Request | None = None,
) -> str | dict:
    """Invoke ``require_auth`` by hand where FastAPI dependency injection is unavailable.

    Intended for callers such as middleware that hold a raw Authorization
    header rather than injected dependencies. A Bearer header is converted to
    an ``HTTPAuthorizationCredentials`` object and passed to ``require_auth``
    together with the cookie token; how those two are prioritised is decided
    inside ``require_auth``, not here. A Basic header is delegated to
    ``require_docs_basic_auth`` when ``settings.docs_allow_basic_auth`` is
    truthy; otherwise it is ignored and no header credentials are forwarded.

    Args:
        auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
        jwt_token: JWT taken from a cookie; forwarded unchanged to ``require_auth``.
        request: Optional Request object forwarded to ``require_auth``. A bare
            HTTP request with no headers is created when omitted.

    Returns:
        str | dict: Whatever ``require_auth`` returns (the doctests below show a
        decoded payload dict or the string "anonymous"), or the result of
        ``require_docs_basic_auth`` on the Basic path.

    Raises:
        HTTPException: Propagated from ``require_auth`` or
            ``require_docs_basic_auth``; this wrapper raises nothing itself.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     mcp_client_auth_enabled = True
        ...     trust_proxy_auth = False
        ...     proxy_user_header = 'X-Authenticated-User'
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> import asyncio

        Test with Bearer token in auth header:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
        >>> auth_header = f'Bearer {token}'
        >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
        >>> result['sub'] == 'alice'
        True

        Test with invalid auth scheme:
        >>> auth_header = 'Basic dXNlcjpwYXNz'  # Base64 encoded user:pass
        >>> vc.settings.auth_required = False
        >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
        >>> result
        'anonymous'

        Test with only cookie token:
        >>> result = asyncio.run(vc.require_auth_override(jwt_token=token))
        >>> result['sub'] == 'alice'
        True

        Test with no auth:
        >>> result = asyncio.run(vc.require_auth_override())
        >>> result
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # Callers that predate the ``request`` parameter get an empty stand-in request.
    if request is None:
        request = Request(scope={"type": "http", "headers": []})

    # Split the header once; an absent header behaves like an empty scheme/param pair.
    header_scheme, header_param = get_authorization_scheme_param(auth_header) if auth_header else ("", "")
    scheme_name = header_scheme.lower()

    # Basic credentials are only honoured when explicitly enabled for docs endpoints.
    if scheme_name == "basic" and header_param and settings.docs_allow_basic_auth:
        return await require_docs_basic_auth(auth_header)

    # Only a well-formed Bearer header produces credentials; anything else forwards None.
    bearer_credentials = None
    if scheme_name == "bearer" and header_param:
        bearer_credentials = HTTPAuthorizationCredentials(scheme=header_scheme, credentials=header_param)

    return await require_auth(request=request, credentials=bearer_credentials, jwt_token=jwt_token)

952 

953 

async def require_auth_header_first(
    auth_header: str | None = None,
    jwt_token: str | None = None,
    request: Request | None = None,
) -> str | dict:
    """Authenticate a request, letting the Authorization header outrank cookies.

    The JWT is taken from the first source that yields one:
        1. Authorization Bearer header
        2. Cookie ``jwt_token`` from ``request.cookies``
        3. ``jwt_token`` keyword argument

    When ``settings.mcp_client_auth_enabled`` is falsy, no JWT is examined at
    all: the caller is identified from the proxy user header if proxy trust is
    active, and otherwise is either rejected (``auth_required``) or treated as
    anonymous. A Basic header is delegated to ``require_docs_basic_auth`` when
    ``settings.docs_allow_basic_auth`` is truthy.

    Args:
        auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
        jwt_token: JWT taken from a cookie. Used only when neither the header
            nor ``request.cookies`` supplies a token.
        request: Optional Request object. A bare HTTP request with no headers
            is created when *None* is supplied.

    Returns:
        str | dict: The result of ``verify_credentials_cached`` for the chosen
        token, a proxy identity dict, the result of ``require_docs_basic_auth``,
        or the string "anonymous".

    Raises:
        HTTPException: 401 when authentication is required but no usable
            credential is present; also propagated from the verification helpers.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     mcp_client_auth_enabled = True
        ...     trust_proxy_auth = False
        ...     proxy_user_header = 'X-Authenticated-User'
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> import asyncio

        Test header wins over cookie (the core fix):
        >>> header_tok = jwt.encode({'sub': 'header-user', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
        >>> result = asyncio.run(vc.require_auth_header_first(auth_header=f'Bearer {header_tok}'))
        >>> result['sub'] == 'header-user'
        True

        Test cookie fallback when no header:
        >>> cookie_tok = jwt.encode({'sub': 'cookie-user', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256')
        >>> result = asyncio.run(vc.require_auth_header_first(jwt_token=cookie_tok))
        >>> result['sub'] == 'cookie-user'
        True

        Test no auth when not required:
        >>> vc.settings.auth_required = False
        >>> result = asyncio.run(vc.require_auth_header_first())
        >>> result
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    if request is None:
        request = Request(scope={"type": "http", "headers": []})

    # Client auth disabled: only a trusted proxy header can identify the caller.
    if not settings.mcp_client_auth_enabled:
        if is_proxy_auth_trust_active():
            proxy_user = request.headers.get(settings.proxy_user_header)
            if proxy_user:
                return {"sub": proxy_user, "source": "proxy", "token": None}  # nosec B105 - None is not a password
            rejection_detail = "Proxy authentication header required"
        else:
            rejection_detail = "Authentication required but no auth method configured"

        # Both sub-cases end the same way: reject if auth is mandatory, else anonymous.
        if settings.auth_required:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=rejection_detail,
                headers={"WWW-Authenticate": "Bearer"},
            )
        return "anonymous"

    # Split the header once; an absent header behaves like an empty scheme/param pair.
    header_scheme, header_param = get_authorization_scheme_param(auth_header) if auth_header else ("", "")
    scheme_name = header_scheme.lower()
    if scheme_name == "basic" and header_param and settings.docs_allow_basic_auth:
        return await require_docs_basic_auth(auth_header)

    # 1. Authorization Bearer header (highest priority)
    token: str | None = header_param if scheme_name == "bearer" and header_param else None

    # 2. Cookie carried by the request object
    if not token and hasattr(request, "cookies") and request.cookies:
        token = request.cookies.get("jwt_token") or None

    # 3. Explicit jwt_token keyword argument
    if not token and jwt_token:
        token = jwt_token

    if settings.auth_required and not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not token:
        return "anonymous"
    return await verify_credentials_cached(token, request)

1083 

1084 

async def require_admin_auth(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    jwt_token: Optional[str] = Cookie(None, alias="jwt_token"),
    basic_credentials: Optional[HTTPBasicCredentials] = Depends(basic_security),
) -> str:
    """Authenticate an admin caller via email-backed JWT, falling back to Basic auth.

    Two methods are attempted in order:
        1. Email-based JWT authentication (only when ``email_auth_enabled`` is
           truthy). The cookie token is used first, then the Authorization
           header credentials. The user named by the token's ``sub`` (or
           legacy ``username``) claim must exist, be active and have
           ``is_admin`` set.
        2. Basic authentication, permitted only when
           ``settings.api_allow_basic_auth`` is truthy.

    A 401 from the JWT stage does not end the request for non-browser clients:
    control falls through to the Basic stage. Any non-HTTP error in the JWT
    stage is deliberately swallowed for the same reason.

    Args:
        request: FastAPI request object
        credentials: HTTP Authorization credentials
        jwt_token: JWT token from cookies
        basic_credentials: HTTP Basic auth credentials

    Returns:
        str: The admin user's email on the JWT path, or the result of
        ``verify_basic_credentials`` on the Basic path.

    Raises:
        HTTPException: 401 if authentication fails, 403 if the user is not an
            admin, or 302 with a ``Location`` header pointing at the admin
            login page for browser (and, in the final fallback, HTMX) requests.

    Examples:
        >>> # This function is typically used as a FastAPI dependency
        >>> callable(require_admin_auth)
        True
    """
    # First-Party
    from mcpgateway.config import settings

    # Stage 1: email-backed JWT authentication, when enabled.
    if getattr(settings, "email_auth_enabled", False):
        try:
            # First-Party
            from mcpgateway.db import get_db
            from mcpgateway.services.email_auth_service import EmailAuthService

            # The cookie token is preferred here; header credentials are the fallback.
            bearer_token = jwt_token
            if not bearer_token and credentials:
                bearer_token = credentials.credentials

            if bearer_token:
                db_session = next(get_db())
                try:
                    # Decode and verify JWT token (use cached version for performance)
                    claims = await verify_jwt_token_cached(bearer_token, request)
                    await _enforce_revocation_and_active_user(claims)
                    username = claims.get("sub") or claims.get("username")  # Support both new and legacy formats

                    if username:
                        account = await EmailAuthService(db_session).get_user_by_email(username)

                        if not account:
                            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
                        if not getattr(account, "is_active", True):
                            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account disabled")
                        if account.is_admin:
                            return account.email

                        # Authenticated but not an admin: browsers are redirected, API clients get 403.
                        accept_header = request.headers.get("accept", "")
                        if "text/html" in accept_header:
                            root_path = request.scope.get("root_path", "")
                            raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Admin privileges required", headers={"Location": f"{root_path}/admin/login?error=admin_required"})
                        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required")
                    # No usable subject claim: leave this stage and try Basic auth below.
                finally:
                    # The session is released on every exit path, including the raises above.
                    db_session.close()
        except HTTPException as auth_error:
            # Anything other than 401 (403, redirects, etc.) is final.
            if auth_error.status_code != status.HTTP_401_UNAUTHORIZED:
                raise
            # A 401 sends browser users to the login page ...
            accept_header = request.headers.get("accept", "")
            if "text/html" in accept_header:
                root_path = request.scope.get("root_path", "")
                raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
            # ... while other clients fall back to basic auth for backward compatibility.
        except Exception:
            # If there's any other error with email auth, fall back to basic auth
            pass  # nosec B110 - Intentional fallback to basic auth on any email auth error

    # Stage 2: basic authentication (gated by API_ALLOW_BASIC_AUTH).
    try:
        if not basic_credentials:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required",
                headers={"WWW-Authenticate": "Bearer"},
            )
        # SECURITY: Basic auth for API endpoints is disabled by default
        if not settings.api_allow_basic_auth:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Basic authentication is disabled for API endpoints. Use JWT or API tokens instead.",
                headers={"WWW-Authenticate": "Bearer"},
            )
        return await verify_basic_credentials(basic_credentials)
    except HTTPException:
        # Without email auth there is no login page to offer: surface the basic auth error.
        if not getattr(settings, "email_auth_enabled", False):
            raise

        # Both methods failed: redirect browser / HTMX callers, give API callers a 401.
        accept_header = request.headers.get("accept", "")
        is_htmx = request.headers.get("hx-request") == "true"
        if "text/html" in accept_header or is_htmx:
            root_path = request.scope.get("root_path", "")
            raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. Please login with email/password or use basic auth.", headers={"WWW-Authenticate": "Bearer"}
        )