Coverage for mcpgateway / auth.py: 99%
556 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/auth.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Shared authentication utilities.
9This module provides common authentication functions that can be shared
10across different parts of the application without creating circular imports.
11"""
13# Standard
14import asyncio
15from datetime import datetime, timezone
16import hashlib
17import logging
18import threading
19from typing import Any, Dict, Generator, List, Never, Optional
20import uuid
22# Third-Party
23from fastapi import Depends, HTTPException, status
24from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
25from sqlalchemy.orm import Session
26from starlette.requests import Request
28# First-Party
29from mcpgateway.config import settings
30from mcpgateway.db import EmailUser, fresh_db_session, SessionLocal
31from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthResolveUserPayload, HttpHeaderPayload, HttpHookType, PluginViolationError
32from mcpgateway.utils.correlation_id import get_correlation_id
33from mcpgateway.utils.verify_credentials import verify_jwt_token_cached
35# Security scheme
36security = HTTPBearer(auto_error=False)
38# Module-level sync Redis client for rate-limiting (lazy-initialized)
39_SYNC_REDIS_CLIENT = None # pylint: disable=invalid-name
40_SYNC_REDIS_LOCK = threading.Lock()
41_SYNC_REDIS_FAILURE_TIME: Optional[float] = None # Backoff after connection failures
43# Module-level in-memory cache for last_used rate-limiting (fallback when Redis unavailable)
44_LAST_USED_CACHE: dict = {}
45_LAST_USED_CACHE_LOCK = threading.Lock()
48def _log_auth_event(
49 logger: logging.Logger,
50 message: str,
51 level: int = logging.INFO,
52 user_id: Optional[str] = None,
53 auth_method: Optional[str] = None,
54 auth_success: bool = False,
55 security_event: Optional[str] = None,
56 security_severity: str = "low",
57 **extra_context,
58) -> None:
59 """Log authentication event with structured context and request_id.
61 This helper creates structured log records that include request_id from the
62 correlation ID context, enabling end-to-end tracing of authentication flows.
64 Args:
65 logger: Logger instance to use
66 message: Log message
67 level: Log level (default: INFO)
68 user_id: User identifier
69 auth_method: Authentication method used (jwt, api_token, etc.)
70 auth_success: Whether authentication succeeded
71 security_event: Type of security event (authentication, authorization, etc.)
72 security_severity: Severity level (low, medium, high, critical)
73 **extra_context: Additional context fields
74 """
75 # Get request_id from correlation ID context
76 request_id = get_correlation_id()
78 # Build structured log record
79 extra = {
80 "request_id": request_id,
81 "entity_type": "auth",
82 "auth_success": auth_success,
83 "security_event": security_event or "authentication",
84 "security_severity": security_severity,
85 }
87 if user_id:
88 extra["user_id"] = user_id
89 if auth_method:
90 extra["auth_method"] = auth_method
92 # Add any additional context
93 extra.update(extra_context)
95 # Log with structured context
96 logger.log(level, message, extra=extra)
99def get_db() -> Generator[Session, Never, None]:
100 """Database dependency.
102 Commits the transaction on successful completion to avoid implicit rollbacks
103 for read-only operations. Rolls back explicitly on exception.
105 Yields:
106 Session: SQLAlchemy database session
108 Raises:
109 Exception: Re-raises any exception after rolling back the transaction.
111 Examples:
112 >>> db_gen = get_db()
113 >>> db = next(db_gen)
114 >>> hasattr(db, 'query')
115 True
116 >>> hasattr(db, 'close')
117 True
118 """
119 db = SessionLocal()
120 try:
121 yield db
122 db.commit()
123 except Exception:
124 try:
125 db.rollback()
126 except Exception:
127 try:
128 db.invalidate()
129 except Exception:
130 pass # nosec B110 - Best effort cleanup on connection failure
131 raise
132 finally:
133 db.close()
136def _get_personal_team_sync(user_email: str) -> Optional[str]:
137 """Synchronous helper to get user's personal team using a fresh DB session.
139 This runs in a thread pool to avoid blocking the event loop.
141 Args:
142 user_email: The user's email address.
144 Returns:
145 The personal team ID, or None if not found.
146 """
147 with fresh_db_session() as db:
148 # Third-Party
149 from sqlalchemy import select # pylint: disable=import-outside-toplevel
151 # First-Party
152 from mcpgateway.db import EmailTeam, EmailTeamMember # pylint: disable=import-outside-toplevel
154 result = db.execute(select(EmailTeam).join(EmailTeamMember).where(EmailTeamMember.user_email == user_email, EmailTeam.is_personal.is_(True)))
155 personal_team = result.scalar_one_or_none()
156 return personal_team.id if personal_team else None
159def _get_user_team_ids_sync(email: str) -> List[str]:
160 """Query all active team IDs for a user (including personal teams).
162 Uses a fresh DB session so this can be called from thread pool.
163 Matches the behavior of user.get_teams() which returns all active memberships.
165 Args:
166 email: User email address
168 Returns:
169 List of team ID strings
170 """
171 with fresh_db_session() as db:
172 # Third-Party
173 from sqlalchemy import select # pylint: disable=import-outside-toplevel
175 # First-Party
176 from mcpgateway.db import EmailTeamMember # pylint: disable=import-outside-toplevel
178 result = db.execute(
179 select(EmailTeamMember.team_id).where(
180 EmailTeamMember.user_email == email,
181 EmailTeamMember.is_active.is_(True),
182 )
183 )
184 return [row[0] for row in result.all()]
187def get_user_team_roles(db, user_email: str) -> Dict[str, str]:
188 """Return a {team_id: role} mapping for a user's active team memberships.
190 Args:
191 db: SQLAlchemy database session.
192 user_email: Email address of the user to query memberships for.
194 Returns:
195 Dict mapping team_id to the user's role in that team.
196 Returns empty dict on DB errors (safe default — headers stay masked).
197 """
198 try:
199 # First-Party
200 from mcpgateway.db import EmailTeamMember # pylint: disable=import-outside-toplevel
202 rows = db.query(EmailTeamMember.team_id, EmailTeamMember.role).filter(EmailTeamMember.user_email == user_email, EmailTeamMember.is_active.is_(True)).all()
203 return {r.team_id: r.role for r in rows}
204 except Exception:
205 return {}
208def _resolve_teams_from_db_sync(email: str, is_admin: bool) -> Optional[List[str]]:
209 """Resolve teams synchronously with L1 cache support.
211 Used by StreamableHTTP transport which runs in a sync context.
212 Checks the in-memory L1 cache before falling back to DB.
214 Args:
215 email: User email address
216 is_admin: Whether the user is an admin
218 Returns:
219 None (admin bypass), [] (no teams), or list of team ID strings
220 """
221 if is_admin:
222 return None # Admin bypass
224 cache_key = f"{email}:True"
226 # Check L1 in-memory cache (sync-safe, no network I/O)
227 try:
228 # First-Party
229 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
231 entry = auth_cache._teams_list_cache.get(cache_key) # pylint: disable=protected-access
232 if entry and not entry.is_expired():
233 auth_cache._hit_count += 1 # pylint: disable=protected-access
234 return entry.value
235 except Exception: # nosec B110 - Cache unavailable is non-fatal
236 pass
238 # Cache miss: query DB
239 team_ids = _get_user_team_ids_sync(email)
241 # Populate L1 cache for subsequent requests
242 try:
243 # Standard
244 import time # pylint: disable=import-outside-toplevel
246 # First-Party
247 from mcpgateway.cache.auth_cache import auth_cache, CacheEntry # pylint: disable=import-outside-toplevel
249 with auth_cache._lock: # pylint: disable=protected-access
250 auth_cache._teams_list_cache[cache_key] = CacheEntry( # pylint: disable=protected-access
251 value=team_ids,
252 expiry=time.time() + auth_cache._teams_list_ttl, # pylint: disable=protected-access
253 )
254 except Exception: # nosec B110 - Cache write failure is non-fatal
255 pass
257 return team_ids
260async def _resolve_teams_from_db(email: str, user_info) -> Optional[List[str]]:
261 """Resolve teams for session tokens from DB/cache.
263 For admin users, returns None (admin bypass).
264 For non-admin users, returns the full list of team IDs from DB/cache.
266 Args:
267 email: User email address
268 user_info: User dict or EmailUser instance
270 Returns:
271 None (admin bypass), [] (no teams), or list of team ID strings
272 """
273 is_admin = user_info.get("is_admin", False) if isinstance(user_info, dict) else getattr(user_info, "is_admin", False)
274 if is_admin:
275 return None # Admin bypass
277 # Try auth cache first
278 try:
279 # First-Party
280 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
282 cached_teams = await auth_cache.get_user_teams(f"{email}:True")
283 if cached_teams is not None:
284 return cached_teams
285 except Exception: # nosec B110 - Cache unavailable is non-fatal, fall through to DB
286 pass
288 # Cache miss: query DB
289 team_ids = await asyncio.to_thread(_get_user_team_ids_sync, email)
291 # Cache the result
292 try:
293 # First-Party
294 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
296 await auth_cache.set_user_teams(f"{email}:True", team_ids)
297 except Exception: # nosec B110 - Cache write failure is non-fatal
298 pass
300 return team_ids
303def normalize_token_teams(payload: Dict[str, Any]) -> Optional[List[str]]:
304 """
305 Normalize token teams to a canonical form for consistent security checks.
307 SECURITY: This is the single source of truth for token team normalization.
308 All code paths that read token teams should use this function.
310 Rules:
311 - "teams" key missing → [] (public-only, secure default)
312 - "teams" is null + is_admin=true → None (admin bypass, sees all)
313 - "teams" is null + is_admin=false → [] (public-only, no bypass for non-admins)
314 - "teams" is [] → [] (explicit public-only)
315 - "teams" is [...] → normalized list of string IDs
317 Args:
318 payload: The JWT payload dict
320 Returns:
321 None for admin bypass, [] for public-only, or list of normalized team ID strings
322 """
323 # Check if "teams" key exists (distinguishes missing from explicit null)
324 if "teams" not in payload:
325 # Missing teams key → public-only (secure default)
326 return []
328 teams = payload.get("teams")
330 if teams is None:
331 # Explicit null - only allow admin bypass if is_admin is true
332 # Check BOTH top-level is_admin AND nested user.is_admin
333 is_admin = payload.get("is_admin", False)
334 if not is_admin:
335 user_info = payload.get("user", {})
336 is_admin = user_info.get("is_admin", False) if isinstance(user_info, dict) else False
337 if is_admin:
338 # Admin with explicit null teams → admin bypass (sees all)
339 return None
340 # Non-admin with null teams → public-only (no bypass)
341 return []
343 # teams is a list - normalize to string IDs
344 # Handle both dict format [{"id": "team1"}] and string format ["team1"]
345 normalized: List[str] = []
346 for team in teams:
347 if isinstance(team, dict):
348 team_id = team.get("id")
349 if team_id:
350 normalized.append(str(team_id))
351 elif isinstance(team, str):
352 normalized.append(team)
353 return normalized
356async def get_team_from_token(payload: Dict[str, Any]) -> Optional[str]:
357 """
358 Extract the team ID from an authentication token payload.
360 SECURITY: This function uses secure-first defaults:
361 - Missing teams key = public-only (no personal team fallback)
362 - Empty teams list = public-only (no team access)
363 - Teams with values = use first team ID
365 This prevents privilege escalation where missing claims could grant
366 unintended team access.
368 Args:
369 payload (Dict[str, Any]):
370 The token payload. Expected fields:
371 - "sub" (str): The user's unique identifier (email).
372 - "teams" (List[str], optional): List containing team ID.
374 Returns:
375 Optional[str]:
376 The resolved team ID. Returns `None` if teams is missing or empty.
378 Examples:
379 >>> import asyncio
380 >>> # --- Case 1: Token has team ---
381 >>> payload = {"sub": "user@example.com", "teams": ["team_456"]}
382 >>> asyncio.run(get_team_from_token(payload))
383 'team_456'
385 >>> # --- Case 2: Token has explicit empty teams (public-only) ---
386 >>> payload = {"sub": "user@example.com", "teams": []}
387 >>> asyncio.run(get_team_from_token(payload)) # Returns None
388 >>> # None
390 >>> # --- Case 3: Token has no teams key (secure default) ---
391 >>> payload = {"sub": "user@example.com"}
392 >>> asyncio.run(get_team_from_token(payload)) # Returns None
393 >>> # None
394 """
395 teams = payload.get("teams")
397 # SECURITY: Treat missing teams as public-only (secure default)
398 # - teams is None (missing key): Public-only (secure default, no legacy fallback)
399 # - teams == [] (explicit empty list): Public-only, no team access
400 # - teams == [...] (has teams): Use first team
401 # Admin bypass is handled separately via is_admin flag in token, not via missing teams
402 if teams is None or len(teams) == 0:
403 # Missing teams or explicit empty = public-only, no fallback to personal team
404 return None
406 # Has teams - use the first one
407 team_id = teams[0]
408 if isinstance(team_id, dict):
409 team_id = team_id.get("id")
410 return team_id
413def _check_token_revoked_sync(jti: str) -> bool:
414 """Synchronous helper to check if a token is revoked.
416 This runs in a thread pool to avoid blocking the event loop.
418 Args:
419 jti: The JWT ID to check.
421 Returns:
422 True if the token is revoked, False otherwise.
423 """
424 with fresh_db_session() as db:
425 # Third-Party
426 from sqlalchemy import select # pylint: disable=import-outside-toplevel
428 # First-Party
429 from mcpgateway.db import TokenRevocation # pylint: disable=import-outside-toplevel
431 result = db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti))
432 return result.scalar_one_or_none() is not None
435def _lookup_api_token_sync(token_hash: str) -> Optional[Dict[str, Any]]:
436 """Synchronous helper to look up an API token by hash.
438 This runs in a thread pool to avoid blocking the event loop.
440 Args:
441 token_hash: SHA256 hash of the API token.
443 Returns:
444 Dict with token info if found and active, None otherwise.
445 """
446 with fresh_db_session() as db:
447 # Third-Party
448 from sqlalchemy import select # pylint: disable=import-outside-toplevel
450 # First-Party
451 from mcpgateway.db import EmailApiToken, utc_now # pylint: disable=import-outside-toplevel
453 result = db.execute(select(EmailApiToken).where(EmailApiToken.token_hash == token_hash, EmailApiToken.is_active.is_(True)))
454 api_token = result.scalar_one_or_none()
456 if api_token:
457 # Check expiration
458 if api_token.expires_at and api_token.expires_at < datetime.now(timezone.utc):
459 return {"expired": True}
461 # Check revocation
462 # First-Party
463 from mcpgateway.db import TokenRevocation # pylint: disable=import-outside-toplevel
465 revoke_result = db.execute(select(TokenRevocation).where(TokenRevocation.jti == api_token.jti))
466 if revoke_result.scalar_one_or_none() is not None:
467 return {"revoked": True}
469 # Update last_used timestamp
470 api_token.last_used = utc_now()
471 db.commit()
473 return {
474 "user_email": api_token.user_email,
475 "jti": api_token.jti,
476 }
477 return None
480def _get_sync_redis_client():
481 """Get or create module-level synchronous Redis client for rate-limiting.
483 Returns:
484 Redis client or None if Redis is unavailable/disabled.
485 """
486 global _SYNC_REDIS_CLIENT, _SYNC_REDIS_FAILURE_TIME # pylint: disable=global-statement
488 # Standard
489 import logging as log # pylint: disable=import-outside-toplevel,reimported
490 import time # pylint: disable=import-outside-toplevel
492 # First-Party
493 from mcpgateway.config import settings as config_settings # pylint: disable=import-outside-toplevel,reimported
495 # Quick check without lock
496 if _SYNC_REDIS_CLIENT is not None or not (config_settings.redis_url and config_settings.redis_url.strip() and config_settings.cache_type == "redis"):
497 return _SYNC_REDIS_CLIENT
499 # Backoff after recent failure (30 seconds)
500 if _SYNC_REDIS_FAILURE_TIME and (time.time() - _SYNC_REDIS_FAILURE_TIME < 30):
501 return None
503 # Lazy initialization with lock
504 with _SYNC_REDIS_LOCK:
505 # Double-check after acquiring lock
506 if _SYNC_REDIS_CLIENT is not None:
507 return _SYNC_REDIS_CLIENT
509 try:
510 # Third-Party
511 import redis # pylint: disable=import-outside-toplevel
513 _SYNC_REDIS_CLIENT = redis.from_url(config_settings.redis_url, decode_responses=True, socket_connect_timeout=2, socket_timeout=2)
514 # Test connection
515 _SYNC_REDIS_CLIENT.ping()
516 _SYNC_REDIS_FAILURE_TIME = None # Clear failure state on success
517 log.getLogger(__name__).debug("Sync Redis client initialized for API token rate-limiting")
518 except Exception as e:
519 log.getLogger(__name__).debug(f"Sync Redis client unavailable: {e}")
520 _SYNC_REDIS_CLIENT = None
521 _SYNC_REDIS_FAILURE_TIME = time.time()
523 return _SYNC_REDIS_CLIENT
526def _update_api_token_last_used_sync(jti: str) -> None:
527 """Update last_used timestamp for an API token with rate-limiting.
529 This function is called when an API token is successfully validated via JWT,
530 ensuring the last_used field stays current for monitoring and security audits.
532 Rate-limiting: Uses Redis cache (or in-memory fallback) to track the last
533 update time and only writes to the database if the configured interval has
534 elapsed. This prevents excessive DB writes on high-traffic tokens.
536 Args:
537 jti: JWT ID of the API token
539 Note:
540 Called via asyncio.to_thread() to avoid blocking the event loop.
541 Uses fresh_db_session() for thread-safe database access.
542 """
543 # Standard
544 import time # pylint: disable=import-outside-toplevel,redefined-outer-name
546 # First-Party
547 from mcpgateway.config import settings as config_settings # pylint: disable=import-outside-toplevel,reimported
549 # Rate-limiting cache key
550 cache_key = f"api_token_last_used:{jti}"
551 update_interval_seconds = config_settings.token_last_used_update_interval_minutes * 60
553 # Try Redis rate-limiting first (if available)
554 redis_client = _get_sync_redis_client()
555 if redis_client:
556 try:
557 last_update = redis_client.get(cache_key)
558 if last_update:
559 # Check if enough time has elapsed
560 time_since_update = time.time() - float(last_update)
561 if time_since_update < update_interval_seconds:
562 return # Skip update - too soon
564 # Update DB and cache
565 with fresh_db_session() as db:
566 # Third-Party
567 from sqlalchemy import select # pylint: disable=import-outside-toplevel
569 # First-Party
570 from mcpgateway.db import EmailApiToken, utc_now # pylint: disable=import-outside-toplevel
572 result = db.execute(select(EmailApiToken).where(EmailApiToken.jti == jti))
573 api_token = result.scalar_one_or_none()
574 if api_token:
575 api_token.last_used = utc_now()
576 db.commit()
577 # Update Redis cache with current timestamp
578 redis_client.setex(cache_key, update_interval_seconds * 2, str(time.time()))
579 return
580 except Exception as exc:
581 # Redis failed, fall through to in-memory cache
582 logger = logging.getLogger(__name__)
583 logger.debug("Redis unavailable for API token rate-limiting, using in-memory fallback: %s", exc)
585 # Fallback: In-memory cache (module-level dict with threading.Lock for thread-safety)
586 # Note: This is per-process and won't work in multi-worker deployments
587 # but provides basic rate-limiting when Redis is unavailable
588 max_cache_size = 1024 # Prevent unbounded growth
590 with _LAST_USED_CACHE_LOCK:
591 last_update = _LAST_USED_CACHE.get(jti)
592 if last_update:
593 time_since_update = time.time() - last_update
594 if time_since_update < update_interval_seconds:
595 return # Skip update - too soon
597 # Update DB and cache
598 with fresh_db_session() as db:
599 # Third-Party
600 from sqlalchemy import select # pylint: disable=import-outside-toplevel
602 # First-Party
603 from mcpgateway.db import EmailApiToken, utc_now # pylint: disable=import-outside-toplevel
605 result = db.execute(select(EmailApiToken).where(EmailApiToken.jti == jti))
606 api_token = result.scalar_one_or_none()
607 if api_token:
608 api_token.last_used = utc_now()
609 db.commit()
610 # Update in-memory cache (with lock for thread-safety)
611 with _LAST_USED_CACHE_LOCK:
612 if len(_LAST_USED_CACHE) >= max_cache_size:
613 # Evict oldest entries (by timestamp value)
614 sorted_keys = sorted(_LAST_USED_CACHE, key=_LAST_USED_CACHE.get) # type: ignore[arg-type]
615 for k in sorted_keys[: len(_LAST_USED_CACHE) // 2]:
616 del _LAST_USED_CACHE[k]
617 _LAST_USED_CACHE[jti] = time.time()
620def _is_api_token_jti_sync(jti: str) -> bool:
621 """Check if JTI belongs to an API token (legacy fallback) - SYNC version.
623 Used for tokens created before auth_provider was added to the payload.
624 Called via asyncio.to_thread() to avoid blocking the event loop.
626 SECURITY: Fail-closed on DB errors. If we can't verify the token isn't
627 an API token, treat it as one to preserve the hard-block policy.
629 Args:
630 jti: JWT ID to check
632 Returns:
633 bool: True if JTI exists in email_api_tokens table OR if lookup fails
634 """
635 # Third-Party
636 from sqlalchemy import select # pylint: disable=import-outside-toplevel
638 # First-Party
639 from mcpgateway.db import EmailApiToken # pylint: disable=import-outside-toplevel
641 try:
642 with fresh_db_session() as db:
643 result = db.execute(select(EmailApiToken.id).where(EmailApiToken.jti == jti).limit(1))
644 return result.scalar_one_or_none() is not None
645 except Exception as e:
646 logging.getLogger(__name__).warning(f"Legacy API token check failed, failing closed: {e}")
647 return True # FAIL-CLOSED: treat as API token to preserve hard-block
650def _get_user_by_email_sync(email: str) -> Optional[EmailUser]:
651 """Synchronous helper to get user by email.
653 This runs in a thread pool to avoid blocking the event loop.
655 Args:
656 email: The user's email address.
658 Returns:
659 EmailUser if found, None otherwise.
660 """
661 with fresh_db_session() as db:
662 # Third-Party
663 from sqlalchemy import select # pylint: disable=import-outside-toplevel
665 result = db.execute(select(EmailUser).where(EmailUser.email == email))
666 user = result.scalar_one_or_none()
667 if user:
668 # Detach from session and return a copy of attributes
669 # since the session will be closed
670 return EmailUser(
671 email=user.email,
672 password_hash=user.password_hash,
673 full_name=user.full_name,
674 is_admin=user.is_admin,
675 is_active=user.is_active,
676 auth_provider=user.auth_provider,
677 password_change_required=user.password_change_required,
678 email_verified_at=user.email_verified_at,
679 created_at=user.created_at,
680 updated_at=user.updated_at,
681 )
682 return None
685def _resolve_plugin_authenticated_user_sync(user_dict: Dict[str, Any]) -> Optional[EmailUser]:
686 """Resolve plugin-authenticated user against database-backed identity state.
688 Plugin hooks may authenticate a request and return identity claims. This
689 helper enforces that admin status is always derived from the database record.
691 Behavior:
692 - Existing DB user: return DB user (authoritative for is_admin/is_active).
693 - Missing DB user and REQUIRE_USER_IN_DB=true: reject (None).
694 - Missing DB user and REQUIRE_USER_IN_DB=false: allow a non-admin virtual
695 user built from non-privileged plugin claims.
697 Args:
698 user_dict: Identity claims returned by plugin auth hook.
700 Returns:
701 EmailUser when identity is accepted, otherwise None.
702 """
703 email = str(user_dict.get("email") or "").strip()
704 if not email:
705 return None
707 db_user = _get_user_by_email_sync(email)
708 if db_user:
709 return db_user
711 if settings.require_user_in_db:
712 return None
714 return EmailUser(
715 email=email,
716 password_hash=user_dict.get("password_hash", ""),
717 full_name=user_dict.get("full_name"),
718 is_admin=False,
719 is_active=user_dict.get("is_active", True),
720 auth_provider=user_dict.get("auth_provider", "local"),
721 password_change_required=user_dict.get("password_change_required", False),
722 email_verified_at=user_dict.get("email_verified_at"),
723 created_at=user_dict.get("created_at", datetime.now(timezone.utc)),
724 updated_at=user_dict.get("updated_at", datetime.now(timezone.utc)),
725 )
728def _get_auth_context_batched_sync(email: str, jti: Optional[str] = None) -> Dict[str, Any]:
729 """Batched auth context lookup in a single DB session.
731 Combines what were 3 separate asyncio.to_thread calls into 1:
732 1. _get_user_by_email_sync - user data
733 2. _get_personal_team_sync - personal team ID
734 3. _check_token_revoked_sync - token revocation status
735 4. _get_user_team_ids - all active team memberships (for session tokens)
737 This reduces thread pool contention and DB connection overhead.
739 Args:
740 email: User email address
741 jti: JWT ID for revocation check (optional)
743 Returns:
744 Dict with keys: user (dict or None), personal_team_id (str or None),
745 is_token_revoked (bool), team_ids (list of str)
747 Examples:
748 >>> # This function runs in a thread pool
749 >>> # result = _get_auth_context_batched_sync("test@example.com", "jti-123")
750 >>> # result["is_token_revoked"] # False if not revoked
751 """
752 with fresh_db_session() as db:
753 # Third-Party
754 from sqlalchemy import select # pylint: disable=import-outside-toplevel
756 # First-Party
757 from mcpgateway.db import EmailTeam, EmailTeamMember, TokenRevocation # pylint: disable=import-outside-toplevel
759 result = {
760 "user": None,
761 "personal_team_id": None,
762 "is_token_revoked": False, # nosec B105 - boolean flag, not a password
763 "team_ids": [],
764 }
766 # Query 1: Get user data
767 user_result = db.execute(select(EmailUser).where(EmailUser.email == email))
768 user = user_result.scalar_one_or_none()
770 if user:
771 # Detach user data as dict (session will close)
772 result["user"] = {
773 "email": user.email,
774 "password_hash": user.password_hash,
775 "full_name": user.full_name,
776 "is_admin": user.is_admin,
777 "is_active": user.is_active,
778 "auth_provider": user.auth_provider,
779 "password_change_required": user.password_change_required,
780 "email_verified_at": user.email_verified_at,
781 "created_at": user.created_at,
782 "updated_at": user.updated_at,
783 }
785 # Query 2: Get personal team (only if user exists)
786 team_result = db.execute(
787 select(EmailTeam)
788 .join(EmailTeamMember)
789 .where(
790 EmailTeamMember.user_email == email,
791 EmailTeam.is_personal.is_(True),
792 )
793 )
794 personal_team = team_result.scalar_one_or_none()
795 if personal_team:
796 result["personal_team_id"] = personal_team.id
798 # Query 4: Get all active team memberships (for session token team resolution)
799 team_ids_result = db.execute(
800 select(EmailTeamMember.team_id).where(
801 EmailTeamMember.user_email == email,
802 EmailTeamMember.is_active.is_(True),
803 )
804 )
805 result["team_ids"] = [row[0] for row in team_ids_result.all()]
807 # Query 3: Check token revocation (if JTI provided)
808 if jti:
809 revoke_result = db.execute(select(TokenRevocation).where(TokenRevocation.jti == jti))
810 result["is_token_revoked"] = revoke_result.scalar_one_or_none() is not None
812 return result
815def _user_from_cached_dict(user_dict: Dict[str, Any]) -> EmailUser:
816 """Create EmailUser instance from cached dict.
818 Args:
819 user_dict: User data dictionary from cache
821 Returns:
822 EmailUser instance (detached from any session)
823 """
824 return EmailUser(
825 email=user_dict["email"],
826 password_hash=user_dict.get("password_hash", ""),
827 full_name=user_dict.get("full_name"),
828 is_admin=user_dict.get("is_admin", False),
829 is_active=user_dict.get("is_active", True),
830 auth_provider=user_dict.get("auth_provider", "local"),
831 password_change_required=user_dict.get("password_change_required", False),
832 email_verified_at=user_dict.get("email_verified_at"),
833 created_at=user_dict.get("created_at", datetime.now(timezone.utc)),
834 updated_at=user_dict.get("updated_at", datetime.now(timezone.utc)),
835 )
838async def get_current_user(
839 credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
840 request: Request = None, # type: ignore[assignment]
841) -> EmailUser:
842 """Get current authenticated user from JWT token with revocation checking.
844 Supports plugin-based custom authentication via HTTP_AUTH_RESOLVE_USER hook.
846 Args:
847 credentials: HTTP authorization credentials
848 request: Optional request object for plugin hooks
850 Returns:
851 EmailUser: Authenticated user
853 Raises:
854 HTTPException: If authentication fails
855 """
856 logger = logging.getLogger(__name__)
858 async def _set_auth_method_from_payload(payload: dict) -> None:
859 """Set request.state.auth_method based on JWT payload.
861 Args:
862 payload: Decoded JWT payload
863 """
864 if not request:
865 return
867 # NOTE: Cannot use structural check (scopes dict) because email login JWTs
868 # also have scopes dict (see email_auth.py:160)
869 user_info = payload.get("user", {})
870 auth_provider = user_info.get("auth_provider")
872 if auth_provider == "api_token":
873 request.state.auth_method = "api_token"
874 jti = payload.get("jti")
875 if jti:
876 request.state.jti = jti
877 try:
878 await asyncio.to_thread(_update_api_token_last_used_sync, jti)
879 except Exception as e:
880 logger.debug(f"Failed to update API token last_used: {e}")
881 # Continue authentication - last_used update is not critical
882 return
884 if auth_provider:
885 # email, oauth, saml, or any other interactive auth provider
886 request.state.auth_method = "jwt"
887 return
889 # Legacy API token fallback: check if JTI exists in API token table
890 # This handles tokens created before auth_provider was added
891 jti_for_check = payload.get("jti")
892 if jti_for_check:
893 is_legacy_api_token = await asyncio.to_thread(_is_api_token_jti_sync, jti_for_check)
894 if is_legacy_api_token:
895 request.state.auth_method = "api_token"
896 request.state.jti = jti_for_check
897 logger.debug(f"Legacy API token detected via DB lookup (JTI: ...{jti_for_check[-8:]})")
898 try:
899 await asyncio.to_thread(_update_api_token_last_used_sync, jti_for_check)
900 except Exception as e:
901 logger.debug(f"Failed to update legacy API token last_used: {e}")
902 # Continue authentication - last_used update is not critical
903 else:
904 request.state.auth_method = "jwt"
905 else:
906 # No auth_provider or JTI; default to interactive
907 request.state.auth_method = "jwt"
909 # NEW: Custom authentication hook - allows plugins to provide alternative auth
910 # This hook is invoked BEFORE standard JWT/API token validation
911 try:
912 # Get plugin manager singleton
913 plugin_manager = get_plugin_manager()
915 if plugin_manager and plugin_manager.has_hooks_for(HttpHookType.HTTP_AUTH_RESOLVE_USER):
916 # Extract client information
917 client_host = None
918 client_port = None
919 if request and hasattr(request, "client") and request.client:
920 client_host = request.client.host
921 client_port = request.client.port
923 # Serialize credentials for plugin
924 credentials_dict = None
925 if credentials:
926 credentials_dict = {
927 "scheme": credentials.scheme,
928 "credentials": credentials.credentials,
929 }
931 # Extract headers from request
932 # Note: Middleware modifies request.scope["headers"], so request.headers
933 # will automatically reflect any modifications made by HTTP_PRE_REQUEST hooks
934 headers = {}
935 if request and hasattr(request, "headers"):
936 headers = dict(request.headers)
938 # Get request ID from correlation ID context (set by CorrelationIDMiddleware)
939 request_id = get_correlation_id()
940 if not request_id:
941 # Fallback chain for safety
942 if request and hasattr(request, "state") and hasattr(request.state, "request_id"):
943 request_id = request.state.request_id
944 else:
945 request_id = uuid.uuid4().hex
946 logger.debug(f"Generated fallback request ID in get_current_user: {request_id}")
948 # Get plugin contexts from request state if available
949 global_context = getattr(request.state, "plugin_global_context", None) if request else None
950 if not global_context:
951 # Create global context
952 global_context = GlobalContext(
953 request_id=request_id,
954 server_id=None,
955 tenant_id=None,
956 )
958 context_table = getattr(request.state, "plugin_context_table", None) if request else None
960 # Invoke custom auth resolution hook
961 # violations_as_exceptions=True so PluginViolationError is raised for explicit denials
962 auth_result, context_table_result = await plugin_manager.invoke_hook(
963 HttpHookType.HTTP_AUTH_RESOLVE_USER,
964 payload=HttpAuthResolveUserPayload(
965 credentials=credentials_dict,
966 headers=HttpHeaderPayload(root=headers),
967 client_host=client_host,
968 client_port=client_port,
969 ),
970 global_context=global_context,
971 local_contexts=context_table,
972 violations_as_exceptions=True, # Raise PluginViolationError for auth denials
973 )
975 # If plugin successfully authenticated user, return it
976 if auth_result.modified_payload and isinstance(auth_result.modified_payload, dict):
977 logger.info("User authenticated via plugin hook")
978 # Resolve plugin claims against DB state so admin flags are authoritative.
979 user_dict = auth_result.modified_payload
980 user = await asyncio.to_thread(_resolve_plugin_authenticated_user_sync, user_dict)
982 if user is None:
983 logger.warning("Plugin auth rejected: user identity could not be resolved against DB policy")
984 raise HTTPException(
985 status_code=status.HTTP_401_UNAUTHORIZED,
986 detail="User not found in database",
987 headers={"WWW-Authenticate": "Bearer"},
988 )
990 if not user.is_active:
991 raise HTTPException(
992 status_code=status.HTTP_401_UNAUTHORIZED,
993 detail="Account disabled",
994 headers={"WWW-Authenticate": "Bearer"},
995 )
997 # Store auth_method in request.state so it can be accessed by RBAC middleware
998 if request and auth_result.metadata:
999 auth_method = auth_result.metadata.get("auth_method")
1000 if auth_method:
1001 request.state.auth_method = auth_method
1002 logger.debug(f"Stored auth_method '{auth_method}' in request.state")
1004 if request and context_table_result:
1005 request.state.plugin_context_table = context_table_result
1007 if request and global_context:
1008 request.state.plugin_global_context = global_context
1010 if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
1011 _inject_userinfo_instate(request, user)
1013 return user
1014 # If continue_processing=True (no payload), fall through to standard auth
1016 except PluginViolationError as e:
1017 # Plugin explicitly denied authentication with custom message
1018 logger.warning(f"Authentication denied by plugin: {e.message}")
1019 raise HTTPException(
1020 status_code=status.HTTP_401_UNAUTHORIZED,
1021 detail=e.message, # Use plugin's custom error message
1022 headers={"WWW-Authenticate": "Bearer"},
1023 )
1024 except HTTPException:
1025 # Re-raise HTTP exceptions
1026 raise
1027 except Exception as e:
1028 # Log but don't fail on plugin errors - fall back to standard auth
1029 logger.warning(f"HTTP_AUTH_RESOLVE_USER hook failed, falling back to standard auth: {e}")
1031 # EXISTING: Standard authentication (JWT, API tokens)
1032 if not credentials:
1033 logger.warning("No credentials provided")
1034 raise HTTPException(
1035 status_code=status.HTTP_401_UNAUTHORIZED,
1036 detail="Authentication required",
1037 headers={"WWW-Authenticate": "Bearer"},
1038 )
1040 logger.debug("Attempting authentication with bearer credentials")
1041 email = None
1043 try:
1044 # Try JWT token first using the centralized verify_jwt_token_cached function
1045 logger.debug("Attempting JWT token validation")
1046 payload = await verify_jwt_token_cached(credentials.credentials, request)
1048 logger.debug("JWT token validated successfully")
1049 # Extract user identifier (support both new and legacy token formats)
1050 email = payload.get("sub")
1051 if email is None:
1052 # Try legacy format
1053 email = payload.get("email")
1055 if email is None:
1056 logger.debug("No email/sub found in JWT payload")
1057 raise HTTPException(
1058 status_code=status.HTTP_401_UNAUTHORIZED,
1059 detail="Invalid token",
1060 headers={"WWW-Authenticate": "Bearer"},
1061 )
1063 logger.debug("JWT authentication successful for email: %s", email)
1065 # Extract JTI for revocation check
1066 jti = payload.get("jti")
1068 # === AUTH CACHING: Check cache before DB queries ===
1069 if settings.auth_cache_enabled:
1070 try:
1071 # First-Party
1072 from mcpgateway.cache.auth_cache import auth_cache, CachedAuthContext # pylint: disable=import-outside-toplevel
1074 cached_ctx = await auth_cache.get_auth_context(email, jti)
1075 if cached_ctx:
1076 logger.debug(f"Auth cache hit for {email}")
1078 # Check revocation from cache
1079 if cached_ctx.is_token_revoked:
1080 raise HTTPException(
1081 status_code=status.HTTP_401_UNAUTHORIZED,
1082 detail="Token has been revoked",
1083 headers={"WWW-Authenticate": "Bearer"},
1084 )
1086 # Check user active status from cache
1087 if cached_ctx.user and not cached_ctx.user.get("is_active", True):
1088 raise HTTPException(
1089 status_code=status.HTTP_401_UNAUTHORIZED,
1090 detail="Account disabled",
1091 headers={"WWW-Authenticate": "Bearer"},
1092 )
1094 # Resolve teams based on token_use
1095 if request:
1096 token_use = payload.get("token_use")
1097 request.state.token_use = token_use
1099 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type
1100 # Session token: resolve teams from DB/cache
1101 user_info = cached_ctx.user or {"is_admin": False}
1102 teams = await _resolve_teams_from_db(email, user_info)
1103 else:
1104 # API token or legacy: use embedded teams
1105 teams = normalize_token_teams(payload)
1107 request.state.token_teams = teams
1109 # Set team_id: only for single-team API tokens
1110 if teams is None:
1111 request.state.team_id = None
1112 elif len(teams) == 1 and token_use != "session": # nosec B105
1113 request.state.team_id = teams[0] if isinstance(teams[0], str) else teams[0].get("id")
1114 else:
1115 request.state.team_id = None
1117 await _set_auth_method_from_payload(payload)
1119 # Return user from cache
1120 if cached_ctx.user:
1121 # When require_user_in_db is enabled, verify user still exists in DB
1122 # This prevents stale cache from bypassing strict mode
1123 if settings.require_user_in_db:
1124 db_user = await asyncio.to_thread(_get_user_by_email_sync, email)
1125 if db_user is None:
1126 logger.warning(
1127 f"Authentication rejected for {email}: cached user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
1128 extra={"security_event": "user_not_in_db_rejected", "user_id": email},
1129 )
1130 raise HTTPException(
1131 status_code=status.HTTP_401_UNAUTHORIZED,
1132 detail="User not found in database",
1133 headers={"WWW-Authenticate": "Bearer"},
1134 )
1136 if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
1137 _inject_userinfo_instate(request, _user_from_cached_dict(cached_ctx.user))
1139 return _user_from_cached_dict(cached_ctx.user)
1141 # User not in cache but context was (shouldn't happen, but handle it)
1142 logger.debug("Auth context cached but user missing, falling through to DB")
1144 except HTTPException:
1145 raise
1146 except Exception as cache_error:
1147 logger.debug(f"Auth cache check failed, falling through to DB: {cache_error}")
1149 # === BATCHED QUERIES: Single DB call for user + team + revocation ===
1150 if settings.auth_cache_batch_queries:
1151 try:
1152 auth_ctx = await asyncio.to_thread(_get_auth_context_batched_sync, email, jti)
1154 # Check revocation
1155 if auth_ctx.get("is_token_revoked"):
1156 raise HTTPException(
1157 status_code=status.HTTP_401_UNAUTHORIZED,
1158 detail="Token has been revoked",
1159 headers={"WWW-Authenticate": "Bearer"},
1160 )
1162 # Resolve teams based on token_use
1163 token_use = payload.get("token_use")
1164 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type
1165 # Session token: use team_ids from batched query
1166 user_dict = auth_ctx.get("user")
1167 is_admin = user_dict.get("is_admin", False) if user_dict else False
1168 if is_admin:
1169 teams = None # Admin bypass
1170 else:
1171 teams = auth_ctx.get("team_ids", [])
1172 else:
1173 # API token or legacy: use embedded teams
1174 teams = normalize_token_teams(payload)
1176 # Set team_id: only for single-team API tokens
1177 if teams is None:
1178 team_id = None
1179 elif len(teams) == 1 and token_use != "session": # nosec B105
1180 team_id = teams[0] if isinstance(teams[0], str) else teams[0].get("id")
1181 else:
1182 team_id = None
1184 if request:
1185 request.state.token_teams = teams
1186 request.state.team_id = team_id
1187 request.state.token_use = token_use
1188 await _set_auth_method_from_payload(payload)
1190 # Store in cache for future requests
1191 if settings.auth_cache_enabled:
1192 try:
1193 # First-Party
1194 from mcpgateway.cache.auth_cache import auth_cache, CachedAuthContext # noqa: F811 pylint: disable=import-outside-toplevel
1196 await auth_cache.set_auth_context(
1197 email,
1198 jti,
1199 CachedAuthContext(
1200 user=auth_ctx.get("user"),
1201 personal_team_id=auth_ctx.get("personal_team_id"),
1202 is_token_revoked=auth_ctx.get("is_token_revoked", False),
1203 ),
1204 )
1205 # Also populate teams-list cache so cached-path requests
1206 # don't need an extra DB query via _resolve_teams_from_db()
1207 if token_use == "session" and teams is not None: # nosec B105
1208 await auth_cache.set_user_teams(f"{email}:True", teams)
1209 except Exception as cache_set_error:
1210 logger.debug(f"Failed to cache auth context: {cache_set_error}")
1212 # Create user from batched result
1213 if auth_ctx.get("user"):
1214 user_dict = auth_ctx["user"]
1215 if not user_dict.get("is_active", True):
1216 raise HTTPException(
1217 status_code=status.HTTP_401_UNAUTHORIZED,
1218 detail="Account disabled",
1219 headers={"WWW-Authenticate": "Bearer"},
1220 )
1221 # Store user for return at end of function
1222 # We'll check platform admin case and return below
1223 _batched_user = _user_from_cached_dict(user_dict)
1224 else:
1225 _batched_user = None
1227 # Handle user not found case
1228 if _batched_user is None:
1229 # Check if strict user-in-DB mode is enabled
1230 if settings.require_user_in_db:
1231 logger.warning(
1232 f"Authentication rejected for {email}: user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
1233 extra={"security_event": "user_not_in_db_rejected", "user_id": email},
1234 )
1235 raise HTTPException(
1236 status_code=status.HTTP_401_UNAUTHORIZED,
1237 detail="User not found in database",
1238 headers={"WWW-Authenticate": "Bearer"},
1239 )
1241 # Platform admin bootstrap (only when REQUIRE_USER_IN_DB=false)
1242 if email == getattr(settings, "platform_admin_email", "admin@example.com"):
1243 logger.info(
1244 f"Platform admin bootstrap authentication for {email}. " "User authenticated via platform admin configuration.",
1245 extra={"security_event": "platform_admin_bootstrap", "user_id": email},
1246 )
1247 _batched_user = EmailUser(
1248 email=email,
1249 password_hash="", # nosec B106
1250 full_name=getattr(settings, "platform_admin_full_name", "Platform Administrator"),
1251 is_admin=True,
1252 is_active=True,
1253 auth_provider="local",
1254 password_change_required=False,
1255 email_verified_at=datetime.now(timezone.utc),
1256 created_at=datetime.now(timezone.utc),
1257 updated_at=datetime.now(timezone.utc),
1258 )
1259 else:
1260 raise HTTPException(
1261 status_code=status.HTTP_401_UNAUTHORIZED,
1262 detail="User not found",
1263 headers={"WWW-Authenticate": "Bearer"},
1264 )
1266 if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
1267 _inject_userinfo_instate(request, _batched_user)
1269 return _batched_user
1271 except HTTPException:
1272 raise
1273 except Exception as batch_error:
1274 logger.warning(f"Batched auth query failed, falling back to individual queries: {batch_error}")
1276 # === FALLBACK: Original individual queries (if batching disabled or failed) ===
1277 if jti:
1278 try:
1279 is_revoked = await asyncio.to_thread(_check_token_revoked_sync, jti)
1280 if is_revoked:
1281 raise HTTPException(
1282 status_code=status.HTTP_401_UNAUTHORIZED,
1283 detail="Token has been revoked",
1284 headers={"WWW-Authenticate": "Bearer"},
1285 )
1286 except HTTPException:
1287 raise
1288 except Exception as revoke_check_error:
1289 # Fail-secure: if the revocation check itself errors, reject the token.
1290 # Allowing through on error would let revoked tokens bypass enforcement
1291 # when the DB is unreachable or the table is missing.
1292 logger.warning(f"Token revocation check failed for JTI {jti} — denying access (fail-secure): {revoke_check_error}")
1293 raise HTTPException(
1294 status_code=status.HTTP_401_UNAUTHORIZED,
1295 detail="Token validation failed",
1296 headers={"WWW-Authenticate": "Bearer"},
1297 )
1299 # Resolve teams based on token_use
1300 token_use = payload.get("token_use")
1301 if token_use == "session": # nosec B105 - Not a password; token_use is a JWT claim type
1302 # Session token: resolve teams from DB/cache (fallback path — separate query OK)
1303 user_info = {"is_admin": payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False)}
1304 normalized_teams = await _resolve_teams_from_db(email, user_info)
1305 else:
1306 # API token or legacy: use embedded teams
1307 normalized_teams = normalize_token_teams(payload)
1309 # Set team_id: only for single-team API tokens
1310 if normalized_teams is None:
1311 team_id = None
1312 elif len(normalized_teams) == 1 and token_use != "session": # nosec B105
1313 team_id = normalized_teams[0] if isinstance(normalized_teams[0], str) else normalized_teams[0].get("id")
1314 else:
1315 team_id = None
1317 if request:
1318 request.state.token_teams = normalized_teams
1319 request.state.team_id = team_id
1320 request.state.token_use = token_use
1321 # Store JTI for use in middleware (e.g., token usage logging)
1322 if jti:
1323 request.state.jti = jti
1324 await _set_auth_method_from_payload(payload)
1326 except HTTPException:
1327 # Re-raise HTTPException from verify_jwt_token (handles expired/invalid tokens)
1328 raise
1329 except Exception as jwt_error:
1330 # JWT validation failed, try database API token
1331 # Uses fresh DB session via asyncio.to_thread to avoid blocking event loop
1332 logger.debug("JWT validation failed with error: %s, trying database API token", jwt_error)
1333 try:
1334 token_hash = hashlib.sha256(credentials.credentials.encode()).hexdigest()
1336 # Lookup API token using fresh session in thread pool
1337 api_token_info = await asyncio.to_thread(_lookup_api_token_sync, token_hash)
1338 logger.debug(f"Database lookup result: {api_token_info is not None}")
1340 if api_token_info:
1341 # Check for error conditions returned by helper
1342 if api_token_info.get("expired"):
1343 raise HTTPException(
1344 status_code=status.HTTP_401_UNAUTHORIZED,
1345 detail="API token expired",
1346 headers={"WWW-Authenticate": "Bearer"},
1347 )
1349 if api_token_info.get("revoked"):
1350 raise HTTPException(
1351 status_code=status.HTTP_401_UNAUTHORIZED,
1352 detail="API token has been revoked",
1353 headers={"WWW-Authenticate": "Bearer"},
1354 )
1356 # Use the email from the API token
1357 email = api_token_info["user_email"]
1358 logger.debug(f"API token authentication successful for email: {email}")
1360 # Set auth_method for database API tokens
1361 if request:
1362 request.state.auth_method = "api_token"
1363 request.state.user_email = api_token_info["user_email"]
1364 # Store JTI for use in middleware
1365 if "jti" in api_token_info:
1366 request.state.jti = api_token_info["jti"]
1367 else:
1368 logger.debug("API token not found in database")
1369 logger.debug("No valid authentication method found")
1370 # Neither JWT nor API token worked
1371 raise HTTPException(
1372 status_code=status.HTTP_401_UNAUTHORIZED,
1373 detail="Invalid authentication credentials",
1374 headers={"WWW-Authenticate": "Bearer"},
1375 )
1376 except HTTPException:
1377 # Re-raise HTTP exceptions
1378 raise
1379 except Exception as e:
1380 # Neither JWT nor API token validation worked
1381 logger.debug(f"Database API token validation failed with exception: {e}")
1382 raise HTTPException(
1383 status_code=status.HTTP_401_UNAUTHORIZED,
1384 detail="Invalid authentication credentials",
1385 headers={"WWW-Authenticate": "Bearer"},
1386 )
1388 # Get user from database using fresh session in thread pool
1389 user = await asyncio.to_thread(_get_user_by_email_sync, email)
1391 if user is None:
1392 # Check if strict user-in-DB mode is enabled
1393 if settings.require_user_in_db:
1394 logger.warning(
1395 f"Authentication rejected for {email}: user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
1396 extra={"security_event": "user_not_in_db_rejected", "user_id": email},
1397 )
1398 raise HTTPException(
1399 status_code=status.HTTP_401_UNAUTHORIZED,
1400 detail="User not found in database",
1401 headers={"WWW-Authenticate": "Bearer"},
1402 )
1404 # Platform admin bootstrap (only when REQUIRE_USER_IN_DB=false)
1405 # If user doesn't exist but token is valid and email matches platform admin,
1406 # create a virtual admin user object
1407 if email == getattr(settings, "platform_admin_email", "admin@example.com"):
1408 logger.info(
1409 f"Platform admin bootstrap authentication for {email}. " "User authenticated via platform admin configuration.",
1410 extra={"security_event": "platform_admin_bootstrap", "user_id": email},
1411 )
1412 # Create a virtual admin user for authentication purposes
1413 user = EmailUser(
1414 email=email,
1415 password_hash="", # nosec B106 - Not used for JWT authentication
1416 full_name=getattr(settings, "platform_admin_full_name", "Platform Administrator"),
1417 is_admin=True,
1418 is_active=True,
1419 auth_provider="local",
1420 password_change_required=False,
1421 email_verified_at=datetime.now(timezone.utc),
1422 created_at=datetime.now(timezone.utc),
1423 updated_at=datetime.now(timezone.utc),
1424 )
1425 else:
1426 raise HTTPException(
1427 status_code=status.HTTP_401_UNAUTHORIZED,
1428 detail="User not found",
1429 headers={"WWW-Authenticate": "Bearer"},
1430 )
1432 if not user.is_active:
1433 raise HTTPException(
1434 status_code=status.HTTP_401_UNAUTHORIZED,
1435 detail="Account disabled",
1436 headers={"WWW-Authenticate": "Bearer"},
1437 )
1439 if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
1440 _inject_userinfo_instate(request, user)
1442 return user
1445def _inject_userinfo_instate(request: Optional[object] = None, user: Optional[EmailUser] = None) -> None:
1446 """This function injects user related information into the plugin_global_context, if the config has
1447 include_user_info key set as true.
1449 Args:
1450 request: Optional request object for plugin hooks
1451 user: User related information
1452 """
1454 logger = logging.getLogger(__name__)
1455 # Get request ID from correlation ID context (set by CorrelationIDMiddleware)
1456 request_id = get_correlation_id()
1457 if not request_id:
1458 # Fallback chain for safety
1459 if request and hasattr(request, "state") and hasattr(request.state, "request_id"):
1460 request_id = request.state.request_id
1461 else:
1462 request_id = uuid.uuid4().hex
1463 logger.debug(f"Generated fallback request ID in get_current_user: {request_id}")
1465 # Get plugin contexts from request state if available
1466 global_context = getattr(request.state, "plugin_global_context", None) if request else None
1467 if not global_context:
1468 # Create global context
1469 global_context = GlobalContext(
1470 request_id=request_id,
1471 server_id=None,
1472 tenant_id=None,
1473 )
1475 if user:
1476 if not global_context.user:
1477 global_context.user = {}
1478 global_context.user["email"] = user.email
1479 global_context.user["is_admin"] = user.is_admin
1480 global_context.user["full_name"] = user.full_name
1482 if request and global_context:
1483 request.state.plugin_global_context = global_context