Coverage for mcpgateway / utils / verify_credentials.py: 100%

186 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/verify_credentials.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Authentication verification utilities for MCP Gateway. 

8This module provides JWT and Basic authentication verification functions 

9for securing API endpoints. It supports authentication via Authorization 

10headers and cookies. 

11Examples: 

12 >>> from mcpgateway.utils import verify_credentials as vc 

13 >>> from mcpgateway.utils import jwt_config_helper as jch 

14 >>> from pydantic import SecretStr 

15 >>> class DummySettings: 

16 ... jwt_secret_key = 'secret' 

17 ... jwt_algorithm = 'HS256' 

18 ... jwt_audience = 'mcpgateway-api' 

19 ... jwt_issuer = 'mcpgateway' 

20 ... jwt_issuer_verification = True 

21 ... jwt_audience_verification = True 

22 ... jwt_public_key_path = '' 

23 ... jwt_private_key_path = '' 

24 ... basic_auth_user = 'user' 

25 ... basic_auth_password = SecretStr('pass') 

26 ... auth_required = True 

27 ... require_token_expiration = False 

28 ... require_jti = False 

29 ... validate_token_environment = False 

30 ... docs_allow_basic_auth = False 

31 >>> vc.settings = DummySettings() 

32 >>> jch.settings = DummySettings() 

33 >>> import jwt 

34 >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256') 

35 >>> import asyncio 

36 >>> asyncio.run(vc.verify_jwt_token(token))['sub'] == 'alice' 

37 True 

38 >>> payload = asyncio.run(vc.verify_credentials(token)) 

39 >>> payload['token'] == token 

40 True 

41 >>> from fastapi.security import HTTPBasicCredentials 

42 >>> creds = HTTPBasicCredentials(username='user', password='pass') 

43 >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user' 

44 True 

45 >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong') 

46 >>> try: 

47 ... asyncio.run(vc.verify_basic_credentials(creds_bad)) 

48 ... except Exception as e: 

49 ... print('error') 

50 error 

51""" 

52 

53# Standard 

54from base64 import b64decode 

55import binascii 

56from typing import Optional 

57 

58# Third-Party 

59from fastapi import Cookie, Depends, HTTPException, Request, status 

60from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer 

61from fastapi.security.utils import get_authorization_scheme_param 

62import jwt 

63 

64# First-Party 

65from mcpgateway.config import settings 

66from mcpgateway.services.logging_service import LoggingService 

67from mcpgateway.utils.jwt_config_helper import validate_jwt_algo_and_keys 

68 

# Shared FastAPI security schemes used as dependencies below. Both are built
# with auto_error=False; the dependency functions in this module check for
# absent credentials themselves and raise their own 401 responses.
basic_security = HTTPBasic(auto_error=False)
security = HTTPBearer(auto_error=False)

# Initialize logging service first
# Module-level logger obtained from the gateway's LoggingService.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

75 

76 

async def verify_jwt_token(token: str) -> dict:
    """Verify and decode a JWT token in a single pass.

    Decodes and validates a JWT token using the configured key and algorithm
    from settings. Uses PyJWT's ``require`` option for claim enforcement
    instead of a separate unverified decode.

    Note:
        With single-pass decoding, signature validation occurs before
        claim validation. An invalid signature will result in "Invalid token"
        error even if the token is also missing required claims.

    Args:
        token: The JWT token string to verify.

    Returns:
        dict: The decoded token payload containing claims (e.g., user info).

    Raises:
        HTTPException: 401 if the token is invalid, expired, missing a
            required claim (``exp``, ``jti``, or a claim demanded by
            audience/issuer verification), or carries an ``env`` claim that
            does not match ``settings.environment``.
    """
    try:
        validate_jwt_algo_and_keys()

        # Import the verification key helper
        # First-Party
        from mcpgateway.utils.jwt_config_helper import get_jwt_public_key_or_secret

        options = {
            "verify_aud": settings.jwt_audience_verification,
            "verify_iss": settings.jwt_issuer_verification,
        }

        if settings.require_token_expiration:
            options["require"] = ["exp"]

        decode_kwargs = {
            "key": get_jwt_public_key_or_secret(),
            "algorithms": [settings.jwt_algorithm],
            "options": options,
        }

        # Expected audience/issuer are only passed when the matching check is enabled.
        if settings.jwt_audience_verification:
            decode_kwargs["audience"] = settings.jwt_audience

        if settings.jwt_issuer_verification:
            decode_kwargs["issuer"] = settings.jwt_issuer

        payload = jwt.decode(token, **decode_kwargs)

        # Log warning for tokens without expiration (when not required)
        if not settings.require_token_expiration and "exp" not in payload:
            logger.warning(f"JWT token without expiration accepted. Consider enabling REQUIRE_TOKEN_EXPIRATION for better security. Token sub: {payload.get('sub', 'unknown')}")

        if "jti" not in payload:
            # Require JTI if configured
            if settings.require_jti:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token is missing required JTI claim. Set REQUIRE_JTI=false to allow.",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            # Log warning for tokens without JTI (when not required)
            logger.warning(f"JWT token without JTI accepted. Token cannot be revoked. Consider enabling REQUIRE_JTI for better security. Token sub: {payload.get('sub', 'unknown')}")

        # Validate environment claim if configured (reject mismatched, allow missing for backward compatibility)
        if settings.validate_token_environment:
            token_env = payload.get("env")
            if token_env is not None and token_env != settings.environment:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail=f"Token environment mismatch: token is for '{token_env}', server is '{settings.environment}'",
                    headers={"WWW-Authenticate": "Bearer"},
                )

        return payload

    except jwt.MissingRequiredClaimError as exc:
        # PyJWT raises this for any absent required claim, not only "exp":
        # a missing "aud"/"iss" with audience/issuer verification enabled ends
        # up here as well. Only blame the expiration setting when it is the cause.
        missing_claim = getattr(exc, "claim", None)
        if missing_claim is None or missing_claim == "exp":
            detail = "Token is missing required expiration claim. Set REQUIRE_TOKEN_EXPIRATION=false to allow."
        else:
            detail = f"Token is missing required claim: {missing_claim}"
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except jwt.PyJWTError as exc:
        # Catch-all for signature, audience, issuer and format failures.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

173 

174 

async def verify_jwt_token_cached(token: str, request: Optional[Request] = None) -> dict:
    """Verify a JWT, reusing a result already stored on the current request.

    The verified payload is stored on ``request.state`` as a
    ``(token, payload)`` tuple. A later call for the same request and the
    same token returns the stored payload without decoding again. Without a
    request (or with an object lacking ``state``) this is a plain call to
    ``verify_jwt_token``.

    Args:
        token: JWT token string to verify
        request: Optional FastAPI/Starlette request for request-level caching.
            Must have a 'state' attribute to enable caching.

    Returns:
        dict: Decoded and verified JWT payload

    Raises:
        HTTPException: If token is invalid, expired, or missing required claims.
    """
    # Caching is only possible when there is somewhere to keep the result.
    cacheable = request is not None and hasattr(request, "state")

    if cacheable:
        entry = getattr(request.state, "_jwt_verified_payload", None)
        # Only trust a well-formed (token, payload) pair for this exact token.
        if isinstance(entry, tuple) and len(entry) == 2 and entry[0] == token:
            return entry[1]

    # Cache miss: verify token (single decode)
    verified = await verify_jwt_token(token)

    if cacheable:
        # Remember the result for reuse across middleware
        request.state._jwt_verified_payload = (token, verified)

    return verified

210 

211 

async def verify_credentials(token: str) -> dict:
    """Validate a JWT and return its claims together with the raw token.

    Delegates to verify_jwt_token and then stores the original token string
    in the resulting payload under the 'token' key. Any exception raised by
    verify_jwt_token propagates unchanged.

    Args:
        token: The JWT token string to verify.

    Returns:
        dict: The validated token payload with the original token added
            under the 'token' key.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
        >>> import asyncio
        >>> payload = asyncio.run(vc.verify_credentials(token))
        >>> payload['token'] == token
        True
    """
    claims = await verify_jwt_token(token)
    # Attach the raw token so callers can reference it alongside the claims.
    claims.update(token=token)
    return claims

259 

260 

async def verify_credentials_cached(token: str, request: Optional[Request] = None) -> dict:
    """Validate a JWT with request-level caching and attach the raw token.

    Delegates to verify_jwt_token_cached and returns the resulting claims
    with the original token string stored under the 'token' key.

    Args:
        token: The JWT token string to verify.
        request: Optional FastAPI/Starlette request for request-level caching.

    Returns:
        dict: The validated token payload with the original token added
            under the 'token' key. Returns a copy to avoid mutating cached payload.
    """
    cached_claims = await verify_jwt_token_cached(token, request)
    # Work on a shallow copy: the payload object may be the one stored on
    # request.state and must not gain a 'token' key.
    enriched = dict(cached_claims)
    enriched["token"] = token
    return enriched

278 

279 

async def require_auth(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), jwt_token: Optional[str] = Cookie(default=None)) -> str | dict:
    """Require authentication via JWT token or proxy headers.

    FastAPI dependency. When ``mcp_client_auth_enabled`` is false, JWT handling
    is skipped entirely: with ``trust_proxy_auth`` the user is taken from the
    configured proxy header, otherwise no auth method is available. In both
    cases a missing identity yields "anonymous" unless ``auth_required`` is set.

    In the standard JWT flow the token is looked up in this order:
        1. ``jwt_token`` cookie read directly from the request
        2. Authorization header (Bearer scheme)
        3. ``jwt_token`` value supplied through the Cookie dependency

    If authentication is required but no token is provided, raises an HTTP 401 error.

    Args:
        request: The FastAPI request object for accessing headers.
        credentials: HTTP Authorization credentials from the request header.
        jwt_token: JWT token from cookies.

    Returns:
        str | dict: The verified credentials payload if authenticated,
            proxy user if proxy auth enabled, or "anonymous" if authentication is not required.

    Raises:
        HTTPException: 401 status if authentication is required but no valid
            token is provided.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     mcp_client_auth_enabled = True
        ...     trust_proxy_auth = False
        ...     proxy_user_header = 'X-Authenticated-User'
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> from fastapi.security import HTTPAuthorizationCredentials
        >>> from fastapi import Request
        >>> import asyncio

        Test with valid credentials in header:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
        >>> creds = HTTPAuthorizationCredentials(scheme='Bearer', credentials=token)
        >>> req = Request(scope={'type': 'http', 'headers': []})
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=creds, jwt_token=None))
        >>> result['sub'] == 'alice'
        True

        Test with valid token in cookie:
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=token))
        >>> result['sub'] == 'alice'
        True

        Test with auth required but no token:
        >>> try:
        ...     asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated

        Test with auth not required:
        >>> vc.settings.auth_required = False
        >>> result = asyncio.run(vc.require_auth(request=req, credentials=None, jwt_token=None))
        >>> result
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # --- MCP client auth disabled: rely on the proxy (if trusted) or nothing ---
    if not settings.mcp_client_auth_enabled:
        if settings.trust_proxy_auth:
            # Extract user from proxy header
            proxy_user = request.headers.get(settings.proxy_user_header)
            if proxy_user:
                return {"sub": proxy_user, "source": "proxy", "token": None}  # nosec B105 - None is not a password
            # No proxy header - check auth_required (matches RBAC/WebSocket behavior)
            failure_detail = "Proxy authentication header required"
        else:
            # Warning: MCP auth disabled without proxy trust - security risk!
            # This case is already warned about in config validation
            failure_detail = "Authentication required but no auth method configured"

        if settings.auth_required:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=failure_detail,
                headers={"WWW-Authenticate": "Bearer"},
            )
        return "anonymous"

    # --- Standard JWT authentication flow ---
    token = None

    # 1. Cookie read straight from the request (most reliable)
    cookie_jar = getattr(request, "cookies", None)
    if cookie_jar:
        token = cookie_jar.get("jwt_token") or None

    # 2. Authorization header
    if not token and credentials:
        token = credentials.credentials or None

    # 3. FastAPI Cookie dependency (fallback)
    if not token:
        token = jwt_token or None

    if token:
        return await verify_credentials_cached(token, request)

    if settings.auth_required:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return "anonymous"

410 

411 

async def verify_basic_credentials(credentials: HTTPBasicCredentials) -> str:
    """Verify HTTP Basic authentication credentials.

    Validates the provided username and password against the configured
    basic auth credentials in settings. Both values are compared with a
    constant-time comparison so response timing does not reveal how much of
    a guessed username or password was correct.

    Args:
        credentials: HTTP Basic credentials containing username and password.

    Returns:
        str: The authenticated username if credentials are valid.

    Raises:
        HTTPException: 401 status if credentials are invalid.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> from fastapi.security import HTTPBasicCredentials
        >>> creds = HTTPBasicCredentials(username='user', password='pass')
        >>> import asyncio
        >>> asyncio.run(vc.verify_basic_credentials(creds)) == 'user'
        True
        >>> creds_bad = HTTPBasicCredentials(username='user', password='wrong')
        >>> try:
        ...     asyncio.run(vc.verify_basic_credentials(creds_bad))
        ... except Exception as e:
        ...     print('error')
        error
    """
    # Standard
    from secrets import compare_digest

    expected_user = settings.basic_auth_user
    expected_pass = settings.basic_auth_password.get_secret_value()

    # compare_digest only accepts non-ASCII text as bytes, so encode first.
    # Both comparisons always run; there is no short-circuit on the username.
    is_valid_user = compare_digest(credentials.username.encode("utf-8"), expected_user.encode("utf-8"))
    is_valid_pass = compare_digest(credentials.password.encode("utf-8"), expected_pass.encode("utf-8"))

    if not (is_valid_user and is_valid_pass):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username

464 

465 

async def require_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_security)) -> str:
    """Enforce HTTP Basic authentication when auth is required.

    FastAPI dependency. When ``settings.auth_required`` is false the
    credentials are not inspected at all and "anonymous" is returned.
    Otherwise credentials must be present and are checked by
    verify_basic_credentials.

    Args:
        credentials: HTTP Basic credentials provided by the client.

    Returns:
        str: The authenticated username or "anonymous" if auth is not required.

    Raises:
        HTTPException: 401 status if authentication is required but no valid
            credentials are provided.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> from fastapi.security import HTTPBasicCredentials
        >>> import asyncio

        Test with valid credentials:
        >>> creds = HTTPBasicCredentials(username='user', password='pass')
        >>> asyncio.run(vc.require_basic_auth(creds))
        'user'

        Test with auth required but no credentials:
        >>> try:
        ...     asyncio.run(vc.require_basic_auth(None))
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated

        Test with auth not required:
        >>> vc.settings.auth_required = False
        >>> asyncio.run(vc.require_basic_auth(None))
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # Auth switched off globally: nothing to check.
    if not settings.auth_required:
        return "anonymous"

    # The security scheme does not raise by itself, so absent credentials arrive falsy.
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Basic"},
        )

    authenticated_user = await verify_basic_credentials(credentials)
    return authenticated_user

527 

528 

async def require_docs_basic_auth(auth_header: str) -> str:
    """Dedicated handler for HTTP Basic Auth for documentation endpoints only.

    This function is ONLY intended for /docs, /redoc, or similar endpoints, and is enabled
    via the settings.docs_allow_basic_auth flag. It should NOT be used for general API authentication.

    The header is parsed, its base64 payload decoded as ASCII and split on the
    first colon, and the resulting credentials are handed to require_basic_auth.
    Decoding and format problems are reported as HTTP 401, not as ValueError.

    Args:
        auth_header: Raw Authorization header value (e.g. "Basic <base64 of username:password>").

    Returns:
        str: The value returned by require_basic_auth: the authenticated
            username, or "anonymous" when settings.auth_required is false.

    Raises:
        HTTPException: 401 if the scheme is not Basic, the credentials part is
            empty, Basic auth for docs is disabled, the payload cannot be
            decoded or lacks a colon, or the credentials are invalid.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = True
        >>> vc.settings = DummySettings()
        >>> import base64, asyncio

        Test with properly encoded credentials:
        >>> userpass = base64.b64encode(b'user:pass').decode()
        >>> auth_header = f'Basic {userpass}'
        >>> asyncio.run(vc.require_docs_basic_auth(auth_header))
        'user'

        Test with different valid credentials:
        >>> valid_creds = base64.b64encode(b'user:pass').decode()
        >>> valid_header = f'Basic {valid_creds}'
        >>> result = asyncio.run(vc.require_docs_basic_auth(valid_header))
        >>> result == 'user'
        True

        Test with invalid password:
        >>> badpass = base64.b64encode(b'user:wrong').decode()
        >>> bad_header = f'Basic {badpass}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(bad_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test with malformed base64 (no colon):
        >>> malformed = base64.b64encode(b'userpass').decode()
        >>> malformed_header = f'Basic {malformed}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(malformed_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test with invalid base64 encoding:
        >>> invalid_header = 'Basic invalid_base64!'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(invalid_header))
        ... except vc.HTTPException as e:
        ...     'Invalid basic auth credentials' in e.detail
        True

        Test when docs_allow_basic_auth is disabled:
        >>> vc.settings.docs_allow_basic_auth = False
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(auth_header))
        ... except vc.HTTPException as e:
        ...     'not allowed' in e.detail
        True
        >>> vc.settings.docs_allow_basic_auth = True

        Test with non-Basic auth scheme:
        >>> bearer_header = 'Bearer eyJhbGciOiJIUzI1NiJ9...'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(bearer_header))
        ... except vc.HTTPException as e:
        ...     e.status_code == 401
        True

        Test with empty credentials part:
        >>> empty_header = 'Basic '
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(empty_header))
        ... except vc.HTTPException as e:
        ...     'not allowed' in e.detail
        True

        Test with Unicode decode error:
        >>> from base64 import b64encode
        >>> bad_bytes = bytes([0xff, 0xfe])  # Invalid UTF-8 bytes
        >>> bad_unicode = b64encode(bad_bytes).decode()
        >>> unicode_header = f'Basic {bad_unicode}'
        >>> try:
        ...     asyncio.run(vc.require_docs_basic_auth(unicode_header))
        ... except vc.HTTPException as e:
        ...     'Invalid basic auth credentials' in e.detail
        True
    """
    scheme, param = get_authorization_scheme_param(auth_header)

    # Reject early unless this is a non-empty Basic header and docs Basic auth is enabled.
    if not (scheme.lower() == "basic" and param and settings.docs_allow_basic_auth):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Basic authentication not allowed or malformed",
            headers={"WWW-Authenticate": "Basic"},
        )

    try:
        decoded = b64decode(param).decode("ascii")
        # "username:password" - only the first colon separates the two parts.
        if ":" not in decoded:
            raise ValueError("Invalid basic auth format")
        username, password = decoded.split(":", 1)
        basic_creds = HTTPBasicCredentials(username=username, password=password)
        return await require_basic_auth(credentials=basic_creds)
    except (ValueError, UnicodeDecodeError, binascii.Error):
        # Any decoding/format failure is surfaced as an authentication failure.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid basic auth credentials",
            headers={"WWW-Authenticate": "Basic"},
        )

660 

661 

async def require_docs_auth_override(
    auth_header: str | None = None,
    jwt_token: str | None = None,
) -> str | dict:
    """Authenticate a request for the documentation endpoints.

    Unlike require_auth, the JWT path here never consults
    mcp_client_auth_enabled or auth_required: a token is always demanded for
    documentation endpoints (/docs, /redoc, /openapi.json). A Bearer token in
    the Authorization header overrides the cookie token. A Basic header is
    delegated to require_docs_basic_auth, but only when
    settings.docs_allow_basic_auth is enabled.

    Args:
        auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
        jwt_token: JWT token from cookies.

    Returns:
        str | dict: The decoded JWT payload, or the result of
            require_docs_basic_auth when Basic auth is used.

    Raises:
        HTTPException: If authentication fails or credentials are invalid.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     docs_allow_basic_auth = False
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> import asyncio

        Test with valid JWT:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
        >>> auth_header = f'Bearer {token}'
        >>> result = asyncio.run(vc.require_docs_auth_override(auth_header=auth_header))
        >>> result['sub'] == 'alice'
        True

        Test with no token:
        >>> try:
        ...     asyncio.run(vc.require_docs_auth_override())
        ... except vc.HTTPException as e:
        ...     print(e.status_code, e.detail)
        401 Not authenticated
    """
    # Start from the cookie token; a Bearer header (if any) replaces it.
    token = jwt_token

    if auth_header:
        scheme, param = get_authorization_scheme_param(auth_header)
        if param:
            scheme_name = scheme.lower()
            if scheme_name == "bearer":
                token = param
            elif scheme_name == "basic" and settings.docs_allow_basic_auth:
                # Only allow Basic Auth for docs endpoints when explicitly enabled
                return await require_docs_basic_auth(auth_header)

    # Always require a token for docs endpoints
    if token:
        # Validate the JWT token
        return await verify_credentials(token)

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
        headers={"WWW-Authenticate": "Bearer"},
    )

737 

738 

async def require_auth_override(
    auth_header: str | None = None,
    jwt_token: str | None = None,
    request: Request | None = None,
) -> str | dict:
    """Run require_auth outside of FastAPI's dependency injection.

    Intended for contexts such as middleware where dependencies are not
    resolved automatically. The raw Authorization header is parsed here: a
    Bearer value is wrapped in HTTPAuthorizationCredentials and passed on to
    require_auth, while a Basic value is delegated to require_docs_basic_auth
    when settings.docs_allow_basic_auth is enabled. Any other header is
    ignored and require_auth decides based on the remaining inputs.

    Args:
        auth_header: Raw Authorization header value (e.g. "Bearer eyJhbGciOi...").
        jwt_token: JWT taken from a cookie, forwarded to require_auth as its
            cookie-parameter fallback.
        request: Optional Request object for accessing headers (used for proxy auth).
            A minimal empty request is created when omitted.

    Returns:
        str | dict: The decoded JWT payload or the string "anonymous",
            same as require_auth.

    Raises:
        HTTPException: If authentication fails or credentials are invalid.
            Raised by the functions this wrapper delegates to.

    Examples:
        >>> from mcpgateway.utils import verify_credentials as vc
        >>> from mcpgateway.utils import jwt_config_helper as jch
        >>> from pydantic import SecretStr
        >>> class DummySettings:
        ...     jwt_secret_key = 'secret'
        ...     jwt_algorithm = 'HS256'
        ...     jwt_audience = 'mcpgateway-api'
        ...     jwt_issuer = 'mcpgateway'
        ...     jwt_audience_verification = True
        ...     jwt_issuer_verification = True
        ...     jwt_public_key_path = ''
        ...     jwt_private_key_path = ''
        ...     basic_auth_user = 'user'
        ...     basic_auth_password = SecretStr('pass')
        ...     auth_required = True
        ...     mcp_client_auth_enabled = True
        ...     trust_proxy_auth = False
        ...     proxy_user_header = 'X-Authenticated-User'
        ...     require_token_expiration = False
        ...     require_jti = False
        ...     validate_token_environment = False
        ...     docs_allow_basic_auth = False
        >>> vc.settings = DummySettings()
        >>> jch.settings = DummySettings()
        >>> import jwt
        >>> import asyncio

        Test with Bearer token in auth header:
        >>> token = jwt.encode({'sub': 'alice', 'aud': 'mcpgateway-api', 'iss': 'mcpgateway'}, 'secret', algorithm='HS256')
        >>> auth_header = f'Bearer {token}'
        >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
        >>> result['sub'] == 'alice'
        True

        Test with invalid auth scheme:
        >>> auth_header = 'Basic dXNlcjpwYXNz'  # Base64 encoded user:pass
        >>> vc.settings.auth_required = False
        >>> result = asyncio.run(vc.require_auth_override(auth_header=auth_header))
        >>> result
        'anonymous'

        Test with only cookie token:
        >>> result = asyncio.run(vc.require_auth_override(jwt_token=token))
        >>> result['sub'] == 'alice'
        True

        Test with no auth:
        >>> result = asyncio.run(vc.require_auth_override())
        >>> result
        'anonymous'
        >>> vc.settings.auth_required = True
    """
    # Create a mock request if not provided (for backward compatibility)
    if request is None:
        request = Request(scope={"type": "http", "headers": []})

    bearer_credentials = None
    if auth_header:
        scheme, param = get_authorization_scheme_param(auth_header)
        scheme_name = scheme.lower()
        if scheme_name == "bearer" and param:
            bearer_credentials = HTTPAuthorizationCredentials(scheme=scheme, credentials=param)
        elif scheme_name == "basic" and param and settings.docs_allow_basic_auth:
            # Only allow Basic Auth for docs endpoints when explicitly enabled
            return await require_docs_basic_auth(auth_header)

    return await require_auth(request=request, credentials=bearer_credentials, jwt_token=jwt_token)

835 

836 

async def require_admin_auth(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    jwt_token: Optional[str] = Cookie(None, alias="jwt_token"),
    basic_credentials: Optional[HTTPBasicCredentials] = Depends(basic_security),
) -> str:
    """Require admin authentication supporting both email auth and basic auth.

    Authentication is attempted in this order:
    1. Email-based JWT authentication, only when ``settings.email_auth_enabled``
       is truthy. The token is read from the ``jwt_token`` cookie first and
       from the Authorization credentials otherwise.
    2. Basic authentication (legacy support), only when
       ``settings.api_allow_basic_auth`` is truthy.

    For email auth, the user looked up by the token's ``sub`` (or legacy
    ``username``) claim must have ``is_admin`` set.
    For basic auth, the credentials are checked by ``verify_basic_credentials``.

    Args:
        request: FastAPI request object
        credentials: HTTP Authorization credentials
        jwt_token: JWT token from cookies
        basic_credentials: HTTP Basic auth credentials

    Returns:
        str: Email of the authenticated admin user (email auth) or the value
            returned by ``verify_basic_credentials`` (basic auth)

    Raises:
        HTTPException: 403 if the user is authenticated but not an admin;
            401 if authentication fails; 302 with a ``Location`` header
            pointing at ``{root_path}/admin/login`` when the request looks
            like a browser request (``Accept`` contains ``text/html``, or,
            on the final fallback, the ``HX-Request`` header is ``true``)

    Examples:
        >>> # This function is typically used as a FastAPI dependency
        >>> callable(require_admin_auth)
        True
    """
    # First-Party
    from mcpgateway.config import settings

    email_auth_enabled = getattr(settings, "email_auth_enabled", False)

    # Try email authentication first if enabled
    if email_auth_enabled:
        try:
            # First-Party
            from mcpgateway.db import get_db
            from mcpgateway.services.email_auth_service import EmailAuthService

            # Cookie token takes precedence over the Authorization header
            token = jwt_token
            if not token and credentials:
                token = credentials.credentials

            if token:
                db_session = next(get_db())
                # Deliberately try/finally with no ``except``: HTTPExceptions raised
                # below (403, 302, 401) must reach the outer handler with their
                # status code and headers intact. Wrapping them in a bare
                # ``Exception`` would silently downgrade a 403 to the basic-auth
                # fallback.
                try:
                    # Decode and verify JWT token (use cached version for performance)
                    payload = await verify_jwt_token_cached(token, request)
                    username = payload.get("sub") or payload.get("username")  # Support both new and legacy formats

                    if username:
                        # Get user from database
                        auth_service = EmailAuthService(db_session)
                        current_user = await auth_service.get_user_by_email(username)

                        if not current_user:
                            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")

                        if current_user.is_admin:
                            return current_user.email

                        # User is authenticated but not admin - check if this is a browser request
                        accept_header = request.headers.get("accept", "")
                        if "text/html" in accept_header:
                            # Redirect browser to login page with error
                            root_path = request.scope.get("root_path", "")
                            raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Admin privileges required", headers={"Location": f"{root_path}/admin/login?error=admin_required"})
                        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required")
                finally:
                    db_session.close()
        except HTTPException as e:
            # Re-raise HTTP exceptions (403, redirects, etc.)
            if e.status_code != status.HTTP_401_UNAUTHORIZED:
                raise
            # For 401, check if we should redirect browser users
            accept_header = request.headers.get("accept", "")
            if "text/html" in accept_header:
                root_path = request.scope.get("root_path", "")
                raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
            # If JWT auth fails, fall back to basic auth for backward compatibility
        except Exception:
            # If there's any other error with email auth, fall back to basic auth
            pass  # nosec B110 - Intentional fallback to basic auth on any email auth error

    # Fall back to basic authentication (gated by API_ALLOW_BASIC_AUTH)
    try:
        if basic_credentials:
            # SECURITY: Basic auth for API endpoints is disabled by default
            if not settings.api_allow_basic_auth:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Basic authentication is disabled for API endpoints. Use JWT or API tokens instead.",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            return await verify_basic_credentials(basic_credentials)
        else:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required",
                headers={"WWW-Authenticate": "Bearer"},
            )
    except HTTPException:
        # If both methods fail, check if we should redirect browser users to login page
        if email_auth_enabled:
            accept_header = request.headers.get("accept", "")
            is_htmx = request.headers.get("hx-request") == "true"
            if "text/html" in accept_header or is_htmx:
                root_path = request.scope.get("root_path", "")
                raise HTTPException(status_code=status.HTTP_302_FOUND, detail="Authentication required", headers={"Location": f"{root_path}/admin/login"})
            else:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. Please login with email/password or use basic auth.", headers={"WWW-Authenticate": "Bearer"}
                )
        else:
            # Re-raise the basic auth error
            raise