Coverage for mcpgateway / auth.py: 99%

556 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/auth.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Shared authentication utilities. 

8 

9This module provides common authentication functions that can be shared 

10across different parts of the application without creating circular imports. 

11""" 

12 

13# Standard 

14import asyncio 

15from datetime import datetime, timezone 

16import hashlib 

17import logging 

18import threading 

19from typing import Any, Dict, Generator, List, Never, Optional 

20import uuid 

21 

22# Third-Party 

23from fastapi import Depends, HTTPException, status 

24from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer 

25from sqlalchemy.orm import Session 

26from starlette.requests import Request 

27 

28# First-Party 

29from mcpgateway.config import settings 

30from mcpgateway.db import EmailUser, fresh_db_session, SessionLocal 

31from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthResolveUserPayload, HttpHeaderPayload, HttpHookType, PluginViolationError 

32from mcpgateway.utils.correlation_id import get_correlation_id 

33from mcpgateway.utils.verify_credentials import verify_jwt_token_cached 

34 

# Security scheme. auto_error=False means a request without bearer credentials
# reaches the dependant with credentials=None (get_current_user declares the
# parameter as Optional) instead of being rejected by the scheme itself.
security = HTTPBearer(auto_error=False)

# Module-level sync Redis client for rate-limiting (lazy-initialized by
# _get_sync_redis_client; remains None until a connection has succeeded).
_SYNC_REDIS_CLIENT = None  # pylint: disable=invalid-name
_SYNC_REDIS_LOCK = threading.Lock()  # Serializes lazy creation of _SYNC_REDIS_CLIENT
_SYNC_REDIS_FAILURE_TIME: Optional[float] = None  # time.time() of the last failed connection attempt (drives backoff)

# Module-level in-memory cache for last_used rate-limiting (fallback when Redis unavailable).
# Maps an API token JTI to the time.time() value of its most recent last_used write.
_LAST_USED_CACHE: dict = {}
_LAST_USED_CACHE_LOCK = threading.Lock()  # Guards every read/write of _LAST_USED_CACHE

46 

47 

def _log_auth_event(
    logger: logging.Logger,
    message: str,
    level: int = logging.INFO,
    user_id: Optional[str] = None,
    auth_method: Optional[str] = None,
    auth_success: bool = False,
    security_event: Optional[str] = None,
    security_severity: str = "low",
    **extra_context,
) -> None:
    """Emit one structured authentication log record tagged with the request ID.

    The record's ``extra`` mapping always carries the current correlation ID
    (as ``request_id``) together with the fixed auth fields, so a single
    authentication flow can be followed across log lines.

    Args:
        logger: Logger instance to use
        message: Log message
        level: Log level (default: INFO)
        user_id: User identifier; omitted from the record when empty
        auth_method: Authentication method used (jwt, api_token, etc.); omitted when empty
        auth_success: Whether authentication succeeded
        security_event: Type of security event; falls back to "authentication" when empty
        security_severity: Severity level (low, medium, high, critical)
        **extra_context: Additional context fields; these override the fields above on key collision
    """
    # Only truthy optional identifiers make it into the record.
    optional_fields = {"user_id": user_id, "auth_method": auth_method}

    record_fields = {
        "request_id": get_correlation_id(),
        "entity_type": "auth",
        "auth_success": auth_success,
        "security_event": security_event or "authentication",
        "security_severity": security_severity,
        **{name: value for name, value in optional_fields.items() if value},
        # Caller-supplied context is merged last so it wins over the defaults.
        **extra_context,
    }

    logger.log(level, message, extra=record_fields)

97 

98 

def get_db() -> Generator[Session, Never, None]:
    """Provide a SQLAlchemy session as a generator-style dependency.

    When the consumer finishes without raising, the transaction is committed so
    that read-only work does not end in an implicit rollback. When the consumer
    (or the commit) raises, the transaction is rolled back; if the rollback
    itself fails the session is invalidated on a best-effort basis. The session
    is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.

    Examples:
        >>> db_gen = get_db()
        >>> db = next(db_gen)
        >>> hasattr(db, 'query')
        True
        >>> hasattr(db, 'close')
        True
    """

    def _discard_transaction(failed_session: Session) -> None:
        """Roll back the session, invalidating it if the rollback fails.

        Args:
            failed_session: Session whose transaction must be abandoned.
        """
        try:
            failed_session.rollback()
        except Exception:
            try:
                failed_session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure

    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        _discard_transaction(session)
        # Cleanup errors were handled inside the helper, so this re-raises the
        # original failure.
        raise
    finally:
        session.close()

134 

135 

def _get_personal_team_sync(user_email: str) -> Optional[str]:
    """Look up the ID of the user's personal team in a fresh DB session.

    Synchronous by design: it is meant to be run in a thread pool so the event
    loop is not blocked.

    Args:
        user_email: The user's email address.

    Returns:
        The personal team ID, or None if not found.
    """
    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailTeam, EmailTeamMember  # pylint: disable=import-outside-toplevel

        # Teams flagged as personal, joined to this user's membership rows.
        personal_team_query = (
            select(EmailTeam)
            .join(EmailTeamMember)
            .where(
                EmailTeamMember.user_email == user_email,
                EmailTeam.is_personal.is_(True),
            )
        )
        team = session.execute(personal_team_query).scalar_one_or_none()
        if not team:
            return None
        return team.id

157 

158 

def _get_user_team_ids_sync(email: str) -> List[str]:
    """Collect the IDs of every team in which the user has an active membership.

    Opens its own fresh DB session, so it is safe to call from a thread pool.
    Personal teams are not filtered out: any active membership row counts.

    Args:
        email: User email address

    Returns:
        List of team ID strings
    """
    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

        active_memberships = select(EmailTeamMember.team_id).where(
            EmailTeamMember.user_email == email,
            EmailTeamMember.is_active.is_(True),
        )
        membership_rows = session.execute(active_memberships).all()

        # Each row holds a single selected column: the team ID.
        team_ids = []
        for membership_row in membership_rows:
            team_ids.append(membership_row[0])
        return team_ids

185 

186 

def get_user_team_roles(db, user_email: str) -> Dict[str, str]:
    """Return a {team_id: role} mapping for a user's active team memberships.

    Lookup failures are deliberately non-fatal: the function answers with an
    empty mapping (safe default — headers stay masked). The failure is logged
    so that a broken database or session does not go unnoticed.

    Args:
        db: SQLAlchemy database session.
        user_email: Email address of the user to query memberships for.

    Returns:
        Dict mapping team_id to the user's role in that team.
        Returns empty dict on DB errors (safe default — headers stay masked).
    """
    try:
        # First-Party
        from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

        rows = db.query(EmailTeamMember.team_id, EmailTeamMember.role).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).all()
        return {r.team_id: r.role for r in rows}
    except Exception as exc:
        # Best-effort lookup: keep the safe default, but leave a trace. The
        # user's email is intentionally kept out of the message.
        logging.getLogger(__name__).warning("Team role lookup failed; treating user as having no team roles: %s", exc)
        return {}

206 

207 

208def _resolve_teams_from_db_sync(email: str, is_admin: bool) -> Optional[List[str]]: 

209 """Resolve teams synchronously with L1 cache support. 

210 

211 Used by StreamableHTTP transport which runs in a sync context. 

212 Checks the in-memory L1 cache before falling back to DB. 

213 

214 Args: 

215 email: User email address 

216 is_admin: Whether the user is an admin 

217 

218 Returns: 

219 None (admin bypass), [] (no teams), or list of team ID strings 

220 """ 

221 if is_admin: 

222 return None # Admin bypass 

223 

224 cache_key = f"{email}:True" 

225 

226 # Check L1 in-memory cache (sync-safe, no network I/O) 

227 try: 

228 # First-Party 

229 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

230 

231 entry = auth_cache._teams_list_cache.get(cache_key) # pylint: disable=protected-access 

232 if entry and not entry.is_expired(): 

233 auth_cache._hit_count += 1 # pylint: disable=protected-access 

234 return entry.value 

235 except Exception: # nosec B110 - Cache unavailable is non-fatal 

236 pass 

237 

238 # Cache miss: query DB 

239 team_ids = _get_user_team_ids_sync(email) 

240 

241 # Populate L1 cache for subsequent requests 

242 try: 

243 # Standard 

244 import time # pylint: disable=import-outside-toplevel 

245 

246 # First-Party 

247 from mcpgateway.cache.auth_cache import auth_cache, CacheEntry # pylint: disable=import-outside-toplevel 

248 

249 with auth_cache._lock: # pylint: disable=protected-access 

250 auth_cache._teams_list_cache[cache_key] = CacheEntry( # pylint: disable=protected-access 

251 value=team_ids, 

252 expiry=time.time() + auth_cache._teams_list_ttl, # pylint: disable=protected-access 

253 ) 

254 except Exception: # nosec B110 - Cache write failure is non-fatal 

255 pass 

256 

257 return team_ids 

258 

259 

260async def _resolve_teams_from_db(email: str, user_info) -> Optional[List[str]]: 

261 """Resolve teams for session tokens from DB/cache. 

262 

263 For admin users, returns None (admin bypass). 

264 For non-admin users, returns the full list of team IDs from DB/cache. 

265 

266 Args: 

267 email: User email address 

268 user_info: User dict or EmailUser instance 

269 

270 Returns: 

271 None (admin bypass), [] (no teams), or list of team ID strings 

272 """ 

273 is_admin = user_info.get("is_admin", False) if isinstance(user_info, dict) else getattr(user_info, "is_admin", False) 

274 if is_admin: 

275 return None # Admin bypass 

276 

277 # Try auth cache first 

278 try: 

279 # First-Party 

280 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

281 

282 cached_teams = await auth_cache.get_user_teams(f"{email}:True") 

283 if cached_teams is not None: 

284 return cached_teams 

285 except Exception: # nosec B110 - Cache unavailable is non-fatal, fall through to DB 

286 pass 

287 

288 # Cache miss: query DB 

289 team_ids = await asyncio.to_thread(_get_user_team_ids_sync, email) 

290 

291 # Cache the result 

292 try: 

293 # First-Party 

294 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel 

295 

296 await auth_cache.set_user_teams(f"{email}:True", team_ids) 

297 except Exception: # nosec B110 - Cache write failure is non-fatal 

298 pass 

299 

300 return team_ids 

301 

302 

def normalize_token_teams(payload: Dict[str, Any]) -> Optional[List[str]]:
    """
    Normalize token teams to a canonical form for consistent security checks.

    SECURITY: This is the single source of truth for token team normalization.
    All code paths that read token teams should use this function.

    Rules:
    - "teams" key missing → [] (public-only, secure default)
    - "teams" is null + is_admin=true → None (admin bypass, sees all)
    - "teams" is null + is_admin=false → [] (public-only, no bypass for non-admins)
    - "teams" is [] → [] (explicit public-only)
    - "teams" is [...] → normalized list of string IDs
    - "teams" is any other type (string, dict, number, ...) → [] (malformed
      claim, public-only); it is never iterated element by element

    Within a list, dict entries contribute ``str(entry["id"])`` when the ID is
    truthy, string entries are kept as-is, and every other entry is ignored.

    Args:
        payload: The JWT payload dict

    Returns:
        None for admin bypass, [] for public-only, or list of normalized team ID strings
    """
    # Check if "teams" key exists (distinguishes missing from explicit null)
    if "teams" not in payload:
        # Missing teams key → public-only (secure default)
        return []

    teams = payload["teams"]

    if teams is None:
        # Explicit null - only allow admin bypass if is_admin is true
        # Check BOTH top-level is_admin AND nested user.is_admin
        is_admin = payload.get("is_admin", False)
        if not is_admin:
            user_info = payload.get("user", {})
            is_admin = user_info.get("is_admin", False) if isinstance(user_info, dict) else False
        # Admin with explicit null teams → admin bypass (sees all);
        # non-admin with null teams → public-only (no bypass)
        return None if is_admin else []

    if not isinstance(teams, (list, tuple)):
        # Malformed claim. Iterating a string would yield one "team" per
        # character and iterating a dict would yield its keys, so fall back to
        # the least-privileged result instead.
        return []

    # teams is a list - normalize to string IDs
    # Handle both dict format [{"id": "team1"}] and string format ["team1"]
    normalized: List[str] = []
    for team in teams:
        if isinstance(team, dict):
            team_id = team.get("id")
            if team_id:
                normalized.append(str(team_id))
        elif isinstance(team, str):
            normalized.append(team)
    return normalized

354 

355 

async def get_team_from_token(payload: Dict[str, Any]) -> Optional[str]:
    """
    Extract the team ID from an authentication token payload.

    SECURITY: This function uses secure-first defaults:
    - Missing teams key = public-only (no personal team fallback)
    - Empty teams list = public-only (no team access)
    - Malformed teams claim (not a list) = public-only
    - Teams with values = use first team ID

    This prevents privilege escalation where missing or malformed claims could
    grant unintended team access.

    Args:
        payload (Dict[str, Any]):
            The token payload. Expected fields:
                - "sub" (str): The user's unique identifier (email).
                - "teams" (List[str], optional): List containing team ID.
                  Entries may be plain strings or dicts carrying an "id" key.

    Returns:
        Optional[str]:
            The resolved team ID as a string. Returns `None` if teams is
            missing, empty, not a list, or if its first entry is neither a
            string nor a dict with a truthy "id".

    Examples:
        >>> import asyncio
        >>> # --- Case 1: Token has team ---
        >>> payload = {"sub": "user@example.com", "teams": ["team_456"]}
        >>> asyncio.run(get_team_from_token(payload))
        'team_456'

        >>> # --- Case 2: Token has explicit empty teams (public-only) ---
        >>> payload = {"sub": "user@example.com", "teams": []}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None

        >>> # --- Case 3: Token has no teams key (secure default) ---
        >>> payload = {"sub": "user@example.com"}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None

        >>> # --- Case 4: Malformed teams claim (not a list) ---
        >>> payload = {"sub": "user@example.com", "teams": "team_456"}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None
    """
    teams = payload.get("teams")

    # SECURITY: anything that is not a non-empty list is public-only.
    # - teams is None (missing key or explicit null): no legacy fallback
    # - teams == []: explicit public-only
    # - teams is a string/dict/number: malformed; indexing it would return a
    #   single character, raise KeyError, or raise TypeError respectively
    # Admin bypass is handled separately via is_admin flag in token, not via missing teams
    if not isinstance(teams, (list, tuple)) or not teams:
        return None

    # Has teams - use the first one, accepting the same entry shapes as
    # normalize_token_teams (dict with "id", or plain string).
    first_team = teams[0]
    if isinstance(first_team, dict):
        team_id = first_team.get("id")
        return str(team_id) if team_id else None
    if isinstance(first_team, str):
        return first_team
    return None

411 

412 

def _check_token_revoked_sync(jti: str) -> bool:
    """Report whether a revocation record exists for the given JWT ID.

    Synchronous by design: it is meant to be run in a thread pool so the event
    loop is not blocked.

    Args:
        jti: The JWT ID to check.

    Returns:
        True if the token is revoked, False otherwise.
    """
    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import TokenRevocation  # pylint: disable=import-outside-toplevel

        revocation_query = select(TokenRevocation).where(TokenRevocation.jti == jti)
        revocation = session.execute(revocation_query).scalar_one_or_none()
        # Any matching row means the token has been revoked.
        return revocation is not None

433 

434 

def _lookup_api_token_sync(token_hash: str) -> Optional[Dict[str, Any]]:
    """Synchronous helper to look up an API token by hash.

    This runs in a thread pool to avoid blocking the event loop.

    On a successful lookup the token's ``last_used`` timestamp is updated and
    committed before returning.

    Args:
        token_hash: SHA256 hash of the API token.

    Returns:
        ``None`` when no active token matches the hash,
        ``{"expired": True}`` when the token's expiry lies in the past,
        ``{"revoked": True}`` when a revocation record exists for its JTI,
        otherwise ``{"user_email": ..., "jti": ...}`` for the valid token.
    """
    with fresh_db_session() as db:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailApiToken, TokenRevocation, utc_now  # pylint: disable=import-outside-toplevel

        result = db.execute(select(EmailApiToken).where(EmailApiToken.token_hash == token_hash, EmailApiToken.is_active.is_(True)))
        api_token = result.scalar_one_or_none()
        if not api_token:
            return None

        # Check expiration
        expires_at = api_token.expires_at
        if expires_at:
            if expires_at.tzinfo is None:
                # Some backends hand timestamps back without tzinfo, and
                # comparing a naive datetime with an aware one raises TypeError.
                # NOTE(review): assumes naive stored values are UTC — confirm
                # against how expires_at is written.
                expires_at = expires_at.replace(tzinfo=timezone.utc)
            if expires_at < datetime.now(timezone.utc):
                return {"expired": True}

        # Check revocation
        revoke_result = db.execute(select(TokenRevocation).where(TokenRevocation.jti == api_token.jti))
        if revoke_result.scalar_one_or_none() is not None:
            return {"revoked": True}

        # Update last_used timestamp
        api_token.last_used = utc_now()
        db.commit()

        return {
            "user_email": api_token.user_email,
            "jti": api_token.jti,
        }

478 

479 

def _get_sync_redis_client():
    """Get or create module-level synchronous Redis client for rate-limiting.

    The client is created lazily on first use and only stored in the module
    global after a successful ``ping()``, so the lock-free fast path can never
    hand out a client whose connection has not been verified. After a failed
    attempt, further attempts are skipped for 30 seconds.

    Returns:
        Redis client or None if Redis is unavailable/disabled.
    """
    global _SYNC_REDIS_CLIENT, _SYNC_REDIS_FAILURE_TIME  # pylint: disable=global-statement

    # Standard
    import logging as log  # pylint: disable=import-outside-toplevel,reimported
    import time  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.config import settings as config_settings  # pylint: disable=import-outside-toplevel,reimported

    backoff_seconds = 30

    # Quick check without lock: reuse an existing client, or return the (None)
    # client when Redis is not configured as the cache backend.
    if _SYNC_REDIS_CLIENT is not None or not (config_settings.redis_url and config_settings.redis_url.strip() and config_settings.cache_type == "redis"):
        return _SYNC_REDIS_CLIENT

    # Backoff after recent failure
    if _SYNC_REDIS_FAILURE_TIME and (time.time() - _SYNC_REDIS_FAILURE_TIME < backoff_seconds):
        return None

    # Lazy initialization with lock
    with _SYNC_REDIS_LOCK:
        # Double-check after acquiring lock
        if _SYNC_REDIS_CLIENT is not None:
            return _SYNC_REDIS_CLIENT

        try:
            # Third-Party
            import redis  # pylint: disable=import-outside-toplevel

            # Build and verify the client in a local first; it is published to
            # the global only once the connection test has passed.
            candidate = redis.from_url(config_settings.redis_url, decode_responses=True, socket_connect_timeout=2, socket_timeout=2)
            # Test connection
            candidate.ping()
        except Exception as e:
            log.getLogger(__name__).debug(f"Sync Redis client unavailable: {e}")
            _SYNC_REDIS_CLIENT = None
            _SYNC_REDIS_FAILURE_TIME = time.time()
        else:
            _SYNC_REDIS_CLIENT = candidate
            _SYNC_REDIS_FAILURE_TIME = None  # Clear failure state on success
            log.getLogger(__name__).debug("Sync Redis client initialized for API token rate-limiting")

    return _SYNC_REDIS_CLIENT

524 

525 

def _update_api_token_last_used_sync(jti: str) -> None:
    """Refresh an API token's last_used timestamp, at most once per configured interval.

    Invoked after an API token has been validated via JWT so that last_used
    stays current for monitoring and security audits.

    Rate-limiting: the time of the previous write is remembered in Redis when a
    sync client is available, otherwise in a per-process in-memory dict. The
    database is only written when no previous write is recorded or the
    configured interval has elapsed, which keeps busy tokens from causing a
    write per request. Any error on the Redis path falls through to the
    in-memory path.

    Args:
        jti: JWT ID of the API token

    Note:
        Called via asyncio.to_thread() to avoid blocking the event loop.
        Uses fresh_db_session() for thread-safe database access.
    """
    # Standard
    import time  # pylint: disable=import-outside-toplevel,redefined-outer-name

    # First-Party
    from mcpgateway.config import settings as config_settings  # pylint: disable=import-outside-toplevel,reimported

    def _write_last_used(on_written) -> None:
        """Stamp the token row with the current time, then record the write.

        Args:
            on_written: Zero-argument callable invoked after the commit, while
                the session is still open, and only when a token row with this
                JTI exists.
        """
        with fresh_db_session() as db:
            # Third-Party
            from sqlalchemy import select  # pylint: disable=import-outside-toplevel

            # First-Party
            from mcpgateway.db import EmailApiToken, utc_now  # pylint: disable=import-outside-toplevel

            token_row = db.execute(select(EmailApiToken).where(EmailApiToken.jti == jti)).scalar_one_or_none()
            if token_row:
                token_row.last_used = utc_now()
                db.commit()
                on_written()

    # Rate-limiting cache key
    redis_key = f"api_token_last_used:{jti}"
    min_interval_seconds = config_settings.token_last_used_update_interval_minutes * 60

    # Preferred path: Redis-backed rate limiting.
    redis_client = _get_sync_redis_client()
    if redis_client:
        try:
            cached_stamp = redis_client.get(redis_key)
            if cached_stamp and time.time() - float(cached_stamp) < min_interval_seconds:
                return  # Skip update - too soon

            # Write the DB row, then remember the write time in Redis. The key
            # lives for twice the interval.
            _write_last_used(lambda: redis_client.setex(redis_key, min_interval_seconds * 2, str(time.time())))
            return
        except Exception as exc:
            # Redis failed, fall through to in-memory cache
            logger = logging.getLogger(__name__)
            logger.debug("Redis unavailable for API token rate-limiting, using in-memory fallback: %s", exc)

    # Fallback: In-memory cache (module-level dict with threading.Lock for thread-safety)
    # Note: This is per-process and won't work in multi-worker deployments
    # but provides basic rate-limiting when Redis is unavailable
    max_cache_size = 1024  # Prevent unbounded growth

    with _LAST_USED_CACHE_LOCK:
        previous_write = _LAST_USED_CACHE.get(jti)
        if previous_write and time.time() - previous_write < min_interval_seconds:
            return  # Skip update - too soon

    def _remember_in_memory() -> None:
        """Record the write time in the in-memory cache, evicting old entries when full."""
        with _LAST_USED_CACHE_LOCK:
            if len(_LAST_USED_CACHE) >= max_cache_size:
                # Drop the older half of the entries (ordered by stored timestamp).
                oldest_first = sorted(_LAST_USED_CACHE, key=_LAST_USED_CACHE.get)  # type: ignore[arg-type]
                for stale_jti in oldest_first[: len(_LAST_USED_CACHE) // 2]:
                    del _LAST_USED_CACHE[stale_jti]
            _LAST_USED_CACHE[jti] = time.time()

    _write_last_used(_remember_in_memory)

618 

619 

def _is_api_token_jti_sync(jti: str) -> bool:
    """Tell whether a JTI belongs to an API token (legacy fallback) - SYNC version.

    Covers tokens issued before auth_provider was added to the payload.
    Called via asyncio.to_thread() to avoid blocking the event loop.

    SECURITY: Fail-closed on DB errors. When the lookup cannot be completed the
    JTI is reported as an API token, so the hard-block policy still applies.

    Args:
        jti: JWT ID to check

    Returns:
        bool: True if JTI exists in email_api_tokens table OR if lookup fails
    """
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.db import EmailApiToken  # pylint: disable=import-outside-toplevel

    try:
        with fresh_db_session() as session:
            # Only the primary key is needed to prove that the row exists.
            existence_query = select(EmailApiToken.id).where(EmailApiToken.jti == jti).limit(1)
            matched_id = session.execute(existence_query).scalar_one_or_none()
    except Exception as e:
        logging.getLogger(__name__).warning(f"Legacy API token check failed, failing closed: {e}")
        return True  # FAIL-CLOSED: treat as API token to preserve hard-block

    return matched_id is not None

648 

649 

def _get_user_by_email_sync(email: str) -> Optional[EmailUser]:
    """Fetch a user by email and return a session-independent copy.

    Synchronous by design: it is meant to be run in a thread pool so the event
    loop is not blocked.

    Args:
        email: The user's email address.

    Returns:
        EmailUser if found, None otherwise.
    """
    # Columns carried over onto the detached copy, in copy order.
    copied_fields = (
        "email",
        "password_hash",
        "full_name",
        "is_admin",
        "is_active",
        "auth_provider",
        "password_change_required",
        "email_verified_at",
        "created_at",
        "updated_at",
    )

    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        stored_user = session.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()
        if not stored_user:
            return None

        # The session closes when this block exits, so hand back a new
        # EmailUser built from the loaded values rather than the bound row.
        return EmailUser(**{field: getattr(stored_user, field) for field in copied_fields})

683 

684 

def _resolve_plugin_authenticated_user_sync(user_dict: Dict[str, Any]) -> Optional[EmailUser]:
    """Reconcile a plugin-supplied identity with the database-backed user record.

    A plugin auth hook may authenticate a request and hand back identity
    claims. Admin status is never taken from those claims: it comes from the
    database record, or is False for a user that has no record.

    Behavior:
    - Blank or missing email claim: reject (None).
    - Existing DB user: return DB user (authoritative for is_admin/is_active).
    - Missing DB user and REQUIRE_USER_IN_DB=true: reject (None).
    - Missing DB user and REQUIRE_USER_IN_DB=false: allow a non-admin virtual
      user built from non-privileged plugin claims.

    Args:
        user_dict: Identity claims returned by plugin auth hook.

    Returns:
        EmailUser when identity is accepted, otherwise None.
    """
    claimed_email = str(user_dict.get("email") or "").strip()
    if not claimed_email:
        return None

    stored_user = _get_user_by_email_sync(claimed_email)
    if stored_user:
        return stored_user

    if settings.require_user_in_db:
        return None

    # No DB record and none required: build a virtual user from the claims.
    claim = user_dict.get
    virtual_user_fields = {
        "email": claimed_email,
        "password_hash": claim("password_hash", ""),
        "full_name": claim("full_name"),
        "is_admin": False,  # never trusted from plugin claims
        "is_active": claim("is_active", True),
        "auth_provider": claim("auth_provider", "local"),
        "password_change_required": claim("password_change_required", False),
        "email_verified_at": claim("email_verified_at"),
        "created_at": claim("created_at", datetime.now(timezone.utc)),
        "updated_at": claim("updated_at", datetime.now(timezone.utc)),
    }
    return EmailUser(**virtual_user_fields)

726 

727 

def _get_auth_context_batched_sync(email: str, jti: Optional[str] = None) -> Dict[str, Any]:
    """Gather everything the auth flow needs about a user in one DB session.

    Runs, in this order and within a single session, the lookups that the
    following helpers perform individually:
        1. _get_user_by_email_sync - user data
        2. _get_personal_team_sync - personal team ID (only if the user exists)
        3. _get_user_team_ids_sync - all active team memberships (only if the user exists)
        4. _check_token_revoked_sync - token revocation status (only if a JTI is given)

    Doing this in one call reduces thread pool contention and DB connection overhead.

    Args:
        email: User email address
        jti: JWT ID for revocation check (optional)

    Returns:
        Dict with keys: user (dict or None), personal_team_id (str or None),
        is_token_revoked (bool), team_ids (list of str)

    Examples:
        >>> # This function runs in a thread pool
        >>> # result = _get_auth_context_batched_sync("test@example.com", "jti-123")
        >>> # result["is_token_revoked"]  # False if not revoked
    """
    # User columns copied into the detached dict, in copy order.
    user_fields = (
        "email",
        "password_hash",
        "full_name",
        "is_admin",
        "is_active",
        "auth_provider",
        "password_change_required",
        "email_verified_at",
        "created_at",
        "updated_at",
    )

    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailTeam, EmailTeamMember, TokenRevocation  # pylint: disable=import-outside-toplevel

        # Defaults describe "unknown user, token not revoked".
        context = {
            "user": None,
            "personal_team_id": None,
            "is_token_revoked": False,  # nosec B105 - boolean flag, not a password
            "team_ids": [],
        }

        # Lookup 1: the user record.
        user_row = session.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()

        if user_row:
            # Plain dict, because the session (and the row's binding) ends here.
            context["user"] = {field: getattr(user_row, field) for field in user_fields}

            # Lookup 2: the user's personal team.
            personal_team_query = (
                select(EmailTeam)
                .join(EmailTeamMember)
                .where(
                    EmailTeamMember.user_email == email,
                    EmailTeam.is_personal.is_(True),
                )
            )
            personal_team = session.execute(personal_team_query).scalar_one_or_none()
            if personal_team:
                context["personal_team_id"] = personal_team.id

            # Lookup 3: every active membership (used for session token team resolution).
            memberships_query = select(EmailTeamMember.team_id).where(
                EmailTeamMember.user_email == email,
                EmailTeamMember.is_active.is_(True),
            )
            membership_rows = session.execute(memberships_query).all()
            context["team_ids"] = [membership_row[0] for membership_row in membership_rows]

        # Lookup 4: revocation status, independent of whether the user exists.
        if jti:
            revocation = session.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)).scalar_one_or_none()
            context["is_token_revoked"] = revocation is not None

        return context

813 

814 

def _user_from_cached_dict(user_dict: Dict[str, Any]) -> EmailUser:
    """Rebuild an EmailUser from the dict form kept in the cache.

    Only "email" is mandatory; every other field falls back to a default when
    the cached dict does not carry it.

    Args:
        user_dict: User data dictionary from cache

    Returns:
        EmailUser instance (detached from any session)
    """
    cached = user_dict.get
    user_fields = {
        "email": user_dict["email"],
        "password_hash": cached("password_hash", ""),
        "full_name": cached("full_name"),
        "is_admin": cached("is_admin", False),
        "is_active": cached("is_active", True),
        "auth_provider": cached("auth_provider", "local"),
        "password_change_required": cached("password_change_required", False),
        "email_verified_at": cached("email_verified_at"),
        # Missing timestamps default to "now" in UTC.
        "created_at": cached("created_at", datetime.now(timezone.utc)),
        "updated_at": cached("updated_at", datetime.now(timezone.utc)),
    }
    return EmailUser(**user_fields)

836 

837 

async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    request: Request = None,  # type: ignore[assignment]
) -> EmailUser:
    """Get current authenticated user from JWT token with revocation checking.

    Supports plugin-based custom authentication via HTTP_AUTH_RESOLVE_USER hook.

    Resolution order:

    1. Plugin hook (HTTP_AUTH_RESOLVE_USER), when a plugin manager with that hook is available.
       Unexpected plugin errors are logged and standard authentication is attempted instead.
    2. JWT validation, followed by (in order, first one that yields a user wins):
       the auth cache, the batched DB query, and finally the individual DB queries.
    3. Database API token lookup (SHA-256 of the bearer value) when JWT validation
       fails with a non-HTTP error.

    When ``request`` is provided, ``request.state`` is populated along the way with
    ``auth_method``, ``token_use``, ``token_teams``, ``team_id`` and, where available, ``jti``.

    Args:
        credentials: HTTP authorization credentials
        request: Optional request object for plugin hooks

    Returns:
        EmailUser: Authenticated user

    Raises:
        HTTPException: If authentication fails
    """
    logger = logging.getLogger(__name__)

    async def _set_auth_method_from_payload(payload: dict) -> None:
        """Set request.state.auth_method based on JWT payload.

        Args:
            payload: Decoded JWT payload
        """
        if not request:
            return

        # NOTE: Cannot use structural check (scopes dict) because email login JWTs
        # also have scopes dict (see email_auth.py:160)
        user_info = payload.get("user", {})
        auth_provider = user_info.get("auth_provider")

        if auth_provider == "api_token":
            request.state.auth_method = "api_token"
            jti = payload.get("jti")
            if jti:
                request.state.jti = jti
                try:
                    await asyncio.to_thread(_update_api_token_last_used_sync, jti)
                except Exception as e:
                    logger.debug(f"Failed to update API token last_used: {e}")
                    # Continue authentication - last_used update is not critical
            return

        if auth_provider:
            # email, oauth, saml, or any other interactive auth provider
            request.state.auth_method = "jwt"
            return

        # Legacy API token fallback: check if JTI exists in API token table
        # This handles tokens created before auth_provider was added
        jti_for_check = payload.get("jti")
        if jti_for_check:
            is_legacy_api_token = await asyncio.to_thread(_is_api_token_jti_sync, jti_for_check)
            if is_legacy_api_token:
                request.state.auth_method = "api_token"
                request.state.jti = jti_for_check
                logger.debug(f"Legacy API token detected via DB lookup (JTI: ...{jti_for_check[-8:]})")
                try:
                    await asyncio.to_thread(_update_api_token_last_used_sync, jti_for_check)
                except Exception as e:
                    logger.debug(f"Failed to update legacy API token last_used: {e}")
                    # Continue authentication - last_used update is not critical
            else:
                request.state.auth_method = "jwt"
        else:
            # No auth_provider or JTI; default to interactive
            request.state.auth_method = "jwt"

    # Bind plugin_manager before the try block: it is read again on every
    # standard-auth return path below. Without this, a failure inside
    # get_plugin_manager() would leave the name unbound and the documented
    # "fall back to standard auth" behaviour would end in UnboundLocalError.
    plugin_manager = None

    # NEW: Custom authentication hook - allows plugins to provide alternative auth
    # This hook is invoked BEFORE standard JWT/API token validation
    try:
        # Get plugin manager singleton
        plugin_manager = get_plugin_manager()

        if plugin_manager and plugin_manager.has_hooks_for(HttpHookType.HTTP_AUTH_RESOLVE_USER):
            # Extract client information
            client_host = None
            client_port = None
            if request and hasattr(request, "client") and request.client:
                client_host = request.client.host
                client_port = request.client.port

            # Serialize credentials for plugin
            credentials_dict = None
            if credentials:
                credentials_dict = {
                    "scheme": credentials.scheme,
                    "credentials": credentials.credentials,
                }

            # Extract headers from request
            # Note: Middleware modifies request.scope["headers"], so request.headers
            # will automatically reflect any modifications made by HTTP_PRE_REQUEST hooks
            headers = {}
            if request and hasattr(request, "headers"):
                headers = dict(request.headers)

            # Get request ID from correlation ID context (set by CorrelationIDMiddleware)
            request_id = get_correlation_id()
            if not request_id:
                # Fallback chain for safety
                if request and hasattr(request, "state") and hasattr(request.state, "request_id"):
                    request_id = request.state.request_id
                else:
                    request_id = uuid.uuid4().hex
                    logger.debug(f"Generated fallback request ID in get_current_user: {request_id}")

            # Get plugin contexts from request state if available
            global_context = getattr(request.state, "plugin_global_context", None) if request else None
            if not global_context:
                # Create global context
                global_context = GlobalContext(
                    request_id=request_id,
                    server_id=None,
                    tenant_id=None,
                )

            context_table = getattr(request.state, "plugin_context_table", None) if request else None

            # Invoke custom auth resolution hook
            # violations_as_exceptions=True so PluginViolationError is raised for explicit denials
            auth_result, context_table_result = await plugin_manager.invoke_hook(
                HttpHookType.HTTP_AUTH_RESOLVE_USER,
                payload=HttpAuthResolveUserPayload(
                    credentials=credentials_dict,
                    headers=HttpHeaderPayload(root=headers),
                    client_host=client_host,
                    client_port=client_port,
                ),
                global_context=global_context,
                local_contexts=context_table,
                violations_as_exceptions=True,  # Raise PluginViolationError for auth denials
            )

            # If plugin successfully authenticated user, return it
            if auth_result.modified_payload and isinstance(auth_result.modified_payload, dict):
                logger.info("User authenticated via plugin hook")
                # Resolve plugin claims against DB state so admin flags are authoritative.
                user_dict = auth_result.modified_payload
                user = await asyncio.to_thread(_resolve_plugin_authenticated_user_sync, user_dict)

                if user is None:
                    logger.warning("Plugin auth rejected: user identity could not be resolved against DB policy")
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="User not found in database",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                if not user.is_active:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Account disabled",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                # Store auth_method in request.state so it can be accessed by RBAC middleware
                if request and auth_result.metadata:
                    auth_method = auth_result.metadata.get("auth_method")
                    if auth_method:
                        request.state.auth_method = auth_method
                        logger.debug(f"Stored auth_method '{auth_method}' in request.state")

                if request and context_table_result:
                    request.state.plugin_context_table = context_table_result

                if request and global_context:
                    request.state.plugin_global_context = global_context

                if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
                    _inject_userinfo_instate(request, user)

                return user
            # If continue_processing=True (no payload), fall through to standard auth

    except PluginViolationError as e:
        # Plugin explicitly denied authentication with custom message
        logger.warning(f"Authentication denied by plugin: {e.message}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=e.message,  # Use plugin's custom error message
            headers={"WWW-Authenticate": "Bearer"},
        )
    except HTTPException:
        # Re-raise HTTP exceptions
        raise
    except Exception as e:
        # Log but don't fail on plugin errors - fall back to standard auth
        logger.warning(f"HTTP_AUTH_RESOLVE_USER hook failed, falling back to standard auth: {e}")

    # EXISTING: Standard authentication (JWT, API tokens)
    if not credentials:
        logger.warning("No credentials provided")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    logger.debug("Attempting authentication with bearer credentials")
    email = None

    try:
        # Try JWT token first using the centralized verify_jwt_token_cached function
        logger.debug("Attempting JWT token validation")
        payload = await verify_jwt_token_cached(credentials.credentials, request)

        logger.debug("JWT token validated successfully")
        # Extract user identifier (support both new and legacy token formats)
        email = payload.get("sub")
        if email is None:
            # Try legacy format
            email = payload.get("email")

        if email is None:
            logger.debug("No email/sub found in JWT payload")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token",
                headers={"WWW-Authenticate": "Bearer"},
            )

        logger.debug("JWT authentication successful for email: %s", email)

        # Extract JTI for revocation check
        jti = payload.get("jti")

        # === AUTH CACHING: Check cache before DB queries ===
        if settings.auth_cache_enabled:
            try:
                # First-Party
                from mcpgateway.cache.auth_cache import auth_cache, CachedAuthContext  # pylint: disable=import-outside-toplevel

                cached_ctx = await auth_cache.get_auth_context(email, jti)
                if cached_ctx:
                    logger.debug(f"Auth cache hit for {email}")

                    # Check revocation from cache
                    if cached_ctx.is_token_revoked:
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Token has been revoked",
                            headers={"WWW-Authenticate": "Bearer"},
                        )

                    # Check user active status from cache
                    if cached_ctx.user and not cached_ctx.user.get("is_active", True):
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Account disabled",
                            headers={"WWW-Authenticate": "Bearer"},
                        )

                    # Resolve teams based on token_use
                    if request:
                        token_use = payload.get("token_use")
                        request.state.token_use = token_use

                        if token_use == "session":  # nosec B105 - Not a password; token_use is a JWT claim type
                            # Session token: resolve teams from DB/cache
                            user_info = cached_ctx.user or {"is_admin": False}
                            teams = await _resolve_teams_from_db(email, user_info)
                        else:
                            # API token or legacy: use embedded teams
                            teams = normalize_token_teams(payload)

                        request.state.token_teams = teams

                        # Set team_id: only for single-team API tokens
                        if teams is None:
                            request.state.team_id = None
                        elif len(teams) == 1 and token_use != "session":  # nosec B105
                            request.state.team_id = teams[0] if isinstance(teams[0], str) else teams[0].get("id")
                        else:
                            request.state.team_id = None

                        await _set_auth_method_from_payload(payload)

                    # Return user from cache
                    if cached_ctx.user:
                        # When require_user_in_db is enabled, verify user still exists in DB
                        # This prevents stale cache from bypassing strict mode
                        if settings.require_user_in_db:
                            db_user = await asyncio.to_thread(_get_user_by_email_sync, email)
                            if db_user is None:
                                logger.warning(
                                    f"Authentication rejected for {email}: cached user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
                                    extra={"security_event": "user_not_in_db_rejected", "user_id": email},
                                )
                                raise HTTPException(
                                    status_code=status.HTTP_401_UNAUTHORIZED,
                                    detail="User not found in database",
                                    headers={"WWW-Authenticate": "Bearer"},
                                )

                        if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
                            _inject_userinfo_instate(request, _user_from_cached_dict(cached_ctx.user))

                        return _user_from_cached_dict(cached_ctx.user)

                    # User not in cache but context was (shouldn't happen, but handle it)
                    logger.debug("Auth context cached but user missing, falling through to DB")

            except HTTPException:
                raise
            except Exception as cache_error:
                logger.debug(f"Auth cache check failed, falling through to DB: {cache_error}")

        # === BATCHED QUERIES: Single DB call for user + team + revocation ===
        if settings.auth_cache_batch_queries:
            try:
                auth_ctx = await asyncio.to_thread(_get_auth_context_batched_sync, email, jti)

                # Check revocation
                if auth_ctx.get("is_token_revoked"):
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Token has been revoked",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                # Resolve teams based on token_use
                token_use = payload.get("token_use")
                if token_use == "session":  # nosec B105 - Not a password; token_use is a JWT claim type
                    # Session token: use team_ids from batched query
                    user_dict = auth_ctx.get("user")
                    is_admin = user_dict.get("is_admin", False) if user_dict else False
                    if is_admin:
                        teams = None  # Admin bypass
                    else:
                        teams = auth_ctx.get("team_ids", [])
                else:
                    # API token or legacy: use embedded teams
                    teams = normalize_token_teams(payload)

                # Set team_id: only for single-team API tokens
                if teams is None:
                    team_id = None
                elif len(teams) == 1 and token_use != "session":  # nosec B105
                    team_id = teams[0] if isinstance(teams[0], str) else teams[0].get("id")
                else:
                    team_id = None

                if request:
                    request.state.token_teams = teams
                    request.state.team_id = team_id
                    request.state.token_use = token_use
                    await _set_auth_method_from_payload(payload)

                # Store in cache for future requests
                if settings.auth_cache_enabled:
                    try:
                        # First-Party
                        from mcpgateway.cache.auth_cache import auth_cache, CachedAuthContext  # noqa: F811 pylint: disable=import-outside-toplevel

                        await auth_cache.set_auth_context(
                            email,
                            jti,
                            CachedAuthContext(
                                user=auth_ctx.get("user"),
                                personal_team_id=auth_ctx.get("personal_team_id"),
                                is_token_revoked=auth_ctx.get("is_token_revoked", False),
                            ),
                        )
                        # Also populate teams-list cache so cached-path requests
                        # don't need an extra DB query via _resolve_teams_from_db()
                        if token_use == "session" and teams is not None:  # nosec B105
                            await auth_cache.set_user_teams(f"{email}:True", teams)
                    except Exception as cache_set_error:
                        logger.debug(f"Failed to cache auth context: {cache_set_error}")

                # Create user from batched result
                if auth_ctx.get("user"):
                    user_dict = auth_ctx["user"]
                    if not user_dict.get("is_active", True):
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Account disabled",
                            headers={"WWW-Authenticate": "Bearer"},
                        )
                    # Store user for return at end of function
                    # We'll check platform admin case and return below
                    _batched_user = _user_from_cached_dict(user_dict)
                else:
                    _batched_user = None

                # Handle user not found case
                if _batched_user is None:
                    # Check if strict user-in-DB mode is enabled
                    if settings.require_user_in_db:
                        logger.warning(
                            f"Authentication rejected for {email}: user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
                            extra={"security_event": "user_not_in_db_rejected", "user_id": email},
                        )
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="User not found in database",
                            headers={"WWW-Authenticate": "Bearer"},
                        )

                    # Platform admin bootstrap (only when REQUIRE_USER_IN_DB=false)
                    if email == getattr(settings, "platform_admin_email", "admin@example.com"):
                        logger.info(
                            f"Platform admin bootstrap authentication for {email}. " "User authenticated via platform admin configuration.",
                            extra={"security_event": "platform_admin_bootstrap", "user_id": email},
                        )
                        _batched_user = EmailUser(
                            email=email,
                            password_hash="",  # nosec B106
                            full_name=getattr(settings, "platform_admin_full_name", "Platform Administrator"),
                            is_admin=True,
                            is_active=True,
                            auth_provider="local",
                            password_change_required=False,
                            email_verified_at=datetime.now(timezone.utc),
                            created_at=datetime.now(timezone.utc),
                            updated_at=datetime.now(timezone.utc),
                        )
                    else:
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="User not found",
                            headers={"WWW-Authenticate": "Bearer"},
                        )

                if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
                    _inject_userinfo_instate(request, _batched_user)

                return _batched_user

            except HTTPException:
                raise
            except Exception as batch_error:
                logger.warning(f"Batched auth query failed, falling back to individual queries: {batch_error}")

        # === FALLBACK: Original individual queries (if batching disabled or failed) ===
        if jti:
            try:
                is_revoked = await asyncio.to_thread(_check_token_revoked_sync, jti)
                if is_revoked:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Token has been revoked",
                        headers={"WWW-Authenticate": "Bearer"},
                    )
            except HTTPException:
                raise
            except Exception as revoke_check_error:
                # Fail-secure: if the revocation check itself errors, reject the token.
                # Allowing through on error would let revoked tokens bypass enforcement
                # when the DB is unreachable or the table is missing.
                logger.warning(f"Token revocation check failed for JTI {jti} — denying access (fail-secure): {revoke_check_error}")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token validation failed",
                    headers={"WWW-Authenticate": "Bearer"},
                )

        # Resolve teams based on token_use
        token_use = payload.get("token_use")
        if token_use == "session":  # nosec B105 - Not a password; token_use is a JWT claim type
            # Session token: resolve teams from DB/cache (fallback path — separate query OK)
            user_info = {"is_admin": payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False)}
            normalized_teams = await _resolve_teams_from_db(email, user_info)
        else:
            # API token or legacy: use embedded teams
            normalized_teams = normalize_token_teams(payload)

        # Set team_id: only for single-team API tokens
        if normalized_teams is None:
            team_id = None
        elif len(normalized_teams) == 1 and token_use != "session":  # nosec B105
            team_id = normalized_teams[0] if isinstance(normalized_teams[0], str) else normalized_teams[0].get("id")
        else:
            team_id = None

        if request:
            request.state.token_teams = normalized_teams
            request.state.team_id = team_id
            request.state.token_use = token_use
            # Store JTI for use in middleware (e.g., token usage logging)
            if jti:
                request.state.jti = jti
            await _set_auth_method_from_payload(payload)

    except HTTPException:
        # Re-raise HTTPException from verify_jwt_token (handles expired/invalid tokens)
        raise
    except Exception as jwt_error:
        # JWT validation failed, try database API token
        # Uses fresh DB session via asyncio.to_thread to avoid blocking event loop
        logger.debug("JWT validation failed with error: %s, trying database API token", jwt_error)
        try:
            token_hash = hashlib.sha256(credentials.credentials.encode()).hexdigest()

            # Lookup API token using fresh session in thread pool
            api_token_info = await asyncio.to_thread(_lookup_api_token_sync, token_hash)
            logger.debug(f"Database lookup result: {api_token_info is not None}")

            if api_token_info:
                # Check for error conditions returned by helper
                if api_token_info.get("expired"):
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="API token expired",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                if api_token_info.get("revoked"):
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="API token has been revoked",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                # Use the email from the API token
                email = api_token_info["user_email"]
                logger.debug(f"API token authentication successful for email: {email}")

                # Set auth_method for database API tokens
                if request:
                    request.state.auth_method = "api_token"
                    request.state.user_email = api_token_info["user_email"]
                    # Store JTI for use in middleware
                    if "jti" in api_token_info:
                        request.state.jti = api_token_info["jti"]
            else:
                logger.debug("API token not found in database")
                logger.debug("No valid authentication method found")
                # Neither JWT nor API token worked
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid authentication credentials",
                    headers={"WWW-Authenticate": "Bearer"},
                )
        except HTTPException:
            # Re-raise HTTP exceptions
            raise
        except Exception as e:
            # Neither JWT nor API token validation worked
            logger.debug(f"Database API token validation failed with exception: {e}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )

    # Get user from database using fresh session in thread pool
    user = await asyncio.to_thread(_get_user_by_email_sync, email)

    if user is None:
        # Check if strict user-in-DB mode is enabled
        if settings.require_user_in_db:
            logger.warning(
                f"Authentication rejected for {email}: user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
                extra={"security_event": "user_not_in_db_rejected", "user_id": email},
            )
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found in database",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Platform admin bootstrap (only when REQUIRE_USER_IN_DB=false)
        # If user doesn't exist but token is valid and email matches platform admin,
        # create a virtual admin user object
        if email == getattr(settings, "platform_admin_email", "admin@example.com"):
            logger.info(
                f"Platform admin bootstrap authentication for {email}. " "User authenticated via platform admin configuration.",
                extra={"security_event": "platform_admin_bootstrap", "user_id": email},
            )
            # Create a virtual admin user for authentication purposes
            user = EmailUser(
                email=email,
                password_hash="",  # nosec B106 - Not used for JWT authentication
                full_name=getattr(settings, "platform_admin_full_name", "Platform Administrator"),
                is_admin=True,
                is_active=True,
                auth_provider="local",
                password_change_required=False,
                email_verified_at=datetime.now(timezone.utc),
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            )
        else:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found",
                headers={"WWW-Authenticate": "Bearer"},
            )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Account disabled",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
        _inject_userinfo_instate(request, user)

    return user

1443 

1444 

def _inject_userinfo_instate(request: Optional[object] = None, user: Optional[EmailUser] = None) -> None:
    """Inject user information into the plugin global context stored on the request.

    Callers invoke this when the plugin configuration has ``include_user_info``
    enabled. The global context already attached to ``request.state`` is reused
    when present; otherwise a new one is created. The user's email, admin flag
    and full name are written into ``global_context.user`` and the context is
    stored back on ``request.state.plugin_global_context``.

    Args:
        request: Optional request object for plugin hooks
        user: User related information
    """
    logger = logging.getLogger(__name__)

    # Get plugin contexts from request state if available
    global_context = getattr(request.state, "plugin_global_context", None) if request else None
    if not global_context:
        # A request ID is only needed when a new context has to be built, so it
        # is resolved here rather than unconditionally.
        # Get request ID from correlation ID context (set by CorrelationIDMiddleware)
        request_id = get_correlation_id()
        if not request_id:
            # Fallback chain for safety
            if request and hasattr(request, "state") and hasattr(request.state, "request_id"):
                request_id = request.state.request_id
            else:
                request_id = uuid.uuid4().hex
                logger.debug(f"Generated fallback request ID in _inject_userinfo_instate: {request_id}")

        # Create global context
        global_context = GlobalContext(
            request_id=request_id,
            server_id=None,
            tenant_id=None,
        )

    if user:
        if not global_context.user:
            global_context.user = {}
        global_context.user["email"] = user.email
        global_context.user["is_admin"] = user.is_admin
        global_context.user["full_name"] = user.full_name

    # Without a request there is nowhere to persist the context.
    if request and global_context:
        request.state.plugin_global_context = global_context