Coverage for mcpgateway / utils / verify_credentials.py: 99%

279 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/verify_credentials.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Authentication verification utilities for ContextForge. 

8This module provides JWT and Basic authentication verification functions 

9for securing API endpoints. It supports authentication via Authorization 

10headers and cookies. 

11Examples: 

12 >>> from mcpgateway.utils import verify_credentials as vc 

13 >>> from mcpgateway.utils import jwt_config_helper as jch 

14 >>> from pydantic import SecretStr 

15 >>> class DummySettings: 

16 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

17 ... jwt_algorithm = 'HS256' 

18 ... jwt_audience = 'mcpgateway-api' 

19 ... jwt_issuer = 'mcpgateway' 

20 ... jwt_issuer_verification = True 

21 ... jwt_audience_verification = True 

22 ... jwt_public_key_path = '' 

23 ... jwt_private_key_path = '' 

24 ... basic_auth_user = 'user' 

25 ... basic_auth_password = SecretStr('pass') 

26 ... auth_required = True 

27 ... require_token_expiration = False 

28 ... require_jti = False 

29 ... validate_token_environment = False 

30 ... docs_allow_basic_auth = False 

31 >>> vc.settings = DummySettings() 

32 >>> jch.settings = DummySettings() 

33 >>> jch.clear_jwt_caches() 

34 >>> import jwt 

35 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

36 >>> import asyncio 

37 >>> asyncio.run(vc.verify_jwt_token(token))['sub'] == 'alice' 

38 True 

39 >>> payload = asyncio.run(vc.verify_credentials(token)) 

40 >>> payload['token'] == token 

41 True 

42 >>> from fastapi.security import HTTPBasicCredentials 

43 >>> creds = HTTPBasicCredentials(username='user', password='pass') 

44 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user' 

45 True 

46 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong') 

47 >>> try: 

48 ... asyncio.run(vc.verify_basic_credentials(creds_bad)) 

49 ... except Exception as e: 

50 ... print('error') 

51 error 

52""" 

53 

54# Standard 

55import asyncio 

56from base64 import b64decode 

57import binascii 

58from typing import Any, Optional 

59 

60# Third-Party 

61from fastapi import Cookie, Depends, HTTPException, Request, status 

62from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer 

63from fastapi.security.utils import get_authorization_scheme_param 

64import jwt 

65 

66# First-Party 

67from mcpgateway.config import settings 

68from mcpgateway.services.logging_service import LoggingService 

69from mcpgateway.utils.jwt_config_helper import validate_jwt_algo_and_keys 

70from mcpgateway.utils.time_restrictions import validate_time_restrictions 

71 

72basic_security = HTTPBasic(auto_error=False) 

73security = HTTPBearer(auto_error=False) 

74 

75# Initialize logging service first 

76logging_service = LoggingService() 

77logger = logging_service.get_logger(__name__) 

78 

79 

80def is_proxy_auth_trust_active(settings_obj: Any | None = None) -> bool: 

81 """Return whether proxy-header trust mode is explicitly active. 

82 

83 Args: 

84 settings_obj: Optional settings object override (defaults to global settings). 

85 

86 Returns: 

87 ``True`` when proxy-header trust is explicitly enabled and acknowledged; 

88 otherwise ``False``. 

89 """ 

90 current_settings = settings_obj or settings 

91 

92 if current_settings.mcp_client_auth_enabled or not current_settings.trust_proxy_auth: 

93 return False 

94 

95 if getattr(current_settings, "trust_proxy_auth_dangerously", False) is True: 

96 return True 

97 

98 if not getattr(is_proxy_auth_trust_active, "_warned", False): 

99 logger.warning("Ignoring trusted proxy auth because TRUST_PROXY_AUTH_DANGEROUSLY is false while MCP client auth is disabled.") 

100 is_proxy_auth_trust_active._warned = True # type: ignore[attr-defined] 

101 return False 

102 

103 

104def extract_websocket_bearer_token(query_params: Any, headers: Any, *, query_param_warning: Optional[str] = None) -> Optional[str]: 

105 """Extract bearer token from WebSocket Authorization headers. 

106 

107 Args: 

108 query_params: WebSocket query parameters mapping-like object. 

109 headers: WebSocket headers mapping-like object. 

110 query_param_warning: Optional warning message when legacy query token is detected. 

111 

112 Returns: 

113 Bearer token value when present, otherwise None. 

114 """ 

115 # Do not accept tokens from query parameters. This avoids leaking bearer 

116 # secrets through URL logs/history/proxy telemetry. 

117 query = query_params or {} 

118 legacy_token = query.get("token") if hasattr(query, "get") else None 

119 if legacy_token and query_param_warning: 

120 logger.warning(f"{query_param_warning}; token ignored") 

121 

122 header_values = headers or {} 

123 auth_header = header_values.get("authorization") if hasattr(header_values, "get") else None 

124 if not auth_header and hasattr(header_values, "get"): 

125 auth_header = header_values.get("Authorization") 

126 if auth_header: 

127 scheme, _, credentials = auth_header.partition(" ") 

128 if scheme.lower() == "bearer" and credentials: 

129 return credentials.strip() 

130 return None 

131 

132 

133async def verify_jwt_token(token: str) -> dict: 

134 """Verify and decode a JWT token in a single pass. 

135 

136 Decodes and validates a JWT token using the configured secret key 

137 and algorithm from settings. Uses PyJWT's require option for claim 

138 enforcement instead of a separate unverified decode. 

139 

140 Note: 

141 With single-pass decoding, signature validation occurs before 

142 claim validation. An invalid signature will result in "Invalid token" 

143 error even if the token is also missing required claims. 

144 

145 Args: 

146 token: The JWT token string to verify. 

147 

148 Returns: 

149 dict: The decoded token payload containing claims (e.g., user info). 

150 

151 Raises: 

152 HTTPException: If token is invalid, expired, or missing required claims. 

153 """ 

154 try: 

155 validate_jwt_algo_and_keys() 

156 

157 # Import the verification key helper 

158 # First-Party 

159 from mcpgateway.utils.jwt_config_helper import get_jwt_public_key_or_secret 

160 

161 options = { 

162 "verify_aud": settings.jwt_audience_verification, 

163 "verify_iss": settings.jwt_issuer_verification, 

164 } 

165 

166 if settings.require_token_expiration: 

167 options["require"] = ["exp"] 

168 

169 decode_kwargs = { 

170 "key": get_jwt_public_key_or_secret(), 

171 "algorithms": [settings.jwt_algorithm], 

172 "options": options, 

173 } 

174 

175 if settings.jwt_audience_verification: 

176 decode_kwargs["audience"] = settings.jwt_audience 

177 

178 if settings.jwt_issuer_verification: 

179 decode_kwargs["issuer"] = settings.jwt_issuer 

180 

181 payload = jwt.decode(token, **decode_kwargs) 

182 

183 # Log warning for tokens without expiration (when not required) 

184 if not settings.require_token_expiration and "exp" not in payload: 

185 logger.warning(f"JWT token without expiration accepted. Consider enabling REQUIRE_TOKEN_EXPIRATION for better security. Token sub: {payload.get('sub', 'unknown')}") 

186 

187 # Require JTI if configured 

188 if settings.require_jti and "jti" not in payload: 

189 raise HTTPException( 

190 status_code=status.HTTP_401_UNAUTHORIZED, 

191 detail="Token is missing required JTI claim. Set REQUIRE_JTI=false to allow.", 

192 headers={"WWW-Authenticate": "Bearer"}, 

193 ) 

194 

195 # Log warning for tokens without JTI (when not required) 

196 if not settings.require_jti and "jti" not in payload: 

197 logger.warning(f"JWT token without JTI accepted. Token cannot be revoked. Consider enabling REQUIRE_JTI for better security. Token sub: {payload.get('sub', 'unknown')}") 

198 

199 # Validate environment claim if configured (reject mismatched, allow missing for backward compatibility) 

200 if settings.validate_token_environment: 

201 token_env = payload.get("env") 

202 if token_env is not None and token_env != settings.environment: 

203 raise HTTPException( 

204 status_code=status.HTTP_401_UNAUTHORIZED, 

205 detail=f"Token environment mismatch: token is for '{token_env}', server is '{settings.environment}'", 

206 headers={"WWW-Authenticate": "Bearer"}, 

207 ) 

208 

209 # Validate time restrictions if present in token scopes 

210 validate_time_restrictions(payload) 

211 

212 return payload 

213 

214 except jwt.MissingRequiredClaimError: 

215 raise HTTPException( 

216 status_code=status.HTTP_401_UNAUTHORIZED, 

217 detail="Token is missing required expiration claim. Set REQUIRE_TOKEN_EXPIRATION=false to allow.", 

218 headers={"WWW-Authenticate": "Bearer"}, 

219 ) 

220 except jwt.ExpiredSignatureError: 

221 raise HTTPException( 

222 status_code=status.HTTP_401_UNAUTHORIZED, 

223 detail="Token has expired", 

224 headers={"WWW-Authenticate": "Bearer"}, 

225 ) 

226 except jwt.PyJWTError: 

227 raise HTTPException( 

228 status_code=status.HTTP_401_UNAUTHORIZED, 

229 detail="Invalid token", 

230 headers={"WWW-Authenticate": "Bearer"}, 

231 ) 

232 

233 

234async def verify_jwt_token_cached(token: str, request: Optional[Request] = None) -> dict: 

235 """Verify JWT token with request-level caching. 

236 

237 If a request object is provided and the token has already been verified 

238 for this request, returns the cached payload. Otherwise, performs 

239 verification and caches the result in request.state. 

240 

241 Args: 

242 token: JWT token string to verify 

243 request: Optional FastAPI/Starlette request for request-level caching. 

244 Must have a 'state' attribute to enable caching. 

245 

246 Returns: 

247 dict: Decoded and verified JWT payload 

248 

249 Raises: 

250 HTTPException: If token is invalid, expired, or missing required claims. 

251 """ 

252 # Check request.state cache first (safely handle non-Request objects) 

253 if request is not None and hasattr(request, "state"): 

254 cached = getattr(request.state, "_jwt_verified_payload", None) 

255 # Verify cache is a valid tuple of (token, payload) before unpacking 

256 if cached is not None and isinstance(cached, tuple) and len(cached) == 2: 

257 cached_token, cached_payload = cached 

258 if cached_token == token: 

259 return cached_payload 

260 

261 # Verify token (single decode) 

262 payload = await verify_jwt_token(token) 

263 

264 # Cache in request.state for reuse across middleware 

265 if request is not None and hasattr(request, "state"): 

266 request.state._jwt_verified_payload = (token, payload) 

267 

268 return payload 

269 

270 

271async def verify_credentials(token: str) -> dict: 

272 """Verify credentials using a JWT token. 

273 

274 A wrapper around verify_jwt_token that adds the original token 

275 to the decoded payload for reference. 

276 

277 This function uses verify_jwt_token internally which may raise exceptions. 

278 

279 Args: 

280 token: The JWT token string to verify. 

281 

282 Returns: 

283 dict: The validated token payload with the original token added 

284 under the 'token' key. 

285 

286 Examples: 

287 >>> from mcpgateway.utils import verify_credentials as vc 

288 >>> from mcpgateway.utils import jwt_config_helper as jch 

289 >>> from pydantic import SecretStr 

290 >>> class DummySettings: 

291 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

292 ... jwt_algorithm = 'HS256' 

293 ... jwt_audience = 'mcpgateway-api' 

294 ... jwt_issuer = 'mcpgateway' 

295 ... jwt_audience_verification = True 

296 ... jwt_issuer_verification = True 

297 ... jwt_public_key_path = '' 

298 ... jwt_private_key_path = '' 

299 ... basic_auth_user = 'user' 

300 ... basic_auth_password = SecretStr('pass') 

301 ... auth_required = True 

302 ... require_token_expiration = False 

303 ... require_jti = False 

304 ... validate_token_environment = False 

305 ... docs_allow_basic_auth = False 

306 >>> vc.settings = DummySettings() 

307 >>> jch.settings = DummySettings() 

308 >>> jch.clear_jwt_caches() 

309 >>> import jwt 

310 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

311 >>> import asyncio 

312 >>> payload = asyncio.run(vc.verify_credentials(token)) 

313 >>> payload['token'] == token 

314 True 

315 """ 

316 payload = await verify_jwt_token(token) 

317 payload["token"] = token 

318 return payload 

319 

320 

321async def verify_credentials_cached(token: str, request: Optional[Request] = None) -> dict: 

322 """Verify credentials using a JWT token with request-level caching. 

323 

324 A wrapper around verify_jwt_token_cached that adds the original token 

325 to the decoded payload for reference. 

326 

327 Args: 

328 token: The JWT token string to verify. 

329 request: Optional FastAPI/Starlette request for request-level caching. 

330 

331 Returns: 

332 dict: The validated token payload with the original token added 

333 under the 'token' key. Returns a copy to avoid mutating cached payload. 

334 """ 

335 payload = await verify_jwt_token_cached(token, request) 

336 # Return a copy with token added to avoid mutating the cached payload 

337 return {**payload, "token": token} 

338 

339 

340def _raise_auth_401(detail: str) -> None: 

341 """Raise a standardized bearer-auth 401 error. 

342 

343 Args: 

344 detail: Error detail message for the response body. 

345 

346 Raises: 

347 HTTPException: Always raises 401 Unauthorized with Bearer auth header. 

348 """ 

349 raise HTTPException( 

350 status_code=status.HTTP_401_UNAUTHORIZED, 

351 detail=detail, 

352 headers={"WWW-Authenticate": "Bearer"}, 

353 ) 

354 

355 

356async def _enforce_revocation_and_active_user(payload: dict) -> None: 

357 """Enforce token revocation and active-user checks for JWT-authenticated flows. 

358 

359 Args: 

360 payload: Verified JWT payload used to derive revocation and user status checks. 

361 

362 Raises: 

363 HTTPException: 401 when the token is revoked, the account is disabled, 

364 or strict user-in-db mode rejects a missing user. 

365 """ 

366 # First-Party 

367 from mcpgateway.auth import _check_token_revoked_sync, _get_user_by_email_sync 

368 

369 jti = payload.get("jti") 

370 if jti: 

371 try: 

372 if await asyncio.to_thread(_check_token_revoked_sync, jti): 

373 _raise_auth_401("Token has been revoked") 

374 except HTTPException: 

375 raise 

376 except Exception as exc: 

377 logger.warning("Token revocation check failed for JTI %s: %s", jti, exc) 

378 

379 username = payload.get("sub") or payload.get("email") or payload.get("username") 

380 if not username: 

381 return 

382 

383 try: 

384 user = await asyncio.to_thread(_get_user_by_email_sync, username) 

385 except Exception as exc: 

386 logger.warning("User status check failed for %s: %s", username, exc) 

387 return 

388 

389 if user is None: 

390 if settings.require_user_in_db and username != getattr(settings, "platform_admin_email", "admin@example.com"): 

391 _raise_auth_401("User not found in database") 

392 return 

393 

394 if not user.is_active: 

395 _raise_auth_401("Account disabled") 

396 

397 

398async def require_auth(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)) -> str | dict: 

399 """Require authentication via JWT token or proxy headers. 

400 

401 FastAPI dependency that checks for authentication via: 

402 1. Proxy headers (if mcp_client_auth_enabled=false and trust_proxy_auth=true) 

403 2. JWT token in Authorization header (Bearer scheme) 

404 3. JWT token in cookies 

405 

406 If authentication is required but no token is provided, raises an HTTP 401 error. 

407 

408 Args: 

409 request: The FastAPI request object for accessing headers. 

410 credentials: HTTP Authorization credentials from the request header. 

411 jwt_token: JWT token from cookies. 

412 

413 Returns: 

414 str | dict: The verified credentials payload if authenticated, 

415 proxy user if proxy auth enabled, or "anonymous" if authentication is not required. 

416 

417 Raises: 

418 HTTPException: 401 status if authentication is required but no valid 

419 token is provided. 

420 

421 Examples: 

422 >>> from mcpgateway.utils import verify_credentials as vc 

423 >>> from mcpgateway.utils import jwt_config_helper as jch 

424 >>> from pydantic import SecretStr 

425 >>> class DummySettings: 

426 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

427 ... jwt_algorithm = 'HS256' 

428 ... jwt_audience = 'mcpgateway-api' 

429 ... jwt_issuer = 'mcpgateway' 

430 ... jwt_audience_verification = True 

431 ... jwt_issuer_verification = True 

432 ... jwt_public_key_path = '' 

433 ... jwt_private_key_path = '' 

434 ... basic_auth_user = 'user' 

435 ... basic_auth_password = SecretStr('pass') 

436 ... auth_required = True 

437 ... mcp_client_auth_enabled = True 

438 ... trust_proxy_auth = False 

439 ... proxy_user_header = 'X-Authenticated-User' 

440 ... require_token_expiration = False 

441 ... require_jti = False 

442 ... validate_token_environment = False 

443 ... docs_allow_basic_auth = False 

444 >>> vc.settings = DummySettings() 

445 >>> jch.settings = DummySettings() 

446 >>> jch.clear_jwt_caches() 

447 >>> import jwt 

448 >>> from fastapi.security import HTTPAuthorizationCredentials 

449 >>> from fastapi import Request 

450 >>> import asyncio 

451 

452 Test with valid credentials in header: 

453 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

454 >>> creds = HTTPAuthorizationCredentials(scheme='Bearer', credentials=token) 

455 >>> req = Request(scope={'type': 'http', 'headers': []}) 

456 >>> result = asyncio.run(vc.require_auth(request=req, credentials=creds, jwt_token=None)) 

457 >>> result['sub'] == 'alice' 

458 True 

459 

460 Test with valid token in cookie: 

461 >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=token)) 

462 >>> result['sub'] == 'alice' 

463 True 

464 

465 Test with auth required but no token: 

466 >>> try: 

467 ... asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None)) 

468 ... except vc.HTTPException as e: 

469 ... print(e.status_code, e.detail) 

470 401 Not authenticated 

471 

472 Test with auth not required: 

473 >>> vc.settings.auth_required = False 

474 >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None)) 

475 >>> result 

476 'anonymous' 

477 >>> vc.settings.auth_required = True 

478 """ 

479 # If MCP client auth is disabled and proxy auth is trusted, use proxy headers 

480 if not settings.mcp_client_auth_enabled: 

481 if is_proxy_auth_trust_active(): 

482 # Extract user from proxy header 

483 proxy_user = request.headers.get(settings.proxy_user_header) 

484 if proxy_user: 

485 return {"sub": proxy_user, "source": "proxy", "token": None} # nosec B105 - None is not a password 

486 # No proxy header - check auth_required (matches RBAC/WebSocket behavior) 

487 if settings.auth_required: 

488 raise HTTPException( 

489 status_code=status.HTTP_401_UNAUTHORIZED, 

490 detail="Proxy authentication header required", 

491 headers={"WWW-Authenticate": "Bearer"}, 

492 ) 

493 return "anonymous" 

494 else: 

495 # Warning: MCP auth disabled without proxy trust - security risk! 

496 # This case is already warned about in config validation 

497 if settings.auth_required: 

498 raise HTTPException( 

499 status_code=status.HTTP_401_UNAUTHORIZED, 

500 detail="Authentication required but no auth method configured", 

501 headers={"WWW-Authenticate": "Bearer"}, 

502 ) 

503 return "anonymous" 

504 

505 # Standard JWT authentication flow - prioritize manual cookie reading 

506 token = None 

507 

508 # 1. First try manual cookie reading (most reliable) 

509 if hasattr(request, "cookies") and request.cookies: 

510 manual_token = request.cookies.get("jwt_token") 

511 if manual_token: 

512 token = manual_token 

513 

514 # 2. Then try Authorization header 

515 if not token and credentials and credentials.credentials: 

516 token = credentials.credentials 

517 

518 # 3. Finally try FastAPI Cookie dependency (fallback) 

519 if not token and jwt_token: 

520 token = jwt_token 

521 

522 if settings.auth_required and not token: 

523 _raise_auth_401("Not authenticated") 

524 

525 if not token: 

526 return "anonymous" 

527 

528 payload = await verify_credentials_cached(token, request) 

529 await _enforce_revocation_and_active_user(payload) 

530 return payload 

531 

532 

533async def verify_basic_credentials(credentials: HTTPBasicCredentials) -> str: 

534 """Verify HTTP Basic authentication credentials. 

535 

536 Validates the provided username and password against the configured 

537 basic auth credentials in settings. 

538 

539 Args: 

540 credentials: HTTP Basic credentials containing username and password. 

541 

542 Returns: 

543 str: The authenticated username if credentials are valid. 

544 

545 Raises: 

546 HTTPException: 401 status if credentials are invalid. 

547 

548 Examples: 

549 >>> from mcpgateway.utils import verify_credentials as vc 

550 >>> from pydantic import SecretStr 

551 >>> class DummySettings: 

552 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

553 ... jwt_algorithm = 'HS256' 

554 ... jwt_audience = 'mcpgateway-api' 

555 ... jwt_issuer = 'mcpgateway' 

556 ... jwt_audience_verification = True 

557 ... jwt_issuer_verification = True 

558 ... basic_auth_user = 'user' 

559 ... basic_auth_password = SecretStr('pass') 

560 ... auth_required = True 

561 ... docs_allow_basic_auth = False 

562 >>> vc.settings = DummySettings() 

563 >>> from fastapi.security import HTTPBasicCredentials 

564 >>> creds = HTTPBasicCredentials(username='user', password='pass') 

565 >>> import asyncio 

566 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user' 

567 True 

568 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong') 

569 >>> try: 

570 ... asyncio.run(vc.verify_basic_credentials(creds_bad)) 

571 ... except Exception as e: 

572 ... print('error') 

573 error 

574 """ 

575 is_valid_user = credentials.username == settings.basic_auth_user 

576 is_valid_pass = credentials.password == settings.basic_auth_password.get_secret_value() 

577 

578 if not (is_valid_user and is_valid_pass): 

579 raise HTTPException( 

580 status_code=status.HTTP_401_UNAUTHORIZED, 

581 detail="Invalid credentials", 

582 headers={"WWW-Authenticate": "Basic"}, 

583 ) 

584 return credentials.username 

585 

586 

587async def require_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_security)) -> str: 

588 """Require valid HTTP Basic authentication. 

589 

590 FastAPI dependency that enforces Basic authentication when enabled. 

591 Returns the authenticated username or "anonymous" if auth is not required. 

592 

593 Args: 

594 credentials: HTTP Basic credentials provided by the client. 

595 

596 Returns: 

597 str: The authenticated username or "anonymous" if auth is not required. 

598 

599 Raises: 

600 HTTPException: 401 status if authentication is required but no valid 

601 credentials are provided. 

602 

603 Examples: 

604 >>> from mcpgateway.utils import verify_credentials as vc 

605 >>> from pydantic import SecretStr 

606 >>> class DummySettings: 

607 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

608 ... jwt_algorithm = 'HS256' 

609 ... jwt_audience = 'mcpgateway-api' 

610 ... jwt_issuer = 'mcpgateway' 

611 ... jwt_audience_verification = True 

612 ... jwt_issuer_verification = True 

613 ... basic_auth_user = 'user' 

614 ... basic_auth_password = SecretStr('pass') 

615 ... auth_required = True 

616 ... docs_allow_basic_auth = False 

617 >>> vc.settings = DummySettings() 

618 >>> from fastapi.security import HTTPBasicCredentials 

619 >>> import asyncio 

620 

621 Test with valid credentials: 

622 >>> creds = HTTPBasicCredentials(username='user', password='pass') 

623 >>> asyncio.run(vc.require_basic_auth(creds)) 

624 'user' 

625 

626 Test with auth required but no credentials: 

627 >>> try: 

628 ... asyncio.run(vc.require_basic_auth(None)) 

629 ... except vc.HTTPException as e: 

630 ... print(e.status_code, e.detail) 

631 401 Not authenticated 

632 

633 Test with auth not required: 

634 >>> vc.settings.auth_required = False 

635 >>> asyncio.run(vc.require_basic_auth(None)) 

636 'anonymous' 

637 >>> vc.settings.auth_required = True 

638 """ 

639 if settings.auth_required: 

640 if not credentials: 

641 raise HTTPException( 

642 status_code=status.HTTP_401_UNAUTHORIZED, 

643 detail="Not authenticated", 

644 headers={"WWW-Authenticate": "Basic"}, 

645 ) 

646 return await verify_basic_credentials(credentials) 

647 return "anonymous" 

648 

649 

650async def require_docs_basic_auth(auth_header: str) -> str: 

651 """Dedicated handler for HTTP Basic Auth for documentation endpoints only. 

652 

653 This function is ONLY intended for /docs, /redoc, or similar endpoints, and is enabled 

654 via the settings.docs_allow_basic_auth flag. It should NOT be used for general API authentication. 

655 

656 Args: 

657 auth_header: Raw Authorization header value (e.g. "Basic username:password"). 

658 

659 Returns: 

660 str: The authenticated username if credentials are valid. 

661 

662 Raises: 

663 HTTPException: If credentials are invalid or malformed. 

664 ValueError: If the basic auth format is invalid (missing colon). 

665 

666 Examples: 

667 >>> from mcpgateway.utils import verify_credentials as vc 

668 >>> from pydantic import SecretStr 

669 >>> class DummySettings: 

670 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

671 ... jwt_algorithm = 'HS256' 

672 ... jwt_audience = 'mcpgateway-api' 

673 ... jwt_issuer = 'mcpgateway' 

674 ... jwt_audience_verification = True 

675 ... jwt_issuer_verification = True 

676 ... basic_auth_user = 'user' 

677 ... basic_auth_password = SecretStr('pass') 

678 ... auth_required = True 

679 ... require_token_expiration = False 

680 ... require_jti = False 

681 ... validate_token_environment = False 

682 ... docs_allow_basic_auth = True 

683 >>> vc.settings = DummySettings() 

684 >>> import base64, asyncio 

685 

686 Test with properly encoded credentials: 

687 >>> userpass = base64.b64encode(b'user:pass').decode() 

688 >>> auth_header = f'Basic {userpass}' 

689 >>> asyncio.run(vc.require_docs_basic_auth(auth_header)) 

690 'user' 

691 

692 Test with different valid credentials: 

693 >>> valid_creds = base64.b64encode(b'user:pass').decode() 

694 >>> valid_header = f'Basic {valid_creds}' 

695 >>> result = asyncio.run(vc.require_docs_basic_auth(valid_header)) 

696 >>> result == 'user' 

697 True 

698 

699 Test with invalid password: 

700 >>> badpass = base64.b64encode(b'user:wrong').decode() 

701 >>> bad_header = f'Basic {badpass}' 

702 >>> try: 

703 ... asyncio.run(vc.require_docs_basic_auth(bad_header)) 

704 ... except vc.HTTPException as e: 

705 ... e.status_code == 401 

706 True 

707 

708 Test with malformed base64 (no colon): 

709 >>> malformed = base64.b64encode(b'userpass').decode() 

710 >>> malformed_header = f'Basic {malformed}' 

711 >>> try: 

712 ... asyncio.run(vc.require_docs_basic_auth(malformed_header)) 

713 ... except vc.HTTPException as e: 

714 ... e.status_code == 401 

715 True 

716 

717 Test with invalid base64 encoding: 

718 >>> invalid_header = 'Basic invalid_base64!' 

719 >>> try: 

720 ... asyncio.run(vc.require_docs_basic_auth(invalid_header)) 

721 ... except vc.HTTPException as e: 

722 ... 'Invalid basic auth credentials' in e.detail 

723 True 

724 

725 Test when docs_allow_basic_auth is disabled: 

726 >>> vc.settings.docs_allow_basic_auth = False 

727 >>> try: 

728 ... asyncio.run(vc.require_docs_basic_auth(auth_header)) 

729 ... except vc.HTTPException as e: 

730 ... 'not allowed' in e.detail 

731 True 

732 >>> vc.settings.docs_allow_basic_auth = True 

733 

734 Test with non-Basic auth scheme: 

735 >>> bearer_header = 'Bearer eyJhbGciOiJIUzI1NiJ9...' 

736 >>> try: 

737 ... asyncio.run(vc.require_docs_basic_auth(bearer_header)) 

738 ... except vc.HTTPException as e: 

739 ... e.status_code == 401 

740 True 

741 

742 Test with empty credentials part: 

743 >>> empty_header = 'Basic ' 

744 >>> try: 

745 ... asyncio.run(vc.require_docs_basic_auth(empty_header)) 

746 ... except vc.HTTPException as e: 

747 ... 'not allowed' in e.detail 

748 True 

749 

750 Test with Unicode decode error: 

751 >>> from base64 import b64encode 

752 >>> bad_bytes = bytes([0xff, 0xfe]) # Invalid UTF-8 bytes 

753 >>> bad_unicode = b64encode(bad_bytes).decode() 

754 >>> unicode_header = f'Basic {bad_unicode}' 

755 >>> try: 

756 ... asyncio.run(vc.require_docs_basic_auth(unicode_header)) 

757 ... except vc.HTTPException as e: 

758 ... 'Invalid basic auth credentials' in e.detail 

759 True 

760 """ 

761 scheme, param = get_authorization_scheme_param(auth_header) 

762 if scheme.lower() == "basic" and param and settings.docs_allow_basic_auth: 

763 try: 

764 data = b64decode(param).decode("ascii") 

765 username, separator, password = data.partition(":") 

766 if not separator: 

767 raise ValueError("Invalid basic auth format") 

768 credentials = HTTPBasicCredentials(username=username, password=password) 

769 return await require_basic_auth(credentials=credentials) 

770 except (ValueError, UnicodeDecodeError, binascii.Error): 

771 raise HTTPException( 

772 status_code=status.HTTP_401_UNAUTHORIZED, 

773 detail="Invalid basic auth credentials", 

774 headers={"WWW-Authenticate": "Basic"}, 

775 ) 

776 raise HTTPException( 

777 status_code=status.HTTP_401_UNAUTHORIZED, 

778 detail="Basic authentication not allowed or malformed", 

779 headers={"WWW-Authenticate": "Basic"}, 

780 ) 

781 

782 

783async def require_docs_auth_override( 

784 auth_header: str | None = None, 

785 jwt_token: str | None = None, 

786) -> str | dict: 

787 """Require authentication for docs endpoints, bypassing global auth settings. 

788 

789 This function specifically validates JWT tokens for documentation endpoints 

790 (/docs, /redoc, /openapi.json) regardless of global authentication settings 

791 like mcp_client_auth_enabled or auth_required. 

792 

793 Args: 

794 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi..."). 

795 jwt_token: JWT token from cookies. 

796 

797 Returns: 

798 str | dict: The decoded JWT payload. 

799 

800 Raises: 

801 HTTPException: If authentication fails or credentials are invalid. 

802 

803 Examples: 

804 >>> from mcpgateway.utils import verify_credentials as vc 

805 >>> from mcpgateway.utils import jwt_config_helper as jch 

806 >>> class DummySettings: 

807 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

808 ... jwt_algorithm = 'HS256' 

809 ... jwt_audience = 'mcpgateway-api' 

810 ... jwt_issuer = 'mcpgateway' 

811 ... jwt_audience_verification = True 

812 ... jwt_issuer_verification = True 

813 ... jwt_public_key_path = '' 

814 ... jwt_private_key_path = '' 

815 ... docs_allow_basic_auth = False 

816 ... require_token_expiration = False 

817 ... require_jti = False 

818 ... validate_token_environment = False 

819 >>> vc.settings = DummySettings() 

820 >>> jch.settings = DummySettings() 

821 >>> jch.clear_jwt_caches() 

822 >>> import jwt 

823 >>> import asyncio 

824 

825 Test with valid JWT: 

826 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

827 >>> auth_header = f'Bearer {token}' 

828 >>> result = asyncio.run(vc.require_docs_auth_override(auth_header=auth_header)) 

829 >>> result['sub'] == 'alice' 

830 True 

831 

832 Test with no token: 

833 >>> try: 

834 ... asyncio.run(vc.require_docs_auth_override()) 

835 ... except vc.HTTPException as e: 

836 ... print(e.status_code, e.detail) 

837 401 Not authenticated 

838 """ 

839 # Extract token from header or cookie 

840 token = jwt_token 

841 if auth_header: 

842 scheme, param = get_authorization_scheme_param(auth_header) 

843 if scheme.lower() == "bearer" and param: 

844 token = param 

845 elif scheme.lower() == "basic" and param and settings.docs_allow_basic_auth: 

846 # Only allow Basic Auth for docs endpoints when explicitly enabled 

847 return await require_docs_basic_auth(auth_header) 

848 

849 # Always require a token for docs endpoints 

850 if not token: 

851 raise HTTPException( 

852 status_code=status.HTTP_401_UNAUTHORIZED, 

853 detail="Not authenticated", 

854 headers={"WWW-Authenticate": "Bearer"}, 

855 ) 

856 

857 # Validate JWT and enforce standard token/account status checks. 

858 payload = await verify_credentials(token) 

859 if isinstance(payload, dict): 

860 await _enforce_revocation_and_active_user(payload) 

861 return payload 

862 

863 

864async def require_auth_override( 

865 auth_header: str | None = None, 

866 jwt_token: str | None = None, 

867 request: Request | None = None, 

868) -> str | dict: 

869 """Call require_auth manually from middleware without FastAPI dependency injection. 

870 

871 This wrapper allows manual authentication verification in contexts where 

872 FastAPI's dependency injection is not available (e.g., middleware). 

873 It parses the Authorization header and creates the appropriate credentials 

874 object before calling require_auth. 

875 

876 Args: 

877 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi..."). 

878 jwt_token: JWT taken from a cookie. If both header and cookie are 

879 supplied, the header takes precedence. 

880 request: Optional Request object for accessing headers (used for proxy auth). 

881 

882 Returns: 

883 str | dict: The decoded JWT payload or the string "anonymous", 

884 same as require_auth. 

885 

886 Raises: 

887 HTTPException: If authentication fails or credentials are invalid. 

888 ValueError: If basic auth credentials are malformed. 

889 

890 Note: 

891 This wrapper may propagate HTTPException raised by require_auth, 

892 but it does not raise anything on its own. 

893 

894 Examples: 

895 >>> from mcpgateway.utils import verify_credentials as vc 

896 >>> from mcpgateway.utils import jwt_config_helper as jch 

897 >>> from pydantic import SecretStr 

898 >>> class DummySettings: 

899 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

900 ... jwt_algorithm = 'HS256' 

901 ... jwt_audience = 'mcpgateway-api' 

902 ... jwt_issuer = 'mcpgateway' 

903 ... jwt_audience_verification = True 

904 ... jwt_issuer_verification = True 

905 ... jwt_public_key_path = '' 

906 ... jwt_private_key_path = '' 

907 ... basic_auth_user = 'user' 

908 ... basic_auth_password = SecretStr('pass') 

909 ... auth_required = True 

910 ... mcp_client_auth_enabled = True 

911 ... trust_proxy_auth = False 

912 ... proxy_user_header = 'X-Authenticated-User' 

913 ... require_token_expiration = False 

914 ... require_jti = False 

915 ... validate_token_environment = False 

916 ... docs_allow_basic_auth = False 

917 >>> vc.settings = DummySettings() 

918 >>> jch.settings = DummySettings() 

919 >>> jch.clear_jwt_caches() 

920 >>> import jwt 

921 >>> import asyncio 

922 

923 Test with Bearer token in auth header: 

924 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

925 >>> auth_header = f'Bearer {token}' 

926 >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header)) 

927 >>> result['sub'] == 'alice' 

928 True 

929 

930 Test with invalid auth scheme: 

931 >>> auth_header = 'Basic dXNlcjpwYXNz' # Base64 encoded user:pass 

932 >>> vc.settings.auth_required = False 

933 >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header)) 

934 >>> result 

935 'anonymous' 

936 

937 Test with only cookie token: 

938 >>> result = asyncio.run(vc.require_auth_override(jwt_token=token)) 

939 >>> result['sub'] == 'alice' 

940 True 

941 

942 Test with no auth: 

943 >>> result = asyncio.run(vc.require_auth_override()) 

944 >>> result 

945 'anonymous' 

946 >>> vc.settings.auth_required = True 

947 """ 

948 # Create a mock request if not provided (for backward compatibility) 

949 if request is None: 

950 request = Request(scope={"type": "http", "headers": []}) 

951 

952 credentials = None 

953 if auth_header: 

954 scheme, param = get_authorization_scheme_param(auth_header) 

955 if scheme.lower() == "bearer" and param: 

956 credentials = HTTPAuthorizationCredentials(scheme=scheme, credentials=param) 

957 elif scheme.lower() == "basic" and param and settings.docs_allow_basic_auth: 

958 # Only allow Basic Auth for docs endpoints when explicitly enabled 

959 return await require_docs_basic_auth(auth_header) 

960 return await require_auth(request=request, credentials=credentials, jwt_token=jwt_token) 

961 

962 

963async def require_auth_header_first( 

964 auth_header: str | None = None, 

965 jwt_token: str | None = None, 

966 request: Request | None = None, 

967) -> str | dict: 

968 """Like require_auth_override but Authorization header takes precedence over cookies. 

969 

970 Token resolution order (matches streamable_http_auth middleware): 

971 1. Authorization Bearer header (highest priority) 

972 2. Cookie ``jwt_token`` from ``request.cookies`` 

973 3. ``jwt_token`` keyword argument 

974 

975 Use this in the stateful-session fallback (``_get_request_context_or_default``) 

976 so that identity is consistent with the ASGI middleware that already 

977 authenticated the primary request. 

978 

979 Args: 

980 auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi..."). 

981 jwt_token: JWT taken from a cookie. Used only when no header token and no 

982 request cookie are present. 

983 request: Optional Request object. A bare empty request is created when 

984 *None* is supplied (backward-compatible default). 

985 

986 Returns: 

987 str | dict: The decoded JWT payload or the string "anonymous". 

988 

989 Raises: 

990 HTTPException: If authentication fails or credentials are invalid. 

991 

992 Examples: 

993 >>> from mcpgateway.utils import verify_credentials as vc 

994 >>> from mcpgateway.utils import jwt_config_helper as jch 

995 >>> from pydantic import SecretStr 

996 >>> class DummySettings: 

997 ... jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

998 ... jwt_algorithm = 'HS256' 

999 ... jwt_audience = 'mcpgateway-api' 

1000 ... jwt_issuer = 'mcpgateway' 

1001 ... jwt_audience_verification = True 

1002 ... jwt_issuer_verification = True 

1003 ... jwt_public_key_path = '' 

1004 ... jwt_private_key_path = '' 

1005 ... basic_auth_user = 'user' 

1006 ... basic_auth_password = SecretStr('pass') 

1007 ... auth_required = True 

1008 ... mcp_client_auth_enabled = True 

1009 ... trust_proxy_auth = False 

1010 ... proxy_user_header = 'X-Authenticated-User' 

1011 ... require_token_expiration = False 

1012 ... require_jti = False 

1013 ... validate_token_environment = False 

1014 ... docs_allow_basic_auth = False 

1015 >>> vc.settings = DummySettings() 

1016 >>> jch.settings = DummySettings() 

1017 >>> jch.clear_jwt_caches() 

1018 >>> import jwt 

1019 >>> import asyncio 

1020 

1021 Test header wins over cookie (the core fix): 

1022 >>> header_tok = jwt.encode({'sub': 'header-user', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

1023 >>> result = asyncio.run(vc.require_auth_header_first(auth_header=f'Bearer {header_tok}')) 

1024 >>> result['sub'] == 'header-user' 

1025 True 

1026 

1027 Test cookie fallback when no header: 

1028 >>> cookie_tok = jwt.encode({'sub': 'cookie-user', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

1029 >>> result = asyncio.run(vc.require_auth_header_first(jwt_token=cookie_tok)) 

1030 >>> result['sub'] == 'cookie-user' 

1031 True 

1032 

1033 Test no auth when not required: 

1034 >>> vc.settings.auth_required = False 

1035 >>> result = asyncio.run(vc.require_auth_header_first()) 

1036 >>> result 

1037 'anonymous' 

1038 >>> vc.settings.auth_required = True 

1039 """ 

1040 if request is None: 

1041 request = Request(scope={"type": "http", "headers": []}) 

1042 

1043 # Proxy auth path — identical to require_auth 

1044 if not settings.mcp_client_auth_enabled: 

1045 if is_proxy_auth_trust_active(): 

1046 proxy_user = request.headers.get(settings.proxy_user_header) 

1047 if proxy_user: 

1048 return {"sub": proxy_user, "source": "proxy", "token": None} # nosec B105 - None is not a password 

1049 if settings.auth_required: 

1050 raise HTTPException( 

1051 status_code=status.HTTP_401_UNAUTHORIZED, 

1052 detail="Proxy authentication header required", 

1053 headers={"WWW-Authenticate": "Bearer"}, 

1054 ) 

1055 return "anonymous" 

1056 if settings.auth_required: 

1057 raise HTTPException( 

1058 status_code=status.HTTP_401_UNAUTHORIZED, 

1059 detail="Authentication required but no auth method configured", 

1060 headers={"WWW-Authenticate": "Bearer"}, 

1061 ) 

1062 return "anonymous" 

1063 

1064 # Parse auth header once 

1065 scheme = param = "" 

1066 if auth_header: 

1067 scheme, param = get_authorization_scheme_param(auth_header) 

1068 if scheme.lower() == "basic" and param and settings.docs_allow_basic_auth: 

1069 return await require_docs_basic_auth(auth_header) 

1070 

1071 # Header-first JWT token resolution 

1072 token: str | None = None 

1073 

1074 # 1. Authorization Bearer header (highest priority — matches middleware) 

1075 if scheme.lower() == "bearer" and param: 

1076 token = param 

1077 

1078 # 2. Cookie from request.cookies 

1079 if not token and hasattr(request, "cookies") and request.cookies: 

1080 token = request.cookies.get("jwt_token") or None 

1081 

1082 # 3. jwt_token keyword argument 

1083 if not token and jwt_token: 

1084 token = jwt_token 

1085 

1086 if settings.auth_required and not token: 

1087 raise HTTPException( 

1088 status_code=status.HTTP_401_UNAUTHORIZED, 

1089 detail="Not authenticated", 

1090 headers={"WWW-Authenticate": "Bearer"}, 

1091 ) 

1092 return await verify_credentials_cached(token, request) if token else "anonymous" 

1093 

1094 

1095async def require_admin_auth( 

1096 request: Request, 

1097 credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), 

1098 jwt_token: Optional[str] = Cookie(None, alias="jwt_token"), 

1099 basic_credentials: Optional[HTTPBasicCredentials] = Depends(basic_security), 

1100) -> str: 

1101 """Require admin authentication supporting both email auth and basic auth. 

1102 

1103 This dependency supports multiple authentication methods: 

1104 1. Email-based JWT authentication (when EMAIL_AUTH_ENABLED=true) 

1105 2. Basic authentication (legacy support) 

1106 3. Proxy headers (if configured) 

1107 

1108 For email auth, the user must have is_admin=true. 

1109 For basic auth, uses the configured BASIC_AUTH_USER/PASSWORD. 

1110 

1111 Args: 

1112 request: FastAPI request object 

1113 credentials: HTTP Authorization credentials 

1114 jwt_token: JWT token from cookies 

1115 basic_credentials: HTTP Basic auth credentials 

1116 

1117 Returns: 

1118 str: Username/email of authenticated admin user 

1119 

1120 Raises: 

1121 HTTPException: 401 if authentication fails, 403 if user is not admin 

1122 RedirectResponse: Redirect to login page for browser requests 

1123 

1124 Examples: 

1125 >>> # This function is typically used as a FastAPI dependency 

1126 >>> callable(require_admin_auth) 

1127 True 

1128 """ 

1129 # First-Party 

1130 from mcpgateway.config import settings 

1131 

1132 # Try email authentication first if enabled 

1133 if getattr(settings, "email_auth_enabled", False): 

1134 try: 

1135 # First-Party 

1136 from mcpgateway.db import get_db 

1137 from mcpgateway.services.email_auth_service import EmailAuthService 

1138 

1139 token = jwt_token 

1140 if not token and credentials: 

1141 token = credentials.credentials 

1142 

1143 if token: 

1144 db_session = next(get_db()) 

1145 try: 

1146 # Decode and verify JWT token (use cached version for performance) 

1147 payload = await verify_jwt_token_cached(token, request) 

1148 await _enforce_revocation_and_active_user(payload) 

1149 username = payload.get("sub") or payload.get("username") # Support both new and legacy formats 

1150 

1151 if username: 

1152 # Get user from database 

1153 auth_service = EmailAuthService(db_session) 

1154 current_user = await auth_service.get_user_by_email(username) 

1155 

1156 if current_user and not getattr(current_user, "is_active", True): 

1157 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account disabled") 

1158 

1159 if current_user and current_user.is_admin: 

1160 return current_user.email 

1161 elif current_user: 

1162 # User is authenticated but not admin - check if this is a browser request 

1163 accept_header = request.headers.get("accept", "") 

1164 if "text/html" in accept_header: 

1165 # Redirect browser to login page with error 

1166 root_path = request.scope.get("root_path", "") 

1167 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Admin privileges required", headers={"Location": f"{root_path}/admin/login?error=admin_required"}) 

1168 else: 

1169 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required") 

1170 else: 

1171 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") 

1172 except Exception: 

1173 raise 

1174 finally: 

1175 db_session.close() 

1176 except HTTPException as e: 

1177 # Re-raise HTTP exceptions (403, redirects, etc.) 

1178 if e.status_code != status.HTTP_401_UNAUTHORIZED: 

1179 raise 

1180 # For 401, check if we should redirect browser users 

1181 accept_header = request.headers.get("accept", "") 

1182 if "text/html" in accept_header: 

1183 root_path = request.scope.get("root_path", "") 

1184 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"}) 

1185 # If JWT auth fails, fall back to basic auth for backward compatibility 

1186 except Exception: 

1187 # If there's any other error with email auth, fall back to basic auth 

1188 pass # nosec B110 - Intentional fallback to basic auth on any email auth error 

1189 

1190 # Fall back to basic authentication (gated by API_ALLOW_BASIC_AUTH) 

1191 try: 

1192 if basic_credentials: 

1193 # SECURITY: Basic auth for API endpoints is disabled by default 

1194 if not settings.api_allow_basic_auth: 

1195 raise HTTPException( 

1196 status_code=status.HTTP_401_UNAUTHORIZED, 

1197 detail="Basic authentication is disabled for API endpoints. Use JWT or API tokens instead.", 

1198 headers={"WWW-Authenticate": "Bearer"}, 

1199 ) 

1200 return await verify_basic_credentials(basic_credentials) 

1201 else: 

1202 raise HTTPException( 

1203 status_code=status.HTTP_401_UNAUTHORIZED, 

1204 detail="Authentication required", 

1205 headers={"WWW-Authenticate": "Bearer"}, 

1206 ) 

1207 except HTTPException: 

1208 # If both methods fail, check if we should redirect browser users to login page 

1209 if getattr(settings, "email_auth_enabled", False): 

1210 accept_header = request.headers.get("accept", "") 

1211 is_htmx = request.headers.get("hx-request") == "true" 

1212 if "text/html" in accept_header or is_htmx: 

1213 root_path = request.scope.get("root_path", "") 

1214 raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"}) 

1215 else: 

1216 raise HTTPException( 

1217 status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. Please login with email/password or use basic auth.", headers={"WWW-Authenticate": "Bearer"} 

1218 ) 

1219 else: 

1220 # Re-raise the basic auth error 

1221 raise