Coverage for mcpgateway / auth.py: 98%

663 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/auth.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Shared authentication utilities. 

8 

9This module provides common authentication functions that can be shared 

10across different parts of the application without creating circular imports. 

11""" 

12 

13# Standard 

14import asyncio 

15from datetime import datetime, timezone 

16import hashlib 

17import logging 

18import threading 

19from typing import Any, Dict, Generator, List, Never, Optional 

20import uuid 

21 

22# Third-Party 

23from fastapi import Depends, HTTPException, status 

24from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer 

25from sqlalchemy.orm import Session 

26from starlette.requests import Request 

27 

28# First-Party 

29from mcpgateway.common.validators import SecurityValidator 

30from mcpgateway.config import settings 

31from mcpgateway.db import EmailUser, fresh_db_session, SessionLocal 

32from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthResolveUserPayload, HttpHeaderPayload, HttpHookType, PluginViolationError 

33from mcpgateway.utils.correlation_id import get_correlation_id 

34from mcpgateway.utils.trace_context import ( 

35 clear_trace_context, 

36 set_trace_auth_method, 

37 set_trace_context_from_teams, 

38 set_trace_team_scope, 

39 set_trace_user_email, 

40 set_trace_user_is_admin, 

41) 

42from mcpgateway.utils.verify_credentials import verify_jwt_token_cached 

43 

# Security scheme (HTTP bearer credentials; constructed with auto_error=False)
security = HTTPBearer(auto_error=False)

# Module-level sync Redis client for rate-limiting (lazy-initialized by
# _get_sync_redis_client; stays None until a connection has been established)
_SYNC_REDIS_CLIENT = None  # pylint: disable=invalid-name
# Serializes lazy creation of _SYNC_REDIS_CLIENT
_SYNC_REDIS_LOCK = threading.Lock()
# time.time() of the most recent failed connection attempt; used to back off
# before retrying. None when no failure is pending.
_SYNC_REDIS_FAILURE_TIME: Optional[float] = None  # Backoff after connection failures

# Module-level in-memory cache for last_used rate-limiting (fallback when Redis unavailable).
# Maps an API token JTI to the time.time() value of its last recorded DB update.
_LAST_USED_CACHE: Dict[str, float] = {}
# Guards every read and write of _LAST_USED_CACHE
_LAST_USED_CACHE_LOCK = threading.Lock()

55 

56 

def _log_auth_event(
    logger: logging.Logger,
    message: str,
    level: int = logging.INFO,
    user_id: Optional[str] = None,
    auth_method: Optional[str] = None,
    auth_success: bool = False,
    security_event: Optional[str] = None,
    security_severity: str = "low",
    **extra_context,
) -> None:
    """Emit a structured authentication log record tagged with the request ID.

    The record's ``extra`` mapping always carries the current correlation ID
    (as ``request_id``), so authentication events can be tied back to the
    request that produced them.

    Args:
        logger: Logger instance that emits the record.
        message: Log message.
        level: Logging level for the record (default: INFO).
        user_id: User identifier; only included when truthy.
        auth_method: Authentication method used (jwt, api_token, etc.); only
            included when truthy.
        auth_success: Whether authentication succeeded.
        security_event: Type of security event; falls back to
            ``"authentication"`` when not provided.
        security_severity: Severity level (low, medium, high, critical).
        **extra_context: Additional fields merged into the record last, so
            they take precedence over the fields built here.
    """
    record_fields: Dict[str, Any] = {
        "request_id": get_correlation_id(),
        "entity_type": "auth",
        "auth_success": auth_success,
        "security_event": security_event if security_event else "authentication",
        "security_severity": security_severity,
    }

    # Optional identity fields are omitted entirely when unset/empty.
    optional_fields = {"user_id": user_id, "auth_method": auth_method}
    record_fields.update({key: value for key, value in optional_fields.items() if value})

    # Caller-supplied context goes in last and may override anything above.
    record_fields.update(extra_context)

    logger.log(level, message, extra=record_fields)

106 

107 

def get_db() -> Generator[Session, Never, None]:
    """Provide a database session as a dependency.

    The transaction is committed once the consumer finishes without error,
    which avoids an implicit rollback for read-only work. If the consumer
    raises, the transaction is rolled back explicitly and the exception is
    propagated. The session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.

    Examples:
        >>> db_gen = get_db()
        >>> db = next(db_gen)
        >>> hasattr(db, 'query')
        True
        >>> hasattr(db, 'close')
        True
    """
    session = SessionLocal()

    def _abort_transaction() -> None:
        """Roll back; if even that fails, invalidate the session (best effort)."""
        try:
            session.rollback()
        except Exception:
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure

    try:
        yield session
        session.commit()
    except Exception:
        _abort_transaction()
        raise
    finally:
        session.close()

143 

144 

def _get_personal_team_sync(user_email: str) -> Optional[str]:
    """Look up the ID of a user's personal team using a fresh DB session.

    Synchronous by design: intended to be run in a thread pool so the event
    loop is not blocked.

    Args:
        user_email: The user's email address.

    Returns:
        The personal team ID, or None if not found.
    """
    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailTeam, EmailTeamMember  # pylint: disable=import-outside-toplevel

        personal_team_query = (
            select(EmailTeam)
            .join(EmailTeamMember)
            .where(
                EmailTeamMember.user_email == user_email,
                EmailTeam.is_personal.is_(True),
            )
        )
        team = session.execute(personal_team_query).scalar_one_or_none()
        if not team:
            return None
        return team.id

166 

167 

def _get_user_team_ids_sync(email: str) -> List[str]:
    """Return the IDs of every team the user is an active member of.

    Opens its own DB session, so it is safe to call from a thread pool.
    The query selects all active membership rows for the email, with no
    filtering on the kind of team.

    Args:
        email: User email address

    Returns:
        List of team ID strings
    """
    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

        membership_query = select(EmailTeamMember.team_id).where(
            EmailTeamMember.user_email == email,
            EmailTeamMember.is_active.is_(True),
        )
        membership_rows = session.execute(membership_query).all()
        # Each row carries a single column: the team ID.
        return [team_id for (team_id,) in membership_rows]

194 

195 

def _get_team_name_by_id_sync(team_id: Optional[str]) -> Optional[str]:
    """Resolve a team ID to the name of the corresponding active team.

    Args:
        team_id: Team identifier to resolve.

    Returns:
        Team display name when the active team exists, otherwise ``None``.
        A falsy ``team_id`` short-circuits to ``None`` without touching the DB.
    """
    if team_id:
        with fresh_db_session() as session:
            # Third-Party
            from sqlalchemy import select  # pylint: disable=import-outside-toplevel

            # First-Party
            from mcpgateway.db import EmailTeam  # pylint: disable=import-outside-toplevel

            name_query = select(EmailTeam.name).where(
                EmailTeam.id == team_id,
                EmailTeam.is_active.is_(True),
            )
            return session.execute(name_query).scalar_one_or_none()

    return None

222 

223 

224def _extract_claim_team_name(payload: Dict[str, Any], team_id: Optional[str]) -> Optional[str]: 

225 """Extract a matching team display name from raw JWT team claims. 

226 

227 Args: 

228 payload: Decoded JWT payload. 

229 team_id: Normalized primary team identifier to match. 

230 

231 Returns: 

232 Matching team display name from the JWT claims, if present. 

233 """ 

234 if not team_id: 

235 return None 

236 

237 raw_teams = payload.get("teams") 

238 if not isinstance(raw_teams, list): 

239 return None 

240 

241 for raw_team in raw_teams: 

242 raw_team_id = None 

243 raw_team_name = None 

244 if isinstance(raw_team, dict): 

245 raw_team_id = raw_team.get("id") 

246 raw_team_name = raw_team.get("name") 

247 elif isinstance(raw_team, str): 

248 raw_team_id = raw_team 

249 

250 if str(raw_team_id).strip() != team_id: 

251 continue 

252 

253 if raw_team_name is None: 

254 return None 

255 

256 normalized_name = str(raw_team_name).strip() 

257 return normalized_name or None 

258 

259 return None 

260 

261 

async def resolve_trace_team_name(
    payload: Dict[str, Any],
    token_teams: Optional[List[str]],
    *,
    preresolved_team_names: Optional[Dict[str, str]] = None,
) -> Optional[str]:
    """Work out the display name of the primary team for trace metadata.

    The name is purely additive tracing information; scope enforcement keeps
    relying on canonical team IDs. Lookup order:

    1. ``preresolved_team_names`` (e.g. from a batched DB lookup), if given.
    2. A DB lookup run in a worker thread; failures are logged at debug
       level and otherwise ignored.
    3. For non-session tokens only, the display name embedded in the raw JWT
       ``teams`` claim, as a best-effort fallback.

    Args:
        payload: Decoded JWT payload.
        token_teams: Canonical resolved team IDs, or ``None`` for admin scope.
        preresolved_team_names: Optional mapping of team_id to display name from
            a batched DB lookup.

    Returns:
        Display name for the primary concrete team, or ``None`` for public/admin
        scopes or when the name cannot be resolved.
    """
    if not token_teams:
        return None

    primary_team_id = token_teams[0]

    batched_name = preresolved_team_names.get(primary_team_id) if preresolved_team_names else None
    if batched_name:
        return batched_name

    try:
        db_name = await asyncio.to_thread(_get_team_name_by_id_sync, primary_team_id)
    except Exception as exc:
        logging.getLogger(__name__).debug("Failed to resolve trace team name for team_id=%s: %s", primary_team_id, exc)
    else:
        if db_name:
            return db_name

    # Session tokens never fall back to names carried in the JWT itself.
    if payload.get("token_use") == "session":
        return None

    return _extract_claim_team_name(payload, primary_team_id) or None

310 

311 

def get_user_team_roles(db, user_email: str) -> Dict[str, str]:
    """Return a {team_id: role} mapping for a user's active team memberships.

    Args:
        db: SQLAlchemy database session.
        user_email: Email address of the user to query memberships for.

    Returns:
        Dict mapping team_id to the user's role in that team.
        Returns empty dict on DB errors (safe default — headers stay masked).
        The failure is logged at debug level so it is not lost entirely.
    """
    try:
        # First-Party
        from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

        rows = (
            db.query(EmailTeamMember.team_id, EmailTeamMember.role)
            .filter(
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.is_active.is_(True),
            )
            .all()
        )
        return {row.team_id: row.role for row in rows}
    except Exception as exc:
        # Deliberately best-effort: callers get the safe empty default, but
        # the cause is recorded instead of being swallowed silently.
        logging.getLogger(__name__).debug("Failed to load team roles, returning empty mapping: %s", exc)
        return {}

331 

332 

333def _narrow_by_jwt_teams(payload: Dict[str, Any], db_teams: Optional[List[str]]) -> Optional[List[str]]: 

334 """Apply JWT intersection policy to DB-resolved teams. 

335 

336 If *db_teams* is ``None`` (admin bypass), returns ``None`` immediately. 

337 If the JWT ``teams`` claim is a non-empty list, returns the intersection 

338 of *db_teams* and the JWT teams. If the intersection is empty (e.g. 

339 all JWT-claimed teams have been revoked), returns ``[]`` so that 

340 downstream enforcement denies the request rather than silently 

341 broadening scope. 

342 

343 Args: 

344 payload: The decoded JWT payload dict. 

345 db_teams: Teams resolved from the database, or ``None`` for admin bypass. 

346 

347 Returns: 

348 None (admin bypass), [] (public-only / empty intersection), or list of team ID strings. 

349 """ 

350 if db_teams is None: 

351 return None 

352 

353 jwt_teams = payload.get("teams") 

354 if isinstance(jwt_teams, list) and jwt_teams: 

355 # Non-empty JWT teams → intersect with DB teams. An empty 

356 # intersection (all JWT teams revoked) returns [], which gives 

357 # public-only access and lets downstream enforcement deny the 

358 # request (fail-closed). 

359 jwt_team_set = set(normalize_token_teams({"teams": jwt_teams}) or []) 

360 return [t for t in db_teams if t in jwt_team_set] 

361 

362 # JWT teams absent, null, or empty list → no narrowing requested. 

363 # An explicit ``teams: []`` means "don't restrict by team" (i.e. use 

364 # the full DB membership), which intentionally differs from 

365 # ``normalize_token_teams`` where ``[] → public-only``. The 

366 # distinction exists because session tokens always start from DB- 

367 # resolved teams — an empty JWT claim simply means the caller did not 

368 # request a subset. 

369 return db_teams 

370 

371 

372async def _resolve_teams_from_db(email: str, user_info) -> Optional[List[str]]: 

373 """Resolve teams for session tokens from DB/cache. 

374 

375 For admin users, returns None (admin bypass). 

376 For non-admin users, returns the full list of team IDs from DB/cache. 

377 

378 Args: 

379 email: User email address 

380 user_info: User dict or EmailUser instance 

381 

382 Returns: 

383 None (admin bypass), [] (no teams), or list of team ID strings 

384 """ 

385 is_admin = user_info.get("is_admin", False) if isinstance(user_info, dict) else getattr(user_info, "is_admin", False) 

386 if is_admin: 

387 return None # Admin bypass 

388 

389 # Try auth cache first 

390 try: 

391 # First-Party 

392 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

393 

394 cached_teams = await auth_cache.get_user_teams(f"{email}:True") 

395 if cached_teams is not None: 

396 return cached_teams 

397 except Exception: # nosec B110 - Cache unavailable is non-fatal, fall through to DB 

398 pass 

399 

400 # Cache miss: query DB 

401 team_ids = await asyncio.to_thread(_get_user_team_ids_sync, email) 

402 

403 # Cache the result 

404 try: 

405 # First-Party 

406 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

407 

408 await auth_cache.set_user_teams(f"{email}:True", team_ids) 

409 except Exception: # nosec B110 - Cache write failure is non-fatal 

410 pass 

411 

412 return team_ids 

413 

414 

_UNSET: Any = object()  # sentinel distinguishing "not supplied" from explicit None


async def resolve_session_teams(
    payload: Dict[str, Any],
    email: Optional[str],
    user_info,
    *,
    preresolved_db_teams: Optional[List[str]] = _UNSET,
) -> Optional[List[str]]:
    """Resolve the effective teams of a session token, with the DB as authority.

    This is the **single policy point** for session-token team resolution;
    code paths needing teams for a session token should call this function
    instead of inlining the decision.

    The database result always comes first so that revoked memberships take
    effect immediately. A JWT ``teams`` claim can only narrow that result to
    the **intersection** of DB teams and claimed teams (e.g. single-team
    mode); it can never add teams.

    Policy:
        1. If *email* is falsy, return ``[]`` immediately (public-only). An
           identity-less session token never receives admin bypass.
        2. Take DB teams from *preresolved_db_teams* when supplied, otherwise
           resolve them via ``_resolve_teams_from_db``.
        3. If the DB result is ``None`` (admin bypass), return ``None``.
        4. If the JWT ``teams`` claim is a non-empty list, intersect it with
           the DB teams; an empty intersection returns ``[]`` so downstream
           enforcement denies the request.
        5. Otherwise return the full DB result.

    Args:
        payload: The decoded JWT payload dict.
        email: User email address (for the DB lookup), or ``None``.
        user_info: User dict or EmailUser instance (for admin detection).
        preresolved_db_teams: If the caller already resolved DB teams (e.g.
            from a batched query), pass them here to skip the DB call.
            Pass ``None`` to indicate admin bypass was already determined.

    Returns:
        None (admin bypass), [] (public-only), or list of team ID strings.
    """
    if not email:
        return []  # No identity — public-only; never admin bypass

    db_teams: Optional[List[str]]
    if preresolved_db_teams is _UNSET:
        db_teams = await _resolve_teams_from_db(email, user_info)
    else:
        # Explicit None is meaningful here (admin bypass), hence the sentinel.
        db_teams = preresolved_db_teams

    return _narrow_by_jwt_teams(payload, db_teams)

472 

473 

def normalize_token_teams(payload: Dict[str, Any]) -> Optional[List[str]]:
    """
    Normalize token teams to a canonical form for consistent security checks.

    SECURITY: This is the single source of truth for token team normalization.
    All code paths that read token teams should use this function.

    Rules:
    - "teams" key missing → [] (public-only, secure default)
    - "teams" is null + is_admin=true → None (admin bypass, sees all)
    - "teams" is null + is_admin=false → [] (public-only, no bypass for non-admins)
    - "teams" is [] → [] (explicit public-only)
    - "teams" is [...] → normalized list of string IDs
    - "teams" is any other type (string, dict, number, ...) → [] (malformed
      claim, fail closed to public-only)

    Args:
        payload: The JWT payload dict

    Returns:
        None for admin bypass, [] for public-only, or list of normalized team ID strings
    """
    # Check if "teams" key exists (distinguishes missing from explicit null)
    if "teams" not in payload:
        # Missing teams key → public-only (secure default)
        return []

    teams = payload["teams"]

    if teams is None:
        # Explicit null - only allow admin bypass if is_admin is true.
        # Check BOTH top-level is_admin AND nested user.is_admin.
        is_admin = payload.get("is_admin", False)
        if not is_admin:
            user_info = payload.get("user", {})
            is_admin = user_info.get("is_admin", False) if isinstance(user_info, dict) else False
        # Admin → bypass (sees all); non-admin → public-only (no bypass).
        return None if is_admin else []

    if not isinstance(teams, (list, tuple)):
        # Malformed claim. Iterating it would invent team IDs (one per
        # character of a string, one per key of a dict) or raise for
        # non-iterables, so treat it as public-only instead.
        return []

    # teams is a list - normalize to string IDs.
    # Handle both dict format [{"id": "team1"}] and string format ["team1"];
    # entries of any other shape, and dicts without a truthy "id", are dropped.
    normalized: List[str] = []
    for team in teams:
        if isinstance(team, dict):
            team_id = team.get("id")
            if team_id:
                normalized.append(str(team_id))
        elif isinstance(team, str):
            normalized.append(team)
    return normalized

525 

526 

async def get_team_from_token(payload: Dict[str, Any]) -> Optional[str]:
    """
    Extract the team ID from an authentication token payload.

    SECURITY: This function uses secure-first defaults:
    - Missing teams key = public-only (no personal team fallback)
    - Empty teams list = public-only (no team access)
    - Malformed (non-list) teams claim = public-only
    - Teams with values = use first team ID

    This prevents privilege escalation where missing or malformed claims
    could grant unintended team access.

    Args:
        payload (Dict[str, Any]):
            The token payload. Expected fields:
                - "sub" (str): The user's unique identifier (email).
                - "teams" (List[str], optional): List containing team ID.
                  Entries may also be dicts carrying the ID under "id".

    Returns:
        Optional[str]:
            The resolved team ID. Returns `None` if teams is missing, empty,
            or not a list.

    Examples:
        >>> import asyncio
        >>> # --- Case 1: Token has team ---
        >>> payload = {"sub": "user@example.com", "teams": ["team_456"]}
        >>> asyncio.run(get_team_from_token(payload))
        'team_456'

        >>> # --- Case 2: Token has explicit empty teams (public-only) ---
        >>> payload = {"sub": "user@example.com", "teams": []}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None

        >>> # --- Case 3: Token has no teams key (secure default) ---
        >>> payload = {"sub": "user@example.com"}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None

        >>> # --- Case 4: Malformed teams claim (not a list) ---
        >>> payload = {"sub": "user@example.com", "teams": "team_456"}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None
    """
    teams = payload.get("teams")

    # SECURITY: anything other than a non-empty list is public-only.
    # - teams is None (missing key or explicit null): no legacy fallback
    # - teams == [] (explicit empty list): no team access
    # - teams is a string/number/dict: malformed; indexing it would return a
    #   single character or raise, so it is rejected here
    # Admin bypass is handled separately via is_admin flag in token, not via missing teams
    if not isinstance(teams, (list, tuple)) or not teams:
        return None

    # Has teams - use the first one
    team_id = teams[0]
    if isinstance(team_id, dict):
        team_id = team_id.get("id")
    return team_id

582 

583 

def _check_token_revoked_sync(jti: str) -> bool:
    """Report whether a revocation record exists for the given JWT ID.

    Synchronous by design: intended to be run in a thread pool so the event
    loop is not blocked.

    Args:
        jti: The JWT ID to check.

    Returns:
        True if the token is revoked, False otherwise.
    """
    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import TokenRevocation  # pylint: disable=import-outside-toplevel

        revocation_query = select(TokenRevocation).where(TokenRevocation.jti == jti)
        revocation = session.execute(revocation_query).scalar_one_or_none()
        return revocation is not None

604 

605 

def _lookup_api_token_sync(token_hash: str) -> Optional[Dict[str, Any]]:
    """Find an active API token by hash and validate it.

    Synchronous by design: intended to be run in a thread pool so the event
    loop is not blocked. On a successful lookup the token's ``last_used``
    timestamp is updated and committed.

    Args:
        token_hash: SHA256 hash of the API token.

    Returns:
        ``None`` when no active token matches; ``{"expired": True}`` when the
        token is past its expiry; ``{"revoked": True}`` when a revocation
        record exists for its JTI; otherwise a dict with ``user_email`` and
        ``jti``.
    """
    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailApiToken, utc_now  # pylint: disable=import-outside-toplevel

        token_query = select(EmailApiToken).where(
            EmailApiToken.token_hash == token_hash,
            EmailApiToken.is_active.is_(True),
        )
        token_row = session.execute(token_query).scalar_one_or_none()
        if not token_row:
            return None

        # Expiration: a naive timestamp is interpreted as UTC before comparing.
        if token_row.expires_at:
            expiry = token_row.expires_at
            if expiry.tzinfo is None:
                expiry = expiry.replace(tzinfo=timezone.utc)
            if utc_now() > expiry:
                return {"expired": True}

        # Revocation
        # First-Party
        from mcpgateway.db import TokenRevocation  # pylint: disable=import-outside-toplevel

        revocation_query = select(TokenRevocation).where(TokenRevocation.jti == token_row.jti)
        if session.execute(revocation_query).scalar_one_or_none() is not None:
            return {"revoked": True}

        # Token is valid: record the use.
        token_row.last_used = utc_now()
        session.commit()

        return {
            "user_email": token_row.user_email,
            "jti": token_row.jti,
        }

652 

653 

def _get_sync_redis_client():
    """Get or create module-level synchronous Redis client for rate-limiting.

    The client is created lazily on first use and cached in a module global.
    A client is only published to that global after a successful ``ping()``,
    so callers taking the lock-free fast path never receive a client whose
    connection has not been verified. After a failed attempt no new
    connection is tried for 30 seconds.

    Returns:
        Redis client or None if Redis is unavailable/disabled.
    """
    global _SYNC_REDIS_CLIENT, _SYNC_REDIS_FAILURE_TIME  # pylint: disable=global-statement

    # Standard
    import logging as log  # pylint: disable=import-outside-toplevel,reimported
    import time  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.config import settings as config_settings  # pylint: disable=import-outside-toplevel,reimported

    backoff_seconds = 30

    # Quick check without lock
    if _SYNC_REDIS_CLIENT is not None:
        return _SYNC_REDIS_CLIENT

    redis_url = config_settings.redis_url
    if not (redis_url and redis_url.strip() and config_settings.cache_type == "redis"):
        return None

    # Backoff after recent failure
    if _SYNC_REDIS_FAILURE_TIME and (time.time() - _SYNC_REDIS_FAILURE_TIME < backoff_seconds):
        return None

    # Lazy initialization with lock
    with _SYNC_REDIS_LOCK:
        # Double-check after acquiring lock: another thread may have finished
        # initializing, or may have just failed, while this one was waiting.
        if _SYNC_REDIS_CLIENT is not None:
            return _SYNC_REDIS_CLIENT
        if _SYNC_REDIS_FAILURE_TIME and (time.time() - _SYNC_REDIS_FAILURE_TIME < backoff_seconds):
            return None

        try:
            # Third-Party
            import redis  # pylint: disable=import-outside-toplevel

            # Build into a local first; the global is assigned only after the
            # connection test below has passed.
            candidate = redis.from_url(redis_url, decode_responses=True, socket_connect_timeout=2, socket_timeout=2)
            # Test connection
            candidate.ping()
        except Exception as e:
            log.getLogger(__name__).debug("Sync Redis client unavailable: %s", e)
            _SYNC_REDIS_FAILURE_TIME = time.time()
            return None

        _SYNC_REDIS_CLIENT = candidate
        _SYNC_REDIS_FAILURE_TIME = None  # Clear failure state on success
        log.getLogger(__name__).debug("Sync Redis client initialized for API token rate-limiting")
        return _SYNC_REDIS_CLIENT

698 

699 

def _write_api_token_last_used(jti: str) -> None:
    """Set ``last_used`` to the current time on the API token with this JTI, if it exists."""
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.db import EmailApiToken, utc_now  # pylint: disable=import-outside-toplevel

    with fresh_db_session() as db:
        result = db.execute(select(EmailApiToken).where(EmailApiToken.jti == jti))
        api_token = result.scalar_one_or_none()
        if api_token:
            api_token.last_used = utc_now()
            db.commit()


def _update_api_token_last_used_sync(jti: str) -> None:
    """Update last_used timestamp for an API token with rate-limiting.

    This function is called when an API token is successfully validated via JWT,
    ensuring the last_used field stays current for monitoring and security audits.

    Rate-limiting: Uses Redis cache (or in-memory fallback) to track the last
    update time and only writes to the database if the configured interval has
    elapsed. This prevents excessive DB writes on high-traffic tokens.

    Args:
        jti: JWT ID of the API token

    Note:
        Called via asyncio.to_thread() to avoid blocking the event loop.
        Uses fresh_db_session() for thread-safe database access.
    """
    # Standard
    import time  # pylint: disable=import-outside-toplevel,redefined-outer-name

    # First-Party
    from mcpgateway.config import settings as config_settings  # pylint: disable=import-outside-toplevel,reimported

    # Rate-limiting cache key
    cache_key = f"api_token_last_used:{jti}"
    update_interval_seconds = config_settings.token_last_used_update_interval_minutes * 60

    # Try Redis rate-limiting first (if available)
    redis_client = _get_sync_redis_client()
    if redis_client:
        try:
            last_update = redis_client.get(cache_key)
            if last_update and time.time() - float(last_update) < update_interval_seconds:
                return  # Skip update - too soon

            # Update DB and cache
            _write_api_token_last_used(jti)
            # SETEX rejects a zero or non-integer TTL; clamp it so a very
            # small interval does not trigger a spurious fallback (and a
            # second DB write) below.
            cache_ttl_seconds = max(1, int(update_interval_seconds * 2))
            redis_client.setex(cache_key, cache_ttl_seconds, str(time.time()))
            return
        except Exception as exc:
            # Any failure in this branch (Redis or the DB write) falls
            # through to the in-memory path, which attempts the update again.
            logger = logging.getLogger(__name__)
            logger.debug("Redis unavailable for API token rate-limiting, using in-memory fallback: %s", exc)

    # Fallback: In-memory cache (module-level dict with threading.Lock for thread-safety)
    # Note: This is per-process and won't work in multi-worker deployments
    # but provides basic rate-limiting when Redis is unavailable
    max_cache_size = 1024  # Prevent unbounded growth

    with _LAST_USED_CACHE_LOCK:
        last_update = _LAST_USED_CACHE.get(jti)
        if last_update and time.time() - last_update < update_interval_seconds:
            return  # Skip update - too soon

    # Update DB outside the lock so the cache is not blocked during the write
    _write_api_token_last_used(jti)

    # Update in-memory cache (with lock for thread-safety)
    with _LAST_USED_CACHE_LOCK:
        if len(_LAST_USED_CACHE) >= max_cache_size:
            # Evict the older half of the entries (by timestamp value)
            sorted_keys = sorted(_LAST_USED_CACHE, key=_LAST_USED_CACHE.get)  # type: ignore[arg-type]
            for k in sorted_keys[: len(_LAST_USED_CACHE) // 2]:
                del _LAST_USED_CACHE[k]
        _LAST_USED_CACHE[jti] = time.time()

792 

793 

def _is_api_token_jti_sync(jti: str) -> bool:
    """Tell whether a JTI belongs to an API token (legacy fallback) - SYNC version.

    Needed for tokens issued before ``auth_provider`` was part of the payload.
    Meant to be called via asyncio.to_thread() so the event loop is not blocked.

    SECURITY: Fail-closed on DB errors. When the lookup cannot be completed,
    the JTI is reported as an API token so the hard-block policy still applies.

    Args:
        jti: JWT ID to check

    Returns:
        bool: True if JTI exists in email_api_tokens table OR if lookup fails
    """
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.db import EmailApiToken  # pylint: disable=import-outside-toplevel

    try:
        with fresh_db_session() as session:
            lookup = select(EmailApiToken.id).where(EmailApiToken.jti == jti).limit(1)
            matching_id = session.execute(lookup).scalar_one_or_none()
    except Exception as e:
        logging.getLogger(__name__).warning(f"Legacy API token check failed, failing closed: {e}")
        return True  # FAIL-CLOSED: treat as API token to preserve hard-block

    return matching_id is not None

822 

823 

def _get_user_by_email_sync(email: str) -> Optional[EmailUser]:
    """Fetch a user by email and return a session-independent copy.

    Synchronous by design: intended to be run in a thread pool so the event
    loop is not blocked.

    Args:
        email: The user's email address.

    Returns:
        EmailUser if found, None otherwise. The returned object is a new
        instance built from the loaded row's attributes, so it remains usable
        after the lookup session has been closed.
    """
    # Attributes carried over onto the detached copy.
    copied_fields = (
        "email",
        "password_hash",
        "full_name",
        "is_admin",
        "is_active",
        "auth_provider",
        "password_change_required",
        "email_verified_at",
        "created_at",
        "updated_at",
    )

    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        row = session.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()
        if not row:
            return None

        # Read every attribute while the session is still open.
        return EmailUser(**{name: getattr(row, name) for name in copied_fields})

857 

858 

def _resolve_plugin_authenticated_user_sync(user_dict: Dict[str, Any]) -> Optional[EmailUser]:
    """Map plugin-supplied identity claims onto a database-backed user.

    Plugin hooks can authenticate a request and hand back identity claims.
    Admin status is never taken from those claims: it comes from the DB
    record, or is forced to ``False`` for users that have no record.

    Behavior:
        - Existing DB user: return DB user (authoritative for is_admin/is_active).
        - Missing DB user and REQUIRE_USER_IN_DB=true: reject (None).
        - Missing DB user and REQUIRE_USER_IN_DB=false: allow a non-admin virtual
          user built from non-privileged plugin claims.

    Args:
        user_dict: Identity claims returned by plugin auth hook.

    Returns:
        EmailUser when identity is accepted, otherwise None.
    """
    email = str(user_dict.get("email") or "").strip()
    if not email:
        return None

    existing_user = _get_user_by_email_sync(email)
    if existing_user:
        return existing_user

    if settings.require_user_in_db:
        return None

    # Virtual user: every field except is_admin may come from the claims.
    virtual_user_fields = {
        "email": email,
        "password_hash": user_dict.get("password_hash", ""),
        "full_name": user_dict.get("full_name"),
        "is_admin": False,
        "is_active": user_dict.get("is_active", True),
        "auth_provider": user_dict.get("auth_provider", "local"),
        "password_change_required": user_dict.get("password_change_required", False),
        "email_verified_at": user_dict.get("email_verified_at"),
        "created_at": user_dict.get("created_at", datetime.now(timezone.utc)),
        "updated_at": user_dict.get("updated_at", datetime.now(timezone.utc)),
    }
    return EmailUser(**virtual_user_fields)

900 

901 

def _get_auth_context_batched_sync(email: str, jti: Optional[str] = None) -> Dict[str, Any]:
    """Collect the auth context for a request inside a single DB session.

    One call stands in for the separate lookups that were previously
    dispatched individually via asyncio.to_thread:
        - _get_user_by_email_sync - user data
        - _get_personal_team_sync - personal team ID
        - _check_token_revoked_sync - token revocation status
        - _get_user_team_ids - all active team memberships (for session tokens)

    This reduces thread pool contention and DB connection overhead.

    Args:
        email: User email address
        jti: JWT ID for revocation check (optional)

    Returns:
        Dict with keys: user (dict or None), personal_team_id (str or None),
        is_token_revoked (bool), team_ids (list of str), team_names (dict)

    Examples:
        >>> # This function runs in a thread pool
        >>> # result = _get_auth_context_batched_sync("test@example.com", "jti-123")
        >>> # result["is_token_revoked"]  # False if not revoked
    """

    def _team_fields(row: Any) -> tuple:
        """Pull (team_id, team_name) out of a membership result row.

        Tries the row's ``_mapping`` first, then plain attributes, and finally
        positional access when the row is a tuple and no ID was found.

        Args:
            row: A single row from the team-membership query.

        Returns:
            A (team_id, team_name) pair; either element may be None.
        """
        row_map = getattr(row, "_mapping", None)
        if row_map is None:
            found_id, found_name = None, None
        else:
            found_id, found_name = row_map.get("team_id"), row_map.get("name")

        if found_id is None:
            found_id = getattr(row, "team_id", None)
        if found_name is None:
            found_name = getattr(row, "name", None)

        # Positional fallback replaces both values, including a name found above.
        if found_id is None and isinstance(row, tuple):
            found_id = row[0] if len(row) > 0 else None
            found_name = row[1] if len(row) > 1 else None

        return found_id, found_name

    with fresh_db_session() as db:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailTeam, EmailTeamMember, TokenRevocation  # pylint: disable=import-outside-toplevel

        user_snapshot: Optional[Dict[str, Any]] = None
        personal_team_id = None
        token_revoked = False  # nosec B105 - boolean flag, not a password
        active_team_ids: list[str] = []
        active_team_names: dict[str, str] = {}

        # User record
        account = db.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()

        if account:
            # Plain-dict copy of the row, taken while the session is open.
            user_snapshot = {
                field: getattr(account, field)
                for field in (
                    "email",
                    "password_hash",
                    "full_name",
                    "is_admin",
                    "is_active",
                    "auth_provider",
                    "password_change_required",
                    "email_verified_at",
                    "created_at",
                    "updated_at",
                )
            }

            # Personal team (only looked up when the user exists)
            personal_stmt = (
                select(EmailTeam)
                .join(EmailTeamMember)
                .where(
                    EmailTeamMember.user_email == email,
                    EmailTeam.is_personal.is_(True),
                )
            )
            personal_team = db.execute(personal_stmt).scalar_one_or_none()
            if personal_team:
                personal_team_id = personal_team.id

            # Active memberships in active teams (for session token team resolution)
            membership_stmt = (
                select(EmailTeamMember.team_id, EmailTeam.name)
                .join(EmailTeam, EmailTeam.id == EmailTeamMember.team_id)
                .where(
                    EmailTeamMember.user_email == email,
                    EmailTeamMember.is_active.is_(True),
                    EmailTeam.is_active.is_(True),
                )
            )
            for membership in db.execute(membership_stmt).all():
                raw_id, raw_name = _team_fields(membership)
                if not raw_id:
                    continue

                team_key = str(raw_id)
                active_team_ids.append(team_key)
                if raw_name:
                    active_team_names[team_key] = str(raw_name)

        # Token revocation (only when a JTI was supplied)
        if jti:
            revocation_stmt = select(TokenRevocation).where(TokenRevocation.jti == jti)
            token_revoked = db.execute(revocation_stmt).scalar_one_or_none() is not None

        return {
            "user": user_snapshot,
            "personal_team_id": personal_team_id,
            "is_token_revoked": token_revoked,  # nosec B105 - boolean flag, not a password
            "team_ids": active_team_ids,
            "team_names": active_team_names,
        }

1022 

1023 

def _user_from_cached_dict(user_dict: Dict[str, Any]) -> EmailUser:
    """Rebuild an EmailUser from a cached user dictionary.

    Only ``email`` is mandatory; every other field falls back to a default
    when the key is absent from the dictionary.

    Args:
        user_dict: User data dictionary from cache

    Returns:
        EmailUser instance (detached from any session)
    """
    # "email" is read first and without a default, so a missing key raises KeyError.
    attrs: Dict[str, Any] = {"email": user_dict["email"]}
    attrs.update(
        password_hash=user_dict.get("password_hash", ""),
        full_name=user_dict.get("full_name"),
        is_admin=user_dict.get("is_admin", False),
        is_active=user_dict.get("is_active", True),
        auth_provider=user_dict.get("auth_provider", "local"),
        password_change_required=user_dict.get("password_change_required", False),
        email_verified_at=user_dict.get("email_verified_at"),
        created_at=user_dict.get("created_at", datetime.now(timezone.utc)),
        updated_at=user_dict.get("updated_at", datetime.now(timezone.utc)),
    )
    return EmailUser(**attrs)

1045 

1046 

1047async def get_current_user( 

1048 credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), 

1049 request: Request = None, # type: ignore[assignment] 

1050) -> EmailUser: 

1051 """Get current authenticated user from JWT token with revocation checking. 

1052 

1053 Supports plugin-based custom authentication via HTTP_AUTH_RESOLVE_USER hook. 

1054 

1055 Args: 

1056 credentials: HTTP authorization credentials 

1057 request: Optional request object for plugin hooks 

1058 

1059 Returns: 

1060 EmailUser: Authenticated user 

1061 

1062 Raises: 

1063 HTTPException: If authentication fails 

1064 """ 

1065 logger = logging.getLogger(__name__) 

1066 clear_trace_context() 

1067 

1068 async def _set_auth_method_from_payload(payload: dict) -> None: 

1069 """Set request.state.auth_method based on JWT payload. 

1070 

1071 Args: 

1072 payload: Decoded JWT payload 

1073 """ 

1074 if not request: 

1075 return 

1076 

1077 # NOTE: Cannot use structural check (scopes dict) because email login JWTs 

1078 # also have scopes dict (see email_auth.py:160) 

1079 user_info = payload.get("user", {}) 

1080 auth_provider = user_info.get("auth_provider") 

1081 

1082 if auth_provider == "api_token": 

1083 request.state.auth_method = "api_token" 

1084 jti = payload.get("jti") 

1085 if jti: 

1086 request.state.jti = jti 

1087 try: 

1088 await asyncio.to_thread(_update_api_token_last_used_sync, jti) 

1089 except Exception as e: 

1090 logger.debug(f"Failed to update API token last_used: {e}") 

1091 # Continue authentication - last_used update is not critical 

1092 return 

1093 

1094 if auth_provider: 

1095 # email, oauth, saml, or any other interactive auth provider 

1096 request.state.auth_method = "jwt" 

1097 return 

1098 

1099 # Legacy API token fallback: check if JTI exists in API token table 

1100 # This handles tokens created before auth_provider was added 

1101 jti_for_check = payload.get("jti") 

1102 if jti_for_check: 

1103 is_legacy_api_token = await asyncio.to_thread(_is_api_token_jti_sync, jti_for_check) 

1104 if is_legacy_api_token: 

1105 request.state.auth_method = "api_token" 

1106 request.state.jti = jti_for_check 

1107 logger.debug(f"Legacy API token detected via DB lookup (JTI: ...{jti_for_check[-8:]})") 

1108 try: 

1109 await asyncio.to_thread(_update_api_token_last_used_sync, jti_for_check) 

1110 except Exception as e: 

1111 logger.debug(f"Failed to update legacy API token last_used: {e}") 

1112 # Continue authentication - last_used update is not critical 

1113 else: 

1114 request.state.auth_method = "jwt" 

1115 else: 

1116 # No auth_provider or JTI; default to interactive 

1117 request.state.auth_method = "jwt" 

1118 

1119 def _set_trace_for_user(user_obj: EmailUser, *, teams: Any = _UNSET, auth_method: Optional[str] = None, team_name: Optional[str] = None) -> None: 

1120 """Populate trace context from the resolved user and request state. 

1121 

1122 Args: 

1123 user_obj: Resolved authenticated user object. 

1124 teams: Optional resolved team scope override. When unset, team scope is derived from the user object. 

1125 auth_method: Optional explicit authentication method label to record on the trace. 

1126 team_name: Optional display name for the primary concrete team. 

1127 """ 

1128 resolved_auth_method = auth_method 

1129 if resolved_auth_method is None and request: 

1130 resolved_auth_method = getattr(request.state, "auth_method", None) 

1131 

1132 if teams is not _UNSET: 

1133 set_trace_context_from_teams( 

1134 teams, 

1135 user_email=user_obj.email, 

1136 is_admin=bool(user_obj.is_admin), 

1137 auth_method=resolved_auth_method, 

1138 team_name=team_name, 

1139 ) 

1140 return 

1141 

1142 set_trace_user_email(user_obj.email) 

1143 set_trace_user_is_admin(bool(user_obj.is_admin)) 

1144 if resolved_auth_method: 

1145 set_trace_auth_method(resolved_auth_method) 

1146 if user_obj.is_admin: 

1147 set_trace_team_scope("admin") 

1148 

1149 # NEW: Custom authentication hook - allows plugins to provide alternative auth 

1150 # This hook is invoked BEFORE standard JWT/API token validation 

1151 try: 

1152 # Get plugin manager singleton 

1153 plugin_manager = get_plugin_manager() 

1154 

1155 if plugin_manager and plugin_manager.has_hooks_for(HttpHookType.HTTP_AUTH_RESOLVE_USER): 

1156 # Extract client information 

1157 client_host = None 

1158 client_port = None 

1159 if request and hasattr(request, "client") and request.client: 

1160 client_host = request.client.host 

1161 client_port = request.client.port 

1162 

1163 # Serialize credentials for plugin 

1164 credentials_dict = None 

1165 if credentials: 

1166 credentials_dict = { 

1167 "scheme": credentials.scheme, 

1168 "credentials": credentials.credentials, 

1169 } 

1170 

1171 # Extract headers from request 

1172 # Note: Middleware modifies request.scope["headers"], so request.headers 

1173 # will automatically reflect any modifications made by HTTP_PRE_REQUEST hooks 

1174 headers = {} 

1175 if request and hasattr(request, "headers"): 

1176 headers = dict(request.headers) 

1177 

1178 # Get request ID from correlation ID context (set by CorrelationIDMiddleware) 

1179 request_id = get_correlation_id() 

1180 if not request_id: 

1181 # Fallback chain for safety 

1182 if request and hasattr(request, "state") and hasattr(request.state, "request_id"): 

1183 request_id = request.state.request_id 

1184 else: 

1185 request_id = uuid.uuid4().hex 

1186 logger.debug(f"Generated fallback request ID in get_current_user: {request_id}") 

1187 

1188 # Get plugin contexts from request state if available 

1189 global_context = getattr(request.state, "plugin_global_context", None) if request else None 

1190 if not global_context: 

1191 # Propagate team_id → tenant_id for by_tenant rate limiting 

1192 team_id = getattr(getattr(request, "state", None), "team_id", None) if request else None 

1193 global_context = GlobalContext( 

1194 request_id=request_id, 

1195 server_id=None, 

1196 tenant_id=team_id, 

1197 ) 

1198 

1199 context_table = getattr(request.state, "plugin_context_table", None) if request else None 

1200 

1201 # Invoke custom auth resolution hook 

1202 # violations_as_exceptions=True so PluginViolationError is raised for explicit denials 

1203 auth_result, context_table_result = await plugin_manager.invoke_hook( 

1204 HttpHookType.HTTP_AUTH_RESOLVE_USER, 

1205 payload=HttpAuthResolveUserPayload( 

1206 credentials=credentials_dict, 

1207 headers=HttpHeaderPayload(root=headers), 

1208 client_host=client_host, 

1209 client_port=client_port, 

1210 ), 

1211 global_context=global_context, 

1212 local_contexts=context_table, 

1213 violations_as_exceptions=True, # Raise PluginViolationError for auth denials 

1214 ) 

1215 

1216 # If plugin successfully authenticated user, return it 

1217 if auth_result.modified_payload and isinstance(auth_result.modified_payload, dict): 

1218 logger.info("User authenticated via plugin hook") 

1219 # Resolve plugin claims against DB state so admin flags are authoritative. 

1220 user_dict = auth_result.modified_payload 

1221 user = await asyncio.to_thread(_resolve_plugin_authenticated_user_sync, user_dict) 

1222 

1223 if user is None: 

1224 logger.warning("Plugin auth rejected: user identity could not be resolved against DB policy") 

1225 raise HTTPException( 

1226 status_code=status.HTTP_401_UNAUTHORIZED, 

1227 detail="User not found in database", 

1228 headers={"WWW-Authenticate": "Bearer"}, 

1229 ) 

1230 

1231 if not user.is_active: 

1232 raise HTTPException( 

1233 status_code=status.HTTP_401_UNAUTHORIZED, 

1234 detail="Account disabled", 

1235 headers={"WWW-Authenticate": "Bearer"}, 

1236 ) 

1237 

1238 # Store auth_method in request.state so it can be accessed by RBAC middleware 

1239 if request and auth_result.metadata: 

1240 auth_method = auth_result.metadata.get("auth_method") 

1241 if auth_method: 

1242 request.state.auth_method = auth_method 

1243 logger.debug(f"Stored auth_method '{auth_method}' in request.state") 

1244 

1245 if request and context_table_result: 

1246 request.state.plugin_context_table = context_table_result 

1247 

1248 if request and global_context: 

1249 request.state.plugin_global_context = global_context 

1250 

1251 if plugin_manager and plugin_manager.config.plugin_settings.include_user_info: 

1252 _inject_userinfo_instate(request, user) 

1253 _propagate_tenant_id(request) 

1254 

1255 _set_trace_for_user(user) 

1256 return user 

1257 # If continue_processing=True (no payload), fall through to standard auth 

1258 

1259 except PluginViolationError as e: 

1260 # Plugin explicitly denied authentication with custom message 

1261 logger.warning(f"Authentication denied by plugin: {SecurityValidator.sanitize_log_message(e.message)}") 

1262 raise HTTPException( 

1263 status_code=status.HTTP_401_UNAUTHORIZED, 

1264 detail=e.message, # Use plugin's custom error message 

1265 headers={"WWW-Authenticate": "Bearer"}, 

1266 ) 

1267 except HTTPException: 

1268 # Re-raise HTTP exceptions 

1269 raise 

1270 except Exception as e: 

1271 # Log but don't fail on plugin errors - fall back to standard auth 

1272 logger.warning(f"HTTP_AUTH_RESOLVE_USER hook failed, falling back to standard auth: {SecurityValidator.sanitize_log_message(str(e))}") 

1273 

1274 # EXISTING: Standard authentication (JWT, API tokens) 

1275 if not credentials: 

1276 logger.warning("No credentials provided") 

1277 raise HTTPException( 

1278 status_code=status.HTTP_401_UNAUTHORIZED, 

1279 detail="Authentication required", 

1280 headers={"WWW-Authenticate": "Bearer"}, 

1281 ) 

1282 

1283 logger.debug("Attempting authentication with bearer credentials") 

1284 email = None 

1285 

1286 try: 

1287 # Try JWT token first using the centralized verify_jwt_token_cached function 

1288 logger.debug("Attempting JWT token validation") 

1289 payload = await verify_jwt_token_cached(credentials.credentials, request) 

1290 

1291 logger.debug("JWT token validated successfully") 

1292 # Extract user identifier (support both new and legacy token formats) 

1293 email = payload.get("sub") 

1294 if email is None: 

1295 # Try legacy format 

1296 email = payload.get("email") 

1297 

1298 if email is None: 

1299 logger.debug("No email/sub found in JWT payload") 

1300 raise HTTPException( 

1301 status_code=status.HTTP_401_UNAUTHORIZED, 

1302 detail="Invalid token", 

1303 headers={"WWW-Authenticate": "Bearer"}, 

1304 ) 

1305 

1306 logger.debug("JWT authentication successful for email: %s", email) 

1307 

1308 # Extract JTI for revocation check 

1309 jti = payload.get("jti") 

1310 

1311 # === AUTH CACHING: Check cache before DB queries === 

1312 if settings.auth_cache_enabled: 

1313 try: 

1314 # First-Party 

1315 from mcpgateway.cache.auth_cache import auth_cache, CachedAuthContext # pylint: disable=import-outside-toplevel 

1316 

1317 cached_ctx = await auth_cache.get_auth_context(email, jti) 

1318 if cached_ctx: 

1319 logger.debug(f"Auth cache hit for {email}") 

1320 

1321 # Check revocation from cache 

1322 if cached_ctx.is_token_revoked: 

1323 raise HTTPException( 

1324 status_code=status.HTTP_401_UNAUTHORIZED, 

1325 detail="Token has been revoked", 

1326 headers={"WWW-Authenticate": "Bearer"}, 

1327 ) 

1328 

1329 # Check user active status from cache 

1330 if cached_ctx.user and not cached_ctx.user.get("is_active", True): 

1331 raise HTTPException( 

1332 status_code=status.HTTP_401_UNAUTHORIZED, 

1333 detail="Account disabled", 

1334 headers={"WWW-Authenticate": "Bearer"}, 

1335 ) 

1336 

1337 # Resolve teams based on token_use 

1338 if request: 

1339 token_use = payload.get("token_use") 

1340 request.state.token_use = token_use 

1341 

1342 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type 

1343 # Session token: resolve teams from DB/cache 

1344 user_info = cached_ctx.user or {"is_admin": False} 

1345 teams = await resolve_session_teams(payload, email, user_info) 

1346 else: 

1347 # API token or legacy: use embedded teams 

1348 teams = normalize_token_teams(payload) 

1349 

1350 request.state.token_teams = teams 

1351 

1352 # Set team_id: only for single-team API tokens 

1353 if teams is None: 

1354 request.state.team_id = None 

1355 elif len(teams) == 1 and token_use != "session": # nosec B105 

1356 request.state.team_id = teams[0] if isinstance(teams[0], str) else teams[0].get("id") 

1357 else: 

1358 request.state.team_id = None 

1359 

1360 request.state.trace_team_name = await resolve_trace_team_name(payload, teams) 

1361 

1362 await _set_auth_method_from_payload(payload) 

1363 

1364 # Return user from cache 

1365 if cached_ctx.user: 

1366 # When require_user_in_db is enabled, verify user still exists in DB 

1367 # This prevents stale cache from bypassing strict mode 

1368 if settings.require_user_in_db: 

1369 db_user = await asyncio.to_thread(_get_user_by_email_sync, email) 

1370 if db_user is None: 

1371 logger.warning( 

1372 f"Authentication rejected for {email}: cached user not found in database. " "REQUIRE_USER_IN_DB is enabled.", 

1373 extra={"security_event": "user_not_in_db_rejected", "user_id": email}, 

1374 ) 

1375 raise HTTPException( 

1376 status_code=status.HTTP_401_UNAUTHORIZED, 

1377 detail="User not found in database", 

1378 headers={"WWW-Authenticate": "Bearer"}, 

1379 ) 

1380 

1381 if plugin_manager and plugin_manager.config.plugin_settings.include_user_info: 

1382 _inject_userinfo_instate(request, _user_from_cached_dict(cached_ctx.user)) 

1383 _propagate_tenant_id(request) 

1384 

1385 cached_user = _user_from_cached_dict(cached_ctx.user) 

1386 _set_trace_for_user( 

1387 cached_user, 

1388 teams=getattr(request.state, "token_teams", _UNSET) if request else _UNSET, 

1389 team_name=getattr(request.state, "trace_team_name", None) if request else None, 

1390 ) 

1391 return cached_user 

1392 

1393 # User not in cache but context was (shouldn't happen, but handle it) 

1394 logger.debug("Auth context cached but user missing, falling through to DB") 

1395 

1396 except HTTPException: 

1397 raise 

1398 except Exception as cache_error: 

1399 logger.debug(f"Auth cache check failed, falling through to DB: {cache_error}") 

1400 

1401 # === BATCHED QUERIES: Single DB call for user + team + revocation === 

1402 if settings.auth_cache_batch_queries: 

1403 try: 

1404 auth_ctx = await asyncio.to_thread(_get_auth_context_batched_sync, email, jti) 

1405 

1406 # Check revocation 

1407 if auth_ctx.get("is_token_revoked"): 

1408 raise HTTPException( 

1409 status_code=status.HTTP_401_UNAUTHORIZED, 

1410 detail="Token has been revoked", 

1411 headers={"WWW-Authenticate": "Bearer"}, 

1412 ) 

1413 

1414 # Resolve teams based on token_use 

1415 token_use = payload.get("token_use") 

1416 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type 

1417 # Session token: use team_ids from batched query via resolve_session_teams 

1418 user_dict = auth_ctx.get("user") 

1419 is_admin = user_dict.get("is_admin", False) if user_dict else False 

1420 batch_teams = None if is_admin else auth_ctx.get("team_ids", []) 

1421 teams = await resolve_session_teams(payload, email, {"is_admin": is_admin}, preresolved_db_teams=batch_teams) 

1422 else: 

1423 # API token or legacy: use embedded teams 

1424 teams = normalize_token_teams(payload) 

1425 

1426 # Set team_id: only for single-team API tokens 

1427 if teams is None: 

1428 team_id = None 

1429 elif len(teams) == 1 and token_use != "session": # nosec B105 

1430 team_id = teams[0] if isinstance(teams[0], str) else teams[0].get("id") 

1431 else: 

1432 team_id = None 

1433 

1434 if request: 

1435 request.state.token_teams = teams 

1436 request.state.team_id = team_id 

1437 request.state.token_use = token_use 

1438 request.state.trace_team_name = await resolve_trace_team_name(payload, teams, preresolved_team_names=auth_ctx.get("team_names")) 

1439 await _set_auth_method_from_payload(payload) 

1440 

1441 # Store in cache for future requests 

1442 if settings.auth_cache_enabled: 

1443 try: 

1444 # First-Party 

1445 from mcpgateway.cache.auth_cache import auth_cache, CachedAuthContext # noqa: F811 pylint: disable=import-outside-toplevel 

1446 

1447 await auth_cache.set_auth_context( 

1448 email, 

1449 jti, 

1450 CachedAuthContext( 

1451 user=auth_ctx.get("user"), 

1452 personal_team_id=auth_ctx.get("personal_team_id"), 

1453 is_token_revoked=auth_ctx.get("is_token_revoked", False), 

1454 ), 

1455 ) 

1456 # Also populate teams-list cache so cached-path requests 

1457 # don't need an extra DB query via _resolve_teams_from_db() 

1458 # Cache the raw DB teams (batch_teams), not the narrowed 

1459 # intersection (teams), so that other sessions for the same 

1460 # user see the full membership and can narrow independently. 

1461 if token_use == "session" and batch_teams is not None: # nosec B105 

1462 await auth_cache.set_user_teams(f"{email}:True", batch_teams) 

1463 except Exception as cache_set_error: 

1464 logger.debug(f"Failed to cache auth context: {cache_set_error}") 

1465 

1466 # Create user from batched result 

1467 if auth_ctx.get("user"): 

1468 user_dict = auth_ctx["user"] 

1469 if not user_dict.get("is_active", True): 

1470 raise HTTPException( 

1471 status_code=status.HTTP_401_UNAUTHORIZED, 

1472 detail="Account disabled", 

1473 headers={"WWW-Authenticate": "Bearer"}, 

1474 ) 

1475 # Store user for return at end of function 

1476 # We'll check platform admin case and return below 

1477 _batched_user = _user_from_cached_dict(user_dict) 

1478 else: 

1479 _batched_user = None 

1480 

1481 # Handle user not found case 

1482 if _batched_user is None: 

1483 # Check if strict user-in-DB mode is enabled 

1484 if settings.require_user_in_db: 

1485 logger.warning( 

1486 f"Authentication rejected for {email}: user not found in database. " "REQUIRE_USER_IN_DB is enabled.", 

1487 extra={"security_event": "user_not_in_db_rejected", "user_id": email}, 

1488 ) 

1489 raise HTTPException( 

1490 status_code=status.HTTP_401_UNAUTHORIZED, 

1491 detail="User not found in database", 

1492 headers={"WWW-Authenticate": "Bearer"}, 

1493 ) 

1494 

1495 # Platform admin bootstrap (only when REQUIRE_USER_IN_DB=false) 

1496 if email == getattr(settings, "platform_admin_email", "admin@example.com"): 

1497 logger.info( 

1498 f"Platform admin bootstrap authentication for {email}. " "User authenticated via platform admin configuration.", 

1499 extra={"security_event": "platform_admin_bootstrap", "user_id": email}, 

1500 ) 

1501 _batched_user = EmailUser( 

1502 email=email, 

1503 password_hash="", # nosec B106 

1504 full_name=getattr(settings, "platform_admin_full_name", "Platform Administrator"), 

1505 is_admin=True, 

1506 is_active=True, 

1507 auth_provider="local", 

1508 password_change_required=False, 

1509 email_verified_at=datetime.now(timezone.utc), 

1510 created_at=datetime.now(timezone.utc), 

1511 updated_at=datetime.now(timezone.utc), 

1512 ) 

1513 else: 

1514 raise HTTPException( 

1515 status_code=status.HTTP_401_UNAUTHORIZED, 

1516 detail="User not found", 

1517 headers={"WWW-Authenticate": "Bearer"}, 

1518 ) 

1519 

1520 if plugin_manager and plugin_manager.config.plugin_settings.include_user_info: 

1521 _inject_userinfo_instate(request, _batched_user) 

1522 _propagate_tenant_id(request) 

1523 

1524 _set_trace_for_user( 

1525 _batched_user, 

1526 teams=getattr(request.state, "token_teams", _UNSET) if request else _UNSET, 

1527 team_name=getattr(request.state, "trace_team_name", None) if request else None, 

1528 ) 

1529 return _batched_user 

1530 

1531 except HTTPException: 

1532 raise 

1533 except Exception as batch_error: 

1534 logger.warning(f"Batched auth query failed, falling back to individual queries: {batch_error}") 

1535 

1536 # === FALLBACK: Original individual queries (if batching disabled or failed) === 

1537 if jti: 

1538 try: 

1539 is_revoked = await asyncio.to_thread(_check_token_revoked_sync, jti) 

1540 if is_revoked: 

1541 raise HTTPException( 

1542 status_code=status.HTTP_401_UNAUTHORIZED, 

1543 detail="Token has been revoked", 

1544 headers={"WWW-Authenticate": "Bearer"}, 

1545 ) 

1546 except HTTPException: 

1547 raise 

1548 except Exception as revoke_check_error: 

1549 # Fail-secure: if the revocation check itself errors, reject the token. 

1550 # Allowing through on error would let revoked tokens bypass enforcement 

1551 # when the DB is unreachable or the table is missing. 

1552 logger.warning( 

1553 f"Token revocation check failed for JTI {SecurityValidator.sanitize_log_message(jti)} — denying access (fail-secure): {SecurityValidator.sanitize_log_message(str(revoke_check_error))}" 

1554 ) 

1555 raise HTTPException( 

1556 status_code=status.HTTP_401_UNAUTHORIZED, 

1557 detail="Token validation failed", 

1558 headers={"WWW-Authenticate": "Bearer"}, 

1559 ) 

1560 

1561 # Resolve teams based on token_use 

1562 token_use = payload.get("token_use") 

1563 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type 

1564 # Session token: resolve teams from DB/cache (fallback path — separate query OK) 

1565 user_info = {"is_admin": payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False)} 

1566 normalized_teams = await resolve_session_teams(payload, email, user_info) 

1567 else: 

1568 # API token or legacy: use embedded teams 

1569 normalized_teams = normalize_token_teams(payload) 

1570 

1571 # Set team_id: only for single-team API tokens 

1572 if normalized_teams is None: 

1573 team_id = None 

1574 elif len(normalized_teams) == 1 and token_use != "session": # nosec B105 

1575 team_id = normalized_teams[0] if isinstance(normalized_teams[0], str) else normalized_teams[0].get("id") 

1576 else: 

1577 team_id = None 

1578 

1579 if request: 

1580 request.state.token_teams = normalized_teams 

1581 request.state.team_id = team_id 

1582 request.state.token_use = token_use 

1583 request.state.trace_team_name = await resolve_trace_team_name(payload, normalized_teams) 

1584 # Store JTI for use in middleware (e.g., token usage logging) 

1585 if jti: 

1586 request.state.jti = jti 

1587 await _set_auth_method_from_payload(payload) 

1588 

1589 except HTTPException: 

1590 # Re-raise HTTPException from verify_jwt_token (handles expired/invalid tokens) 

1591 raise 

1592 except Exception as jwt_error: 

1593 # JWT validation failed, try database API token 

1594 # Uses fresh DB session via asyncio.to_thread to avoid blocking event loop 

1595 logger.debug("JWT validation failed with error: %s, trying database API token", jwt_error) 

1596 try: 

1597 token_hash = hashlib.sha256(credentials.credentials.encode()).hexdigest() 

1598 

1599 # Lookup API token using fresh session in thread pool 

1600 api_token_info = await asyncio.to_thread(_lookup_api_token_sync, token_hash) 

1601 logger.debug(f"Database lookup result: {api_token_info is not None}") 

1602 

1603 if api_token_info: 

1604 # Check for error conditions returned by helper 

1605 if api_token_info.get("expired"): 

1606 raise HTTPException( 

1607 status_code=status.HTTP_401_UNAUTHORIZED, 

1608 detail="API token expired", 

1609 headers={"WWW-Authenticate": "Bearer"}, 

1610 ) 

1611 

1612 if api_token_info.get("revoked"): 

1613 raise HTTPException( 

1614 status_code=status.HTTP_401_UNAUTHORIZED, 

1615 detail="API token has been revoked", 

1616 headers={"WWW-Authenticate": "Bearer"}, 

1617 ) 

1618 

1619 # Use the email from the API token 

1620 email = api_token_info["user_email"] 

1621 logger.debug(f"API token authentication successful for email: {email}") 

1622 

1623 # Set auth_method for database API tokens 

1624 if request: 

1625 request.state.auth_method = "api_token" 

1626 request.state.user_email = api_token_info["user_email"] 

1627 # Store JTI for use in middleware 

1628 if "jti" in api_token_info: 

1629 request.state.jti = api_token_info["jti"] 

1630 else: 

1631 logger.debug("API token not found in database") 

1632 logger.debug("No valid authentication method found") 

1633 # Neither JWT nor API token worked 

1634 raise HTTPException( 

1635 status_code=status.HTTP_401_UNAUTHORIZED, 

1636 detail="Invalid authentication credentials", 

1637 headers={"WWW-Authenticate": "Bearer"}, 

1638 ) 

1639 except HTTPException: 

1640 # Re-raise HTTP exceptions 

1641 raise 

1642 except Exception as e: 

1643 # Neither JWT nor API token validation worked 

1644 logger.debug(f"Database API token validation failed with exception: {SecurityValidator.sanitize_log_message(str(e))}") 

1645 raise HTTPException( 

1646 status_code=status.HTTP_401_UNAUTHORIZED, 

1647 detail="Invalid authentication credentials", 

1648 headers={"WWW-Authenticate": "Bearer"}, 

1649 ) 

1650 

1651 # Get user from database using fresh session in thread pool 

1652 user = await asyncio.to_thread(_get_user_by_email_sync, email) 

1653 

1654 if user is None: 

1655 # Check if strict user-in-DB mode is enabled 

1656 if settings.require_user_in_db: 

1657 logger.warning( 

1658 f"Authentication rejected for {email}: user not found in database. " "REQUIRE_USER_IN_DB is enabled.", 

1659 extra={"security_event": "user_not_in_db_rejected", "user_id": email}, 

1660 ) 

1661 raise HTTPException( 

1662 status_code=status.HTTP_401_UNAUTHORIZED, 

1663 detail="User not found in database", 

1664 headers={"WWW-Authenticate": "Bearer"}, 

1665 ) 

1666 

1667 # Platform admin bootstrap (only when REQUIRE_USER_IN_DB=false) 

1668 # If user doesn't exist but token is valid and email matches platform admin, 

1669 # create a virtual admin user object 

1670 if email == getattr(settings, "platform_admin_email", "admin@example.com"): 

1671 logger.info( 

1672 f"Platform admin bootstrap authentication for {email}. " "User authenticated via platform admin configuration.", 

1673 extra={"security_event": "platform_admin_bootstrap", "user_id": email}, 

1674 ) 

1675 # Create a virtual admin user for authentication purposes 

1676 user = EmailUser( 

1677 email=email, 

1678 password_hash="", # nosec B106 - Not used for JWT authentication 

1679 full_name=getattr(settings, "platform_admin_full_name", "Platform Administrator"), 

1680 is_admin=True, 

1681 is_active=True, 

1682 auth_provider="local", 

1683 password_change_required=False, 

1684 email_verified_at=datetime.now(timezone.utc), 

1685 created_at=datetime.now(timezone.utc), 

1686 updated_at=datetime.now(timezone.utc), 

1687 ) 

1688 else: 

1689 raise HTTPException( 

1690 status_code=status.HTTP_401_UNAUTHORIZED, 

1691 detail="User not found", 

1692 headers={"WWW-Authenticate": "Bearer"}, 

1693 ) 

1694 

1695 if not user.is_active: 

1696 raise HTTPException( 

1697 status_code=status.HTTP_401_UNAUTHORIZED, 

1698 detail="Account disabled", 

1699 headers={"WWW-Authenticate": "Bearer"}, 

1700 ) 

1701 

1702 if plugin_manager and plugin_manager.config.plugin_settings.include_user_info: 

1703 _inject_userinfo_instate(request, user) 

1704 _propagate_tenant_id(request) 

1705 

1706 trace_teams = getattr(request.state, "token_teams", _UNSET) if request else _UNSET 

1707 _set_trace_for_user(user, teams=trace_teams, team_name=getattr(request.state, "trace_team_name", None) if request else None) 

1708 return user 

1709 

1710 

def _propagate_tenant_id(request: Optional[object] = None) -> None:
    """Copy ``request.state.team_id`` into the plugin global context's ``tenant_id``.

    This helper is separate from ``_inject_userinfo_instate()`` and does not
    look at the ``include_user_info`` plugin setting, so per-tenant rate
    limiting can still see a tenant when user-info injection is turned off.

    The value is written only when all of the following hold:

    * ``request`` is truthy,
    * ``request.state.plugin_global_context`` exists and is truthy,
    * its ``tenant_id`` is still ``None`` (an already-set value is never
      overwritten), and
    * ``request.state.team_id`` is truthy.

    Args:
        request: The incoming request object, or ``None`` if unavailable.
    """
    if not request:
        return

    # Missing attributes resolve to None instead of raising.
    state = getattr(request, "state", None)
    context = getattr(state, "plugin_global_context", None)

    # Nothing to do without a context, or when a tenant has already been set.
    if not context or context.tenant_id is not None:
        return

    team = getattr(state, "team_id", None)
    if team:
        context.tenant_id = team

1731 

1732 

def _inject_userinfo_instate(request: Optional[object] = None, user: Optional[EmailUser] = None) -> None:
    """Write user details into the plugin global context held on the request.

    The function itself does not check the ``include_user_info`` plugin
    setting; callers are expected to gate the call on it.

    Steps:

    1. Resolve a request ID: the correlation ID if one is set, otherwise
       ``request.state.request_id``, otherwise a freshly generated UUID hex.
    2. Reuse ``request.state.plugin_global_context`` when present and truthy,
       or build a new ``GlobalContext`` with the resolved request ID.
    3. When ``user`` is given, store its ``email``, ``is_admin`` and
       ``full_name`` under ``global_context.user``.
    4. Store the context back on ``request.state``.

    A request object that has no ``state`` attribute is tolerated: the
    context is still built and populated, but there is nowhere to store it,
    so the function returns without raising.

    Args:
        request: Optional request object for plugin hooks
        user: User related information
    """
    logger = logging.getLogger(__name__)

    # ``request`` is typed as a plain object, so resolve ``state`` once and
    # defensively; every later access goes through this single lookup.
    state = getattr(request, "state", None) if request else None

    # Get request ID from correlation ID context (set by CorrelationIDMiddleware)
    request_id = get_correlation_id()
    if not request_id:
        # Fallback chain for safety
        if state is not None and hasattr(state, "request_id"):
            request_id = state.request_id
        else:
            request_id = uuid.uuid4().hex
            logger.debug(f"Generated fallback request ID in get_current_user: {request_id}")

    # Get plugin contexts from request state if available
    global_context = getattr(state, "plugin_global_context", None)
    if not global_context:
        # Create global context
        global_context = GlobalContext(
            request_id=request_id,
            server_id=None,
            tenant_id=None,
        )

    if user:
        # Keep any keys already present on the context's user mapping.
        if not global_context.user:
            global_context.user = {}
        global_context.user["email"] = user.email
        global_context.user["is_admin"] = user.is_admin
        global_context.user["full_name"] = user.full_name

    # Persist only when there is a state object to attach the context to.
    if state is not None and global_context:
        state.plugin_global_context = global_context