Coverage for mcpgateway / auth.py: 98%
663 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/auth.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Shared authentication utilities.
9This module provides common authentication functions that can be shared
10across different parts of the application without creating circular imports.
11"""
13# Standard
14import asyncio
15from datetime import datetime, timezone
16import hashlib
17import logging
18import threading
19from typing import Any, Dict, Generator, List, Never, Optional
20import uuid
22# Third-Party
23from fastapi import Depends, HTTPException, status
24from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
25from sqlalchemy.orm import Session
26from starlette.requests import Request
28# First-Party
29from mcpgateway.common.validators import SecurityValidator
30from mcpgateway.config import settings
31from mcpgateway.db import EmailUser, fresh_db_session, SessionLocal
32from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthResolveUserPayload, HttpHeaderPayload, HttpHookType, PluginViolationError
33from mcpgateway.utils.correlation_id import get_correlation_id
34from mcpgateway.utils.trace_context import (
35 clear_trace_context,
36 set_trace_auth_method,
37 set_trace_context_from_teams,
38 set_trace_team_scope,
39 set_trace_user_email,
40 set_trace_user_is_admin,
41)
42from mcpgateway.utils.verify_credentials import verify_jwt_token_cached
# Security scheme
# Bearer-token extractor used as a FastAPI dependency. It is built with
# auto_error=False, which per FastAPI's HTTPBearer contract lets a request
# without credentials reach the handler instead of being rejected up front.
security = HTTPBearer(auto_error=False)

# Module-level sync Redis client for rate-limiting (lazy-initialized)
_SYNC_REDIS_CLIENT = None  # pylint: disable=invalid-name
# Guards the lazy creation of _SYNC_REDIS_CLIENT.
_SYNC_REDIS_LOCK = threading.Lock()
# time.time() of the most recent failed connection attempt, or None.
_SYNC_REDIS_FAILURE_TIME: Optional[float] = None  # Backoff after connection failures

# Module-level in-memory cache for last_used rate-limiting (fallback when Redis unavailable)
# Maps an API token JTI to the time.time() value recorded at its last DB write.
_LAST_USED_CACHE: dict = {}
# Guards every read and write of _LAST_USED_CACHE.
_LAST_USED_CACHE_LOCK = threading.Lock()
def _log_auth_event(
    logger: logging.Logger,
    message: str,
    level: int = logging.INFO,
    user_id: Optional[str] = None,
    auth_method: Optional[str] = None,
    auth_success: bool = False,
    security_event: Optional[str] = None,
    security_severity: str = "low",
    **extra_context,
) -> None:
    """Emit a structured authentication log record tagged with the request ID.

    The record's ``extra`` mapping always carries the correlation ID of the
    current request plus a fixed set of auth/security fields, so a single
    authentication flow can be followed across log lines.

    Args:
        logger: Logger instance that emits the record.
        message: Human-readable log message.
        level: Logging level for the record (default: INFO).
        user_id: User identifier; only added to the record when truthy.
        auth_method: Authentication method used (jwt, api_token, etc.);
            only added to the record when truthy.
        auth_success: Whether authentication succeeded.
        security_event: Type of security event; falls back to
            ``"authentication"`` when not provided.
        security_severity: Severity level (low, medium, high, critical).
        **extra_context: Additional fields merged last, so they override any
            of the standard fields that share a key.
    """
    record_fields: Dict[str, Any] = {
        "request_id": get_correlation_id(),
        "entity_type": "auth",
        "auth_success": auth_success,
        "security_event": security_event if security_event else "authentication",
        "security_severity": security_severity,
    }

    # Optional identity fields are omitted entirely rather than logged as None.
    optional_fields = {"user_id": user_id, "auth_method": auth_method}
    record_fields.update({key: value for key, value in optional_fields.items() if value})

    # Caller-supplied context goes in last and therefore takes precedence.
    record_fields.update(extra_context)

    logger.log(level, message, extra=record_fields)
def get_db() -> Generator[Session, Never, None]:
    """Provide a database session as a dependency.

    The session is committed once the consumer finishes without error, which
    avoids an implicit rollback after read-only work. If the consumer raises,
    the transaction is rolled back and the exception is re-raised. The
    session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.

    Examples:
        >>> db_gen = get_db()
        >>> db = next(db_gen)
        >>> hasattr(db, 'query')
        True
        >>> hasattr(db, 'close')
        True
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        # Cleanup is layered: rollback first, and only if rollback itself
        # fails fall back to invalidating the session. Neither failure may
        # mask the original exception, which is re-raised below.
        try:
            session.rollback()
        except Exception:
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure
        raise
    finally:
        session.close()
def _get_personal_team_sync(user_email: str) -> Optional[str]:
    """Look up the ID of a user's personal team in a fresh DB session.

    Synchronous by design: callers run it in a thread pool so the event loop
    is not blocked.

    Args:
        user_email: The user's email address.

    Returns:
        The personal team ID, or None if not found.
    """
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.db import EmailTeam, EmailTeamMember  # pylint: disable=import-outside-toplevel

    # Teams the user is a member of, restricted to the personal one.
    personal_team_query = (
        select(EmailTeam)
        .join(EmailTeamMember)
        .where(
            EmailTeamMember.user_email == user_email,
            EmailTeam.is_personal.is_(True),
        )
    )

    with fresh_db_session() as session:
        team = session.execute(personal_team_query).scalar_one_or_none()
        if not team:
            return None
        return team.id
def _get_user_team_ids_sync(email: str) -> List[str]:
    """Return the IDs of every team in which the user has an active membership.

    Opens its own DB session, so it is safe to call from a thread pool. The
    query filters on the membership's ``is_active`` flag only; personal teams
    are not excluded.

    Args:
        email: User email address

    Returns:
        List of team ID strings
    """
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

    active_memberships = select(EmailTeamMember.team_id).where(
        EmailTeamMember.user_email == email,
        EmailTeamMember.is_active.is_(True),
    )

    with fresh_db_session() as session:
        # Each row is a 1-tuple holding the selected team_id column.
        return [membership[0] for membership in session.execute(active_memberships).all()]
196def _get_team_name_by_id_sync(team_id: Optional[str]) -> Optional[str]:
197 """Return the active team display name for a team ID.
199 Args:
200 team_id: Team identifier to resolve.
202 Returns:
203 Team display name when the active team exists, otherwise ``None``.
204 """
205 if not team_id:
206 return None
208 with fresh_db_session() as db:
209 # Third-Party
210 from sqlalchemy import select # pylint: disable=import-outside-toplevel
212 # First-Party
213 from mcpgateway.db import EmailTeam # pylint: disable=import-outside-toplevel
215 result = db.execute(
216 select(EmailTeam.name).where(
217 EmailTeam.id == team_id,
218 EmailTeam.is_active.is_(True),
219 )
220 )
221 return result.scalar_one_or_none()
224def _extract_claim_team_name(payload: Dict[str, Any], team_id: Optional[str]) -> Optional[str]:
225 """Extract a matching team display name from raw JWT team claims.
227 Args:
228 payload: Decoded JWT payload.
229 team_id: Normalized primary team identifier to match.
231 Returns:
232 Matching team display name from the JWT claims, if present.
233 """
234 if not team_id:
235 return None
237 raw_teams = payload.get("teams")
238 if not isinstance(raw_teams, list):
239 return None
241 for raw_team in raw_teams:
242 raw_team_id = None
243 raw_team_name = None
244 if isinstance(raw_team, dict):
245 raw_team_id = raw_team.get("id")
246 raw_team_name = raw_team.get("name")
247 elif isinstance(raw_team, str):
248 raw_team_id = raw_team
250 if str(raw_team_id).strip() != team_id:
251 continue
253 if raw_team_name is None:
254 return None
256 normalized_name = str(raw_team_name).strip()
257 return normalized_name or None
259 return None
async def resolve_trace_team_name(
    payload: Dict[str, Any],
    token_teams: Optional[List[str]],
    *,
    preresolved_team_names: Optional[Dict[str, str]] = None,
) -> Optional[str]:
    """Resolve the display name of the primary team for trace metadata.

    The name is additive trace metadata only and plays no part in scope
    enforcement, which keeps relying on canonical team IDs. Sources are tried
    in order: the batched lookup supplied by the caller, a DB lookup, and
    finally - for non-session tokens only - the raw JWT team claims.

    Args:
        payload: Decoded JWT payload.
        token_teams: Canonical resolved team IDs, or ``None`` for admin scope.
            The first entry is treated as the primary team.
        preresolved_team_names: Optional mapping of team_id to display name from
            a batched DB lookup.

    Returns:
        Display name for the primary concrete team, or ``None`` for public/admin
        scopes or when the name cannot be resolved.
    """
    if not token_teams:
        return None

    primary_id = token_teams[0]

    # Source 1: names the caller already fetched in a batched query.
    batched_name = preresolved_team_names.get(primary_id) if preresolved_team_names else None
    if batched_name:
        return batched_name

    # Source 2: database lookup, run off the event loop. A failure here is
    # logged and treated as "not resolved" rather than propagated.
    try:
        db_name = await asyncio.to_thread(_get_team_name_by_id_sync, primary_id)
    except Exception as exc:
        logging.getLogger(__name__).debug("Failed to resolve trace team name for team_id=%s: %s", primary_id, exc)
    else:
        if db_name:
            return db_name

    # Session tokens never fall back to names carried in the JWT itself.
    if payload.get("token_use") == "session":
        return None

    # Source 3: best-effort name from the raw JWT claims.
    return _extract_claim_team_name(payload, primary_id) or None
def get_user_team_roles(db, user_email: str) -> Dict[str, str]:
    """Return a {team_id: role} mapping for a user's active team memberships.

    Args:
        db: SQLAlchemy database session.
        user_email: Email address of the user to query memberships for.

    Returns:
        Dict mapping team_id to the user's role in that team.
        Returns empty dict on DB errors (safe default — headers stay masked).
    """
    try:
        # First-Party
        from mcpgateway.db import EmailTeamMember  # pylint: disable=import-outside-toplevel

        rows = (
            db.query(EmailTeamMember.team_id, EmailTeamMember.role)
            .filter(
                EmailTeamMember.user_email == user_email,
                EmailTeamMember.is_active.is_(True),
            )
            .all()
        )
        return {row.team_id: row.role for row in rows}
    except Exception as exc:
        # Deliberately best-effort: the caller gets the safe empty default,
        # but the failure is recorded instead of vanishing silently.
        logging.getLogger(__name__).debug("Failed to load team roles; returning empty mapping: %s", exc)
        return {}
333def _narrow_by_jwt_teams(payload: Dict[str, Any], db_teams: Optional[List[str]]) -> Optional[List[str]]:
334 """Apply JWT intersection policy to DB-resolved teams.
336 If *db_teams* is ``None`` (admin bypass), returns ``None`` immediately.
337 If the JWT ``teams`` claim is a non-empty list, returns the intersection
338 of *db_teams* and the JWT teams. If the intersection is empty (e.g.
339 all JWT-claimed teams have been revoked), returns ``[]`` so that
340 downstream enforcement denies the request rather than silently
341 broadening scope.
343 Args:
344 payload: The decoded JWT payload dict.
345 db_teams: Teams resolved from the database, or ``None`` for admin bypass.
347 Returns:
348 None (admin bypass), [] (public-only / empty intersection), or list of team ID strings.
349 """
350 if db_teams is None:
351 return None
353 jwt_teams = payload.get("teams")
354 if isinstance(jwt_teams, list) and jwt_teams:
355 # Non-empty JWT teams → intersect with DB teams. An empty
356 # intersection (all JWT teams revoked) returns [], which gives
357 # public-only access and lets downstream enforcement deny the
358 # request (fail-closed).
359 jwt_team_set = set(normalize_token_teams({"teams": jwt_teams}) or [])
360 return [t for t in db_teams if t in jwt_team_set]
362 # JWT teams absent, null, or empty list → no narrowing requested.
363 # An explicit ``teams: []`` means "don't restrict by team" (i.e. use
364 # the full DB membership), which intentionally differs from
365 # ``normalize_token_teams`` where ``[] → public-only``. The
366 # distinction exists because session tokens always start from DB-
367 # resolved teams — an empty JWT claim simply means the caller did not
368 # request a subset.
369 return db_teams
372async def _resolve_teams_from_db(email: str, user_info) -> Optional[List[str]]:
373 """Resolve teams for session tokens from DB/cache.
375 For admin users, returns None (admin bypass).
376 For non-admin users, returns the full list of team IDs from DB/cache.
378 Args:
379 email: User email address
380 user_info: User dict or EmailUser instance
382 Returns:
383 None (admin bypass), [] (no teams), or list of team ID strings
384 """
385 is_admin = user_info.get("is_admin", False) if isinstance(user_info, dict) else getattr(user_info, "is_admin", False)
386 if is_admin:
387 return None # Admin bypass
389 # Try auth cache first
390 try:
391 # First-Party
392 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
394 cached_teams = await auth_cache.get_user_teams(f"{email}:True")
395 if cached_teams is not None:
396 return cached_teams
397 except Exception: # nosec B110 - Cache unavailable is non-fatal, fall through to DB
398 pass
400 # Cache miss: query DB
401 team_ids = await asyncio.to_thread(_get_user_team_ids_sync, email)
403 # Cache the result
404 try:
405 # First-Party
406 from mcpgateway.cache.auth_cache import auth_cache # pylint: disable=import-outside-toplevel
408 await auth_cache.set_user_teams(f"{email}:True", team_ids)
409 except Exception: # nosec B110 - Cache write failure is non-fatal
410 pass
412 return team_ids
_UNSET: Any = object()  # sentinel distinguishing "not supplied" from explicit None


async def resolve_session_teams(
    payload: Dict[str, Any],
    email: Optional[str],
    user_info,
    *,
    preresolved_db_teams: Optional[List[str]] = _UNSET,
) -> Optional[List[str]]:
    """Resolve the effective teams of a session token, with the DB as authority.

    This is the **single policy point** for session-token team resolution;
    code paths that need teams for a session token should call it instead of
    inlining the decision.

    The database result always comes first, so revoked memberships take
    effect immediately. A ``teams`` claim in the JWT can only *narrow* that
    result to a subset (e.g. single-team mode); it can never add teams.

    Policy:
        1. If *email* is falsy, return ``[]`` immediately (public-only). An
           identity-less session token never receives admin bypass, since
           there is no user to resolve from the database.
        2. Take the DB teams from *preresolved_db_teams* when the caller
           supplied them (e.g. from a batched query), otherwise resolve
           them via ``_resolve_teams_from_db``.
        3. If the DB teams are ``None`` (admin bypass), return ``None``.
        4. If the JWT ``teams`` claim is a non-empty list, intersect it with
           the DB teams. An empty intersection (all JWT-claimed teams
           revoked) returns ``[]`` so downstream enforcement denies the
           request.
        5. Otherwise return the full DB result.

    Args:
        payload: The decoded JWT payload dict.
        email: User email address (for the DB lookup), or ``None``.
        user_info: User dict or EmailUser instance (for admin detection).
        preresolved_db_teams: If the caller already resolved DB teams (e.g.
            from a batched query), pass them here to skip the DB call.
            Pass ``None`` to indicate admin bypass was already determined.

    Returns:
        None (admin bypass), [] (public-only), or list of team ID strings.
    """
    if not email:
        return []  # No identity — public-only; never admin bypass

    # The sentinel, not None, marks "argument omitted": None is itself a
    # meaningful value here (admin bypass already determined by the caller).
    authoritative_teams: Optional[List[str]]
    if preresolved_db_teams is _UNSET:
        authoritative_teams = await _resolve_teams_from_db(email, user_info)
    else:
        authoritative_teams = preresolved_db_teams

    return _narrow_by_jwt_teams(payload, authoritative_teams)
def normalize_token_teams(payload: Dict[str, Any]) -> Optional[List[str]]:
    """
    Normalize token teams to a canonical form for consistent security checks.

    SECURITY: This is the single source of truth for token team normalization.
    All code paths that read token teams should use this function.

    Rules:
    - "teams" key missing → [] (public-only, secure default)
    - "teams" is null + is_admin=true → None (admin bypass, sees all)
    - "teams" is null + is_admin=false → [] (public-only, no bypass for non-admins)
    - "teams" is [] → [] (explicit public-only)
    - "teams" is [...] → normalized list of string IDs
    - "teams" is any other type (string, number, object) → [] (malformed
      claim, fail closed)

    Args:
        payload: The JWT payload dict

    Returns:
        None for admin bypass, [] for public-only, or list of normalized team ID strings
    """
    # Check if "teams" key exists (distinguishes missing from explicit null)
    if "teams" not in payload:
        # Missing teams key → public-only (secure default)
        return []

    teams = payload["teams"]

    if teams is None:
        # Explicit null - only allow admin bypass if is_admin is true
        # Check BOTH top-level is_admin AND nested user.is_admin
        is_admin = payload.get("is_admin", False)
        if not is_admin:
            user_info = payload.get("user", {})
            is_admin = user_info.get("is_admin", False) if isinstance(user_info, dict) else False
        # Admin with explicit null teams → admin bypass (sees all);
        # non-admin with null teams → public-only (no bypass).
        return None if is_admin else []

    if not isinstance(teams, (list, tuple)):
        # Malformed claim. Iterating it blindly would turn a string such as
        # "abc" into the team IDs "a", "b", "c", turn an object into its
        # keys, and crash on a number. Fail closed to public-only instead.
        return []

    # teams is a list - normalize to string IDs
    # Handle both dict format [{"id": "team1"}] and string format ["team1"]
    normalized: List[str] = []
    for team in teams:
        if isinstance(team, dict):
            team_id = team.get("id")
            if team_id:
                normalized.append(str(team_id))
        elif isinstance(team, str):
            normalized.append(team)
        # Entries of any other type carry no usable ID and are dropped.
    return normalized
async def get_team_from_token(payload: Dict[str, Any]) -> Optional[str]:
    """
    Extract the team ID from an authentication token payload.

    SECURITY: This function uses secure-first defaults:
    - Missing teams key = public-only (no personal team fallback)
    - Empty teams list = public-only (no team access)
    - Malformed teams claim (not a list) = public-only
    - Teams with values = use first team ID

    This prevents privilege escalation where missing or malformed claims
    could grant unintended team access.

    Args:
        payload (Dict[str, Any]):
            The token payload. Expected fields:
            - "sub" (str): The user's unique identifier (email).
            - "teams" (List[str], optional): List containing team ID. Entries
              may also be dicts, in which case the "id" key is used.

    Returns:
        Optional[str]:
            The resolved team ID. Returns `None` if teams is missing, empty,
            or not a list.

    Examples:
        >>> import asyncio
        >>> # --- Case 1: Token has team ---
        >>> payload = {"sub": "user@example.com", "teams": ["team_456"]}
        >>> asyncio.run(get_team_from_token(payload))
        'team_456'

        >>> # --- Case 2: Token has explicit empty teams (public-only) ---
        >>> payload = {"sub": "user@example.com", "teams": []}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None

        >>> # --- Case 3: Token has no teams key (secure default) ---
        >>> payload = {"sub": "user@example.com"}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None

        >>> # --- Case 4: Malformed teams claim (string instead of list) ---
        >>> payload = {"sub": "user@example.com", "teams": "team_456"}
        >>> asyncio.run(get_team_from_token(payload))  # Returns None
        >>> # None
    """
    teams = payload.get("teams")

    # SECURITY: Treat missing, empty, or malformed teams as public-only.
    # - teams is None (missing key or null): no legacy fallback to a personal team
    # - teams == [] (explicit empty list): no team access
    # - teams is not a list: indexing a string would yield its first
    #   character as a "team ID", and len() of a number would raise
    # Admin bypass is handled separately via is_admin flag in token, not via missing teams
    if not isinstance(teams, (list, tuple)) or not teams:
        return None

    # Has teams - use the first one
    first_team = teams[0]
    if isinstance(first_team, dict):
        return first_team.get("id")
    return first_team
def _check_token_revoked_sync(jti: str) -> bool:
    """Report whether a revocation record exists for the given JWT ID.

    Synchronous by design: callers run it in a thread pool so the event loop
    is not blocked.

    Args:
        jti: The JWT ID to check.

    Returns:
        True if the token is revoked, False otherwise.
    """
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.db import TokenRevocation  # pylint: disable=import-outside-toplevel

    with fresh_db_session() as session:
        revocation = session.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)).scalar_one_or_none()
        return revocation is not None
def _lookup_api_token_sync(token_hash: str) -> Optional[Dict[str, Any]]:
    """Look up an active API token by its hash and validate it.

    Synchronous by design: callers run it in a thread pool so the event loop
    is not blocked. On a successful lookup the token's ``last_used``
    timestamp is updated and committed.

    Args:
        token_hash: SHA256 hash of the API token.

    Returns:
        ``None`` when no active token matches the hash; ``{"expired": True}``
        when the token is past its expiry; ``{"revoked": True}`` when a
        revocation record exists for its JTI; otherwise a dict with the
        token's ``user_email`` and ``jti``.
    """
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.db import EmailApiToken, TokenRevocation, utc_now  # pylint: disable=import-outside-toplevel

    with fresh_db_session() as session:
        token_row = session.execute(
            select(EmailApiToken).where(
                EmailApiToken.token_hash == token_hash,
                EmailApiToken.is_active.is_(True),
            )
        ).scalar_one_or_none()
        if not token_row:
            return None

        # Expiration: a naive timestamp is interpreted as UTC before comparing.
        expiry = token_row.expires_at
        if expiry:
            if expiry.tzinfo is None:
                expiry = expiry.replace(tzinfo=timezone.utc)
            if utc_now() > expiry:
                return {"expired": True}

        # Revocation: any record for this JTI invalidates the token.
        revocation = session.execute(select(TokenRevocation).where(TokenRevocation.jti == token_row.jti)).scalar_one_or_none()
        if revocation is not None:
            return {"revoked": True}

        # Valid token: record the usage before handing back its identity.
        token_row.last_used = utc_now()
        session.commit()

        return {
            "user_email": token_row.user_email,
            "jti": token_row.jti,
        }
def _get_sync_redis_client():
    """Get or create module-level synchronous Redis client for rate-limiting.

    The client is created lazily under ``_SYNC_REDIS_LOCK`` and is published
    to the module-level variable only after a successful ``ping()``, so the
    lock-free fast path never returns a client that has not been verified.
    After a failed attempt, further attempts are suppressed for 30 seconds;
    the backoff is re-checked once the lock is held so that threads queued
    behind a failing attempt do not each retry the connection.

    Returns:
        Redis client or None if Redis is unavailable/disabled.
    """
    global _SYNC_REDIS_CLIENT, _SYNC_REDIS_FAILURE_TIME  # pylint: disable=global-statement

    # Standard
    import time  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.config import settings as config_settings  # pylint: disable=import-outside-toplevel,reimported

    backoff_seconds = 30

    # Quick check without lock
    if _SYNC_REDIS_CLIENT is not None:
        return _SYNC_REDIS_CLIENT

    redis_url = config_settings.redis_url
    if not (redis_url and redis_url.strip() and config_settings.cache_type == "redis"):
        return None  # Redis is not configured as the cache backend

    # Backoff after recent failure
    if _SYNC_REDIS_FAILURE_TIME and (time.time() - _SYNC_REDIS_FAILURE_TIME < backoff_seconds):
        return None

    log = logging.getLogger(__name__)

    # Lazy initialization with lock
    with _SYNC_REDIS_LOCK:
        # Double-check after acquiring lock: another thread may have either
        # created the client or recorded a fresh failure while we waited.
        if _SYNC_REDIS_CLIENT is not None:
            return _SYNC_REDIS_CLIENT
        if _SYNC_REDIS_FAILURE_TIME and (time.time() - _SYNC_REDIS_FAILURE_TIME < backoff_seconds):
            return None

        try:
            # Third-Party
            import redis  # pylint: disable=import-outside-toplevel

            candidate = redis.from_url(redis_url, decode_responses=True, socket_connect_timeout=2, socket_timeout=2)
            # Test connection before exposing the client to other threads
            candidate.ping()
        except Exception as exc:
            log.debug("Sync Redis client unavailable: %s", exc)
            _SYNC_REDIS_CLIENT = None
            _SYNC_REDIS_FAILURE_TIME = time.time()
        else:
            _SYNC_REDIS_CLIENT = candidate
            _SYNC_REDIS_FAILURE_TIME = None  # Clear failure state on success
            log.debug("Sync Redis client initialized for API token rate-limiting")

    return _SYNC_REDIS_CLIENT
def _update_api_token_last_used_sync(jti: str) -> None:
    """Update last_used timestamp for an API token with rate-limiting.

    This function is called when an API token is successfully validated via JWT,
    ensuring the last_used field stays current for monitoring and security audits.

    Rate-limiting: Uses Redis cache (or in-memory fallback) to track the last
    update time and only writes to the database if the configured interval has
    elapsed. This prevents excessive DB writes on high-traffic tokens.

    Flow:
        1. If a sync Redis client is available, read the last-update time
           stored under ``api_token_last_used:<jti>``. If it is recent
           enough, return without touching the DB; otherwise write the DB
           and refresh the Redis entry (TTL of twice the interval).
        2. If there is no Redis client, or anything in step 1 raises, fall
           back to the module-level in-memory cache and apply the same
           check / DB write / cache refresh sequence. The cache is capped at
           1024 entries; when full, the older half (by timestamp) is evicted.

    Args:
        jti: JWT ID of the API token

    Note:
        Called via asyncio.to_thread() to avoid blocking the event loop.
        Uses fresh_db_session() for thread-safe database access.
    """
    # NOTE(review): the block nesting below was reconstructed from a listing
    # whose indentation was lost. In particular, whether the two cache writes
    # sit inside ``if api_token:`` should be confirmed against the real file.

    # Standard
    import time  # pylint: disable=import-outside-toplevel,redefined-outer-name

    # First-Party
    from mcpgateway.config import settings as config_settings  # pylint: disable=import-outside-toplevel,reimported

    # Rate-limiting cache key
    cache_key = f"api_token_last_used:{jti}"
    # The setting is expressed in minutes; all comparisons below use seconds.
    update_interval_seconds = config_settings.token_last_used_update_interval_minutes * 60

    # Try Redis rate-limiting first (if available)
    redis_client = _get_sync_redis_client()
    if redis_client:
        try:
            last_update = redis_client.get(cache_key)
            if last_update:
                # Check if enough time has elapsed
                # (the stored value is the string form of a time.time() float)
                time_since_update = time.time() - float(last_update)
                if time_since_update < update_interval_seconds:
                    return  # Skip update - too soon

            # Update DB and cache
            with fresh_db_session() as db:
                # Third-Party
                from sqlalchemy import select  # pylint: disable=import-outside-toplevel

                # First-Party
                from mcpgateway.db import EmailApiToken, utc_now  # pylint: disable=import-outside-toplevel

                result = db.execute(select(EmailApiToken).where(EmailApiToken.jti == jti))
                api_token = result.scalar_one_or_none()
                if api_token:
                    api_token.last_used = utc_now()
                    db.commit()
                    # Update Redis cache with current timestamp
                    redis_client.setex(cache_key, update_interval_seconds * 2, str(time.time()))
            return
        except Exception as exc:
            # Redis failed, fall through to in-memory cache
            # (any exception raised inside the try - including one from the
            # DB write - lands here, not only Redis errors)
            logger = logging.getLogger(__name__)
            logger.debug("Redis unavailable for API token rate-limiting, using in-memory fallback: %s", exc)

    # Fallback: In-memory cache (module-level dict with threading.Lock for thread-safety)
    # Note: This is per-process and won't work in multi-worker deployments
    # but provides basic rate-limiting when Redis is unavailable
    max_cache_size = 1024  # Prevent unbounded growth

    # The lock is held only for the cache read; it is released before the DB
    # write and re-acquired afterwards for the cache update.
    with _LAST_USED_CACHE_LOCK:
        last_update = _LAST_USED_CACHE.get(jti)
        if last_update:
            time_since_update = time.time() - last_update
            if time_since_update < update_interval_seconds:
                return  # Skip update - too soon

    # Update DB and cache
    with fresh_db_session() as db:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailApiToken, utc_now  # pylint: disable=import-outside-toplevel

        result = db.execute(select(EmailApiToken).where(EmailApiToken.jti == jti))
        api_token = result.scalar_one_or_none()
        if api_token:
            api_token.last_used = utc_now()
            db.commit()
            # Update in-memory cache (with lock for thread-safety)
            with _LAST_USED_CACHE_LOCK:
                if len(_LAST_USED_CACHE) >= max_cache_size:
                    # Evict oldest entries (by timestamp value)
                    sorted_keys = sorted(_LAST_USED_CACHE, key=_LAST_USED_CACHE.get)  # type: ignore[arg-type]
                    for k in sorted_keys[: len(_LAST_USED_CACHE) // 2]:
                        del _LAST_USED_CACHE[k]
                _LAST_USED_CACHE[jti] = time.time()
def _is_api_token_jti_sync(jti: str) -> bool:
    """Tell whether a JTI belongs to an API token (legacy fallback) - SYNC version.

    Needed for tokens issued before ``auth_provider`` was added to the
    payload. Meant to be called via asyncio.to_thread() so the event loop is
    not blocked.

    SECURITY: Fail-closed on DB errors. When the lookup cannot be completed,
    the JTI is reported as an API token so the hard-block policy is preserved.

    Args:
        jti: JWT ID to check

    Returns:
        bool: True if JTI exists in email_api_tokens table OR if lookup fails
    """
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.db import EmailApiToken  # pylint: disable=import-outside-toplevel

    try:
        # Only existence matters, so select the ID alone and stop at one row.
        existence_query = select(EmailApiToken.id).where(EmailApiToken.jti == jti).limit(1)
        with fresh_db_session() as session:
            match = session.execute(existence_query).scalar_one_or_none()
            return match is not None
    except Exception as e:
        logging.getLogger(__name__).warning(f"Legacy API token check failed, failing closed: {e}")
        return True  # FAIL-CLOSED: treat as API token to preserve hard-block
def _get_user_by_email_sync(email: str) -> Optional[EmailUser]:
    """Fetch a user by email and return a session-independent copy.

    Synchronous by design: callers run it in a thread pool so the event loop
    is not blocked.

    Args:
        email: The user's email address.

    Returns:
        EmailUser if found, None otherwise.
    """
    # Third-Party
    from sqlalchemy import select  # pylint: disable=import-outside-toplevel

    # Attributes carried over to the detached copy, in copy order.
    copied_fields = (
        "email",
        "password_hash",
        "full_name",
        "is_admin",
        "is_active",
        "auth_provider",
        "password_change_required",
        "email_verified_at",
        "created_at",
        "updated_at",
    )

    with fresh_db_session() as session:
        row = session.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()
        if not row:
            return None
        # The session closes when this block exits, so hand back a new,
        # unattached EmailUser built from the loaded values instead of the
        # session-bound instance.
        return EmailUser(**{field: getattr(row, field) for field in copied_fields})
def _resolve_plugin_authenticated_user_sync(user_dict: Dict[str, Any]) -> Optional[EmailUser]:
    """Map plugin-supplied identity claims onto a trusted user object.

    A plugin auth hook may authenticate a request and return identity claims.
    Those claims are never trusted for privilege: admin status comes from the
    database record, or is forced to ``False`` when no record exists.

    Behavior:
    - Claims without a usable email: reject (None).
    - Existing DB user: return DB user (authoritative for is_admin/is_active).
    - Missing DB user and REQUIRE_USER_IN_DB=true: reject (None).
    - Missing DB user and REQUIRE_USER_IN_DB=false: allow a non-admin virtual
      user built from non-privileged plugin claims.

    Args:
        user_dict: Identity claims returned by plugin auth hook.

    Returns:
        EmailUser when identity is accepted, otherwise None.
    """
    claimed_email = str(user_dict.get("email") or "").strip()
    if not claimed_email:
        return None

    # A database record, when present, wins over anything the plugin said.
    known_user = _get_user_by_email_sync(claimed_email)
    if known_user:
        return known_user

    if settings.require_user_in_db:
        return None

    # Virtual user: plugin claims fill the profile, but is_admin is pinned to
    # False regardless of what the claims contain.
    return EmailUser(
        email=claimed_email,
        password_hash=user_dict.get("password_hash", ""),
        full_name=user_dict.get("full_name"),
        is_admin=False,
        is_active=user_dict.get("is_active", True),
        auth_provider=user_dict.get("auth_provider", "local"),
        password_change_required=user_dict.get("password_change_required", False),
        email_verified_at=user_dict.get("email_verified_at"),
        created_at=user_dict.get("created_at", datetime.now(timezone.utc)),
        updated_at=user_dict.get("updated_at", datetime.now(timezone.utc)),
    )
def _get_auth_context_batched_sync(email: str, jti: Optional[str] = None) -> Dict[str, Any]:
    """Collect the authentication context for one request in a single DB session.

    Runs the lookups that would otherwise be separate thread-pool calls
    inside one ``fresh_db_session()``:

    1. The user row for ``email`` (detached into a plain dict).
    2. The user's personal team ID (only when the user exists).
    3. Whether ``jti`` appears in the token revocation table (only when given).
    4. The user's active memberships in active teams, with team names
       (only when the user exists).

    Args:
        email: User email address
        jti: JWT ID for revocation check (optional)

    Returns:
        Dict with keys: user (dict or None), personal_team_id (str or None),
        is_token_revoked (bool), team_ids (list of str), team_names (dict)

    Examples:
        >>> # This function runs in a thread pool
        >>> # result = _get_auth_context_batched_sync("test@example.com", "jti-123")
        >>> # result["is_token_revoked"]  # False if not revoked
    """
    with fresh_db_session() as session:
        # Third-Party
        from sqlalchemy import select  # pylint: disable=import-outside-toplevel

        # First-Party
        from mcpgateway.db import EmailTeam, EmailTeamMember, TokenRevocation  # pylint: disable=import-outside-toplevel

        def _team_id_and_name(row: Any) -> tuple:
            """Pull (team_id, name) out of a result row, whatever shape it has.

            Tries the row's ``_mapping`` first, then attribute access, then
            positional access for plain tuples.

            Args:
                row: One row from the team membership query.

            Returns:
                Tuple of (team_id, team_name); either element may be None.
            """
            row_mapping = getattr(row, "_mapping", None)
            found_id = row_mapping.get("team_id") if row_mapping is not None else None
            found_name = row_mapping.get("name") if row_mapping is not None else None

            if found_id is None:
                found_id = getattr(row, "team_id", None)
            if found_name is None:
                found_name = getattr(row, "name", None)

            # Last resort: positional access on a plain tuple row.
            if found_id is None and isinstance(row, tuple):
                found_id = row[0] if len(row) > 0 else None
                found_name = row[1] if len(row) > 1 else None

            return found_id, found_name

        context: Dict[str, Any] = {
            "user": None,
            "personal_team_id": None,
            "is_token_revoked": False,  # nosec B105 - boolean flag, not a password
            "team_ids": [],
            "team_names": {},
        }

        # Query 1: user row
        db_user = session.execute(select(EmailUser).where(EmailUser.email == email)).scalar_one_or_none()

        if db_user:
            # Copy the columns into a dict so the data survives the session closing.
            copied_fields = (
                "email",
                "password_hash",
                "full_name",
                "is_admin",
                "is_active",
                "auth_provider",
                "password_change_required",
                "email_verified_at",
                "created_at",
                "updated_at",
            )
            context["user"] = {field: getattr(db_user, field) for field in copied_fields}

            # Query 2: personal team (only meaningful for an existing user)
            personal_stmt = (
                select(EmailTeam)
                .join(EmailTeamMember)
                .where(
                    EmailTeamMember.user_email == email,
                    EmailTeam.is_personal.is_(True),
                )
            )
            personal = session.execute(personal_stmt).scalar_one_or_none()
            if personal:
                context["personal_team_id"] = personal.id

            # Query 4: active memberships in active teams (used for session token team resolution)
            membership_stmt = (
                select(EmailTeamMember.team_id, EmailTeam.name)
                .join(EmailTeam, EmailTeam.id == EmailTeamMember.team_id)
                .where(
                    EmailTeamMember.user_email == email,
                    EmailTeamMember.is_active.is_(True),
                    EmailTeam.is_active.is_(True),
                )
            )
            collected_ids: list[str] = []
            collected_names: dict[str, str] = {}

            for membership in session.execute(membership_stmt).all():
                raw_id, raw_name = _team_id_and_name(membership)
                if not raw_id:
                    continue

                id_key = str(raw_id)
                collected_ids.append(id_key)
                if raw_name:
                    collected_names[id_key] = str(raw_name)

            context["team_ids"] = collected_ids
            context["team_names"] = collected_names

        # Query 3: token revocation (only when a JTI was supplied)
        if jti:
            revocation = session.execute(select(TokenRevocation).where(TokenRevocation.jti == jti)).scalar_one_or_none()
            context["is_token_revoked"] = revocation is not None

        return context
def _user_from_cached_dict(user_dict: Dict[str, Any]) -> EmailUser:
    """Rebuild an EmailUser object from a cached user dictionary.

    ``email`` is the only required key; every other field falls back to a
    default when absent (empty password hash, non-admin, active, "local"
    provider, no forced password change, and "now" in UTC for the timestamps).

    Args:
        user_dict: User data dictionary from cache

    Returns:
        EmailUser instance (detached from any session)
    """
    lookup = user_dict.get
    fields: Dict[str, Any] = {
        "email": user_dict["email"],
        "password_hash": lookup("password_hash", ""),
        "full_name": lookup("full_name"),
        "is_admin": lookup("is_admin", False),
        "is_active": lookup("is_active", True),
        "auth_provider": lookup("auth_provider", "local"),
        "password_change_required": lookup("password_change_required", False),
        "email_verified_at": lookup("email_verified_at"),
        "created_at": lookup("created_at", datetime.now(timezone.utc)),
        "updated_at": lookup("updated_at", datetime.now(timezone.utc)),
    }
    return EmailUser(**fields)
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    request: Request = None,  # type: ignore[assignment]
) -> EmailUser:
    """Get current authenticated user from JWT token with revocation checking.

    Supports plugin-based custom authentication via HTTP_AUTH_RESOLVE_USER hook.

    Resolution order:

    1. Plugin hook (HTTP_AUTH_RESOLVE_USER). A plugin may return identity
       claims, explicitly deny (PluginViolationError -> 401), or decline, in
       which case standard auth runs. Unexpected plugin errors are logged and
       standard auth runs.
    2. JWT validation, followed by (in order, each falling through to the next
       on a miss or non-HTTP error): the auth cache, the batched DB lookup,
       and finally individual DB queries.
    3. If JWT validation raises a non-HTTP error, the raw credential is
       hashed and looked up as a database API token.

    Along the way ``request.state`` (when a request is available) is populated
    with ``token_use``, ``token_teams``, ``team_id``, ``trace_team_name``,
    ``auth_method`` and ``jti`` as applicable, and trace context is set for
    the resolved user.

    Args:
        credentials: HTTP authorization credentials
        request: Optional request object for plugin hooks

    Returns:
        EmailUser: Authenticated user

    Raises:
        HTTPException: If authentication fails
    """
    logger = logging.getLogger(__name__)
    clear_trace_context()

    async def _set_auth_method_from_payload(payload: dict) -> None:
        """Set request.state.auth_method based on JWT payload.

        Writes "api_token" or "jwt" to ``request.state.auth_method`` and, for
        API tokens, stores the JTI and attempts a best-effort last_used update.
        Does nothing when no request is available.

        Args:
            payload: Decoded JWT payload
        """
        if not request:
            return

        # NOTE: Cannot use structural check (scopes dict) because email login JWTs
        # also have scopes dict (see email_auth.py:160)
        user_info = payload.get("user", {})
        auth_provider = user_info.get("auth_provider")

        if auth_provider == "api_token":
            request.state.auth_method = "api_token"
            jti = payload.get("jti")
            if jti:
                request.state.jti = jti
                try:
                    await asyncio.to_thread(_update_api_token_last_used_sync, jti)
                except Exception as e:
                    logger.debug(f"Failed to update API token last_used: {e}")
                    # Continue authentication - last_used update is not critical
            return

        if auth_provider:
            # email, oauth, saml, or any other interactive auth provider
            request.state.auth_method = "jwt"
            return

        # Legacy API token fallback: check if JTI exists in API token table
        # This handles tokens created before auth_provider was added
        jti_for_check = payload.get("jti")
        if jti_for_check:
            is_legacy_api_token = await asyncio.to_thread(_is_api_token_jti_sync, jti_for_check)
            if is_legacy_api_token:
                request.state.auth_method = "api_token"
                request.state.jti = jti_for_check
                logger.debug(f"Legacy API token detected via DB lookup (JTI: ...{jti_for_check[-8:]})")
                try:
                    await asyncio.to_thread(_update_api_token_last_used_sync, jti_for_check)
                except Exception as e:
                    logger.debug(f"Failed to update legacy API token last_used: {e}")
                    # Continue authentication - last_used update is not critical
            else:
                request.state.auth_method = "jwt"
        else:
            # No auth_provider or JTI; default to interactive
            request.state.auth_method = "jwt"

    def _set_trace_for_user(user_obj: EmailUser, *, teams: Any = _UNSET, auth_method: Optional[str] = None, team_name: Optional[str] = None) -> None:
        """Populate trace context from the resolved user and request state.

        Args:
            user_obj: Resolved authenticated user object.
            teams: Optional resolved team scope override. When unset, team scope is derived from the user object.
            auth_method: Optional explicit authentication method label to record on the trace.
            team_name: Optional display name for the primary concrete team.
        """
        resolved_auth_method = auth_method
        if resolved_auth_method is None and request:
            resolved_auth_method = getattr(request.state, "auth_method", None)

        if teams is not _UNSET:
            set_trace_context_from_teams(
                teams,
                user_email=user_obj.email,
                is_admin=bool(user_obj.is_admin),
                auth_method=resolved_auth_method,
                team_name=team_name,
            )
            return

        set_trace_user_email(user_obj.email)
        set_trace_user_is_admin(bool(user_obj.is_admin))
        if resolved_auth_method:
            set_trace_auth_method(resolved_auth_method)
        if user_obj.is_admin:
            set_trace_team_scope("admin")

    # Bind plugin_manager before the try-block. The standard-auth paths below
    # test `if plugin_manager and ...`; if get_plugin_manager() itself raised,
    # the name would otherwise be unbound and the intended "fall back to
    # standard auth" behaviour would end in UnboundLocalError instead.
    plugin_manager = None

    # NEW: Custom authentication hook - allows plugins to provide alternative auth
    # This hook is invoked BEFORE standard JWT/API token validation
    try:
        # Get plugin manager singleton
        plugin_manager = get_plugin_manager()

        if plugin_manager and plugin_manager.has_hooks_for(HttpHookType.HTTP_AUTH_RESOLVE_USER):
            # Extract client information
            client_host = None
            client_port = None
            if request and hasattr(request, "client") and request.client:
                client_host = request.client.host
                client_port = request.client.port

            # Serialize credentials for plugin
            credentials_dict = None
            if credentials:
                credentials_dict = {
                    "scheme": credentials.scheme,
                    "credentials": credentials.credentials,
                }

            # Extract headers from request
            # Note: Middleware modifies request.scope["headers"], so request.headers
            # will automatically reflect any modifications made by HTTP_PRE_REQUEST hooks
            headers = {}
            if request and hasattr(request, "headers"):
                headers = dict(request.headers)

            # Get request ID from correlation ID context (set by CorrelationIDMiddleware)
            request_id = get_correlation_id()
            if not request_id:
                # Fallback chain for safety
                if request and hasattr(request, "state") and hasattr(request.state, "request_id"):
                    request_id = request.state.request_id
                else:
                    request_id = uuid.uuid4().hex
                    logger.debug(f"Generated fallback request ID in get_current_user: {request_id}")

            # Get plugin contexts from request state if available
            global_context = getattr(request.state, "plugin_global_context", None) if request else None
            if not global_context:
                # Propagate team_id → tenant_id for by_tenant rate limiting
                team_id = getattr(getattr(request, "state", None), "team_id", None) if request else None
                global_context = GlobalContext(
                    request_id=request_id,
                    server_id=None,
                    tenant_id=team_id,
                )

            context_table = getattr(request.state, "plugin_context_table", None) if request else None

            # Invoke custom auth resolution hook
            # violations_as_exceptions=True so PluginViolationError is raised for explicit denials
            auth_result, context_table_result = await plugin_manager.invoke_hook(
                HttpHookType.HTTP_AUTH_RESOLVE_USER,
                payload=HttpAuthResolveUserPayload(
                    credentials=credentials_dict,
                    headers=HttpHeaderPayload(root=headers),
                    client_host=client_host,
                    client_port=client_port,
                ),
                global_context=global_context,
                local_contexts=context_table,
                violations_as_exceptions=True,  # Raise PluginViolationError for auth denials
            )

            # If plugin successfully authenticated user, return it
            if auth_result.modified_payload and isinstance(auth_result.modified_payload, dict):
                logger.info("User authenticated via plugin hook")
                # Resolve plugin claims against DB state so admin flags are authoritative.
                user_dict = auth_result.modified_payload
                user = await asyncio.to_thread(_resolve_plugin_authenticated_user_sync, user_dict)

                if user is None:
                    logger.warning("Plugin auth rejected: user identity could not be resolved against DB policy")
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="User not found in database",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                if not user.is_active:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Account disabled",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                # Store auth_method in request.state so it can be accessed by RBAC middleware
                if request and auth_result.metadata:
                    auth_method = auth_result.metadata.get("auth_method")
                    if auth_method:
                        request.state.auth_method = auth_method
                        logger.debug(f"Stored auth_method '{auth_method}' in request.state")

                if request and context_table_result:
                    request.state.plugin_context_table = context_table_result

                if request and global_context:
                    request.state.plugin_global_context = global_context

                if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
                    _inject_userinfo_instate(request, user)
                _propagate_tenant_id(request)

                _set_trace_for_user(user)
                return user
            # If continue_processing=True (no payload), fall through to standard auth

    except PluginViolationError as e:
        # Plugin explicitly denied authentication with custom message
        logger.warning(f"Authentication denied by plugin: {SecurityValidator.sanitize_log_message(e.message)}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=e.message,  # Use plugin's custom error message
            headers={"WWW-Authenticate": "Bearer"},
        )
    except HTTPException:
        # Re-raise HTTP exceptions
        raise
    except Exception as e:
        # Log but don't fail on plugin errors - fall back to standard auth
        logger.warning(f"HTTP_AUTH_RESOLVE_USER hook failed, falling back to standard auth: {SecurityValidator.sanitize_log_message(str(e))}")

    # EXISTING: Standard authentication (JWT, API tokens)
    if not credentials:
        logger.warning("No credentials provided")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    logger.debug("Attempting authentication with bearer credentials")
    email = None

    try:
        # Try JWT token first using the centralized verify_jwt_token_cached function
        logger.debug("Attempting JWT token validation")
        payload = await verify_jwt_token_cached(credentials.credentials, request)

        logger.debug("JWT token validated successfully")
        # Extract user identifier (support both new and legacy token formats)
        email = payload.get("sub")
        if email is None:
            # Try legacy format
            email = payload.get("email")

        if email is None:
            logger.debug("No email/sub found in JWT payload")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token",
                headers={"WWW-Authenticate": "Bearer"},
            )

        logger.debug("JWT authentication successful for email: %s", email)

        # Extract JTI for revocation check
        jti = payload.get("jti")

        # === AUTH CACHING: Check cache before DB queries ===
        if settings.auth_cache_enabled:
            try:
                # First-Party
                from mcpgateway.cache.auth_cache import auth_cache, CachedAuthContext  # pylint: disable=import-outside-toplevel

                cached_ctx = await auth_cache.get_auth_context(email, jti)
                if cached_ctx:
                    logger.debug(f"Auth cache hit for {email}")

                    # Check revocation from cache
                    if cached_ctx.is_token_revoked:
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Token has been revoked",
                            headers={"WWW-Authenticate": "Bearer"},
                        )

                    # Check user active status from cache
                    if cached_ctx.user and not cached_ctx.user.get("is_active", True):
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Account disabled",
                            headers={"WWW-Authenticate": "Bearer"},
                        )

                    # Resolve teams based on token_use
                    if request:
                        token_use = payload.get("token_use")
                        request.state.token_use = token_use

                        if token_use == "session":  # nosec B105 - Not a password; token_use is a JWT claim type
                            # Session token: resolve teams from DB/cache
                            user_info = cached_ctx.user or {"is_admin": False}
                            teams = await resolve_session_teams(payload, email, user_info)
                        else:
                            # API token or legacy: use embedded teams
                            teams = normalize_token_teams(payload)

                        request.state.token_teams = teams

                        # Set team_id: only for single-team API tokens
                        if teams is None:
                            request.state.team_id = None
                        elif len(teams) == 1 and token_use != "session":  # nosec B105
                            request.state.team_id = teams[0] if isinstance(teams[0], str) else teams[0].get("id")
                        else:
                            request.state.team_id = None

                        request.state.trace_team_name = await resolve_trace_team_name(payload, teams)

                    # No-op when request is missing (the helper returns early).
                    await _set_auth_method_from_payload(payload)

                    # Return user from cache
                    if cached_ctx.user:
                        # When require_user_in_db is enabled, verify user still exists in DB
                        # This prevents stale cache from bypassing strict mode
                        if settings.require_user_in_db:
                            db_user = await asyncio.to_thread(_get_user_by_email_sync, email)
                            if db_user is None:
                                logger.warning(
                                    f"Authentication rejected for {email}: cached user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
                                    extra={"security_event": "user_not_in_db_rejected", "user_id": email},
                                )
                                raise HTTPException(
                                    status_code=status.HTTP_401_UNAUTHORIZED,
                                    detail="User not found in database",
                                    headers={"WWW-Authenticate": "Bearer"},
                                )

                        # Build the detached user once and reuse it for both the
                        # plugin context injection and the return value.
                        cached_user = _user_from_cached_dict(cached_ctx.user)

                        if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
                            _inject_userinfo_instate(request, cached_user)
                        _propagate_tenant_id(request)

                        _set_trace_for_user(
                            cached_user,
                            teams=getattr(request.state, "token_teams", _UNSET) if request else _UNSET,
                            team_name=getattr(request.state, "trace_team_name", None) if request else None,
                        )
                        return cached_user

                    # User not in cache but context was (shouldn't happen, but handle it)
                    logger.debug("Auth context cached but user missing, falling through to DB")

            except HTTPException:
                raise
            except Exception as cache_error:
                logger.debug(f"Auth cache check failed, falling through to DB: {cache_error}")

        # === BATCHED QUERIES: Single DB call for user + team + revocation ===
        if settings.auth_cache_batch_queries:
            try:
                auth_ctx = await asyncio.to_thread(_get_auth_context_batched_sync, email, jti)

                # Check revocation
                if auth_ctx.get("is_token_revoked"):
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Token has been revoked",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                # Resolve teams based on token_use
                token_use = payload.get("token_use")
                if token_use == "session":  # nosec B105 - Not a password; token_use is a JWT claim type
                    # Session token: use team_ids from batched query via resolve_session_teams
                    user_dict = auth_ctx.get("user")
                    is_admin = user_dict.get("is_admin", False) if user_dict else False
                    batch_teams = None if is_admin else auth_ctx.get("team_ids", [])
                    teams = await resolve_session_teams(payload, email, {"is_admin": is_admin}, preresolved_db_teams=batch_teams)
                else:
                    # API token or legacy: use embedded teams
                    teams = normalize_token_teams(payload)

                # Set team_id: only for single-team API tokens
                if teams is None:
                    team_id = None
                elif len(teams) == 1 and token_use != "session":  # nosec B105
                    team_id = teams[0] if isinstance(teams[0], str) else teams[0].get("id")
                else:
                    team_id = None

                if request:
                    request.state.token_teams = teams
                    request.state.team_id = team_id
                    request.state.token_use = token_use
                    request.state.trace_team_name = await resolve_trace_team_name(payload, teams, preresolved_team_names=auth_ctx.get("team_names"))
                    await _set_auth_method_from_payload(payload)

                # Store in cache for future requests
                if settings.auth_cache_enabled:
                    try:
                        # First-Party
                        from mcpgateway.cache.auth_cache import auth_cache, CachedAuthContext  # noqa: F811 pylint: disable=import-outside-toplevel

                        await auth_cache.set_auth_context(
                            email,
                            jti,
                            CachedAuthContext(
                                user=auth_ctx.get("user"),
                                personal_team_id=auth_ctx.get("personal_team_id"),
                                is_token_revoked=auth_ctx.get("is_token_revoked", False),
                            ),
                        )
                        # Also populate teams-list cache so cached-path requests
                        # don't need an extra DB query via _resolve_teams_from_db()
                        # Cache the raw DB teams (batch_teams), not the narrowed
                        # intersection (teams), so that other sessions for the same
                        # user see the full membership and can narrow independently.
                        # batch_teams is only bound on the session branch above; the
                        # short-circuit on token_use keeps this safe for other tokens.
                        if token_use == "session" and batch_teams is not None:  # nosec B105
                            await auth_cache.set_user_teams(f"{email}:True", batch_teams)
                    except Exception as cache_set_error:
                        logger.debug(f"Failed to cache auth context: {cache_set_error}")

                # Create user from batched result
                if auth_ctx.get("user"):
                    user_dict = auth_ctx["user"]
                    if not user_dict.get("is_active", True):
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Account disabled",
                            headers={"WWW-Authenticate": "Bearer"},
                        )
                    # Store user for return at end of function
                    # We'll check platform admin case and return below
                    _batched_user = _user_from_cached_dict(user_dict)
                else:
                    _batched_user = None

                # Handle user not found case
                if _batched_user is None:
                    # Check if strict user-in-DB mode is enabled
                    if settings.require_user_in_db:
                        logger.warning(
                            f"Authentication rejected for {email}: user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
                            extra={"security_event": "user_not_in_db_rejected", "user_id": email},
                        )
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="User not found in database",
                            headers={"WWW-Authenticate": "Bearer"},
                        )

                    # Platform admin bootstrap (only when REQUIRE_USER_IN_DB=false)
                    if email == getattr(settings, "platform_admin_email", "admin@example.com"):
                        logger.info(
                            f"Platform admin bootstrap authentication for {email}. " "User authenticated via platform admin configuration.",
                            extra={"security_event": "platform_admin_bootstrap", "user_id": email},
                        )
                        _batched_user = EmailUser(
                            email=email,
                            password_hash="",  # nosec B106
                            full_name=getattr(settings, "platform_admin_full_name", "Platform Administrator"),
                            is_admin=True,
                            is_active=True,
                            auth_provider="local",
                            password_change_required=False,
                            email_verified_at=datetime.now(timezone.utc),
                            created_at=datetime.now(timezone.utc),
                            updated_at=datetime.now(timezone.utc),
                        )
                    else:
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="User not found",
                            headers={"WWW-Authenticate": "Bearer"},
                        )

                if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
                    _inject_userinfo_instate(request, _batched_user)
                _propagate_tenant_id(request)

                _set_trace_for_user(
                    _batched_user,
                    teams=getattr(request.state, "token_teams", _UNSET) if request else _UNSET,
                    team_name=getattr(request.state, "trace_team_name", None) if request else None,
                )
                return _batched_user

            except HTTPException:
                raise
            except Exception as batch_error:
                logger.warning(f"Batched auth query failed, falling back to individual queries: {batch_error}")

        # === FALLBACK: Original individual queries (if batching disabled or failed) ===
        if jti:
            try:
                is_revoked = await asyncio.to_thread(_check_token_revoked_sync, jti)
                if is_revoked:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Token has been revoked",
                        headers={"WWW-Authenticate": "Bearer"},
                    )
            except HTTPException:
                raise
            except Exception as revoke_check_error:
                # Fail-secure: if the revocation check itself errors, reject the token.
                # Allowing through on error would let revoked tokens bypass enforcement
                # when the DB is unreachable or the table is missing.
                logger.warning(
                    f"Token revocation check failed for JTI {SecurityValidator.sanitize_log_message(jti)} — denying access (fail-secure): {SecurityValidator.sanitize_log_message(str(revoke_check_error))}"
                )
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token validation failed",
                    headers={"WWW-Authenticate": "Bearer"},
                )

        # Resolve teams based on token_use
        token_use = payload.get("token_use")
        if token_use == "session":  # nosec B105 - Not a password; token_use is a JWT claim type
            # Session token: resolve teams from DB/cache (fallback path — separate query OK)
            user_info = {"is_admin": payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False)}
            normalized_teams = await resolve_session_teams(payload, email, user_info)
        else:
            # API token or legacy: use embedded teams
            normalized_teams = normalize_token_teams(payload)

        # Set team_id: only for single-team API tokens
        if normalized_teams is None:
            team_id = None
        elif len(normalized_teams) == 1 and token_use != "session":  # nosec B105
            team_id = normalized_teams[0] if isinstance(normalized_teams[0], str) else normalized_teams[0].get("id")
        else:
            team_id = None

        if request:
            request.state.token_teams = normalized_teams
            request.state.team_id = team_id
            request.state.token_use = token_use
            request.state.trace_team_name = await resolve_trace_team_name(payload, normalized_teams)
            # Store JTI for use in middleware (e.g., token usage logging)
            if jti:
                request.state.jti = jti
            await _set_auth_method_from_payload(payload)

    except HTTPException:
        # Re-raise HTTPException from verify_jwt_token (handles expired/invalid tokens)
        raise
    except Exception as jwt_error:
        # JWT validation failed, try database API token
        # Uses fresh DB session via asyncio.to_thread to avoid blocking event loop
        logger.debug("JWT validation failed with error: %s, trying database API token", jwt_error)
        try:
            token_hash = hashlib.sha256(credentials.credentials.encode()).hexdigest()

            # Lookup API token using fresh session in thread pool
            api_token_info = await asyncio.to_thread(_lookup_api_token_sync, token_hash)
            logger.debug(f"Database lookup result: {api_token_info is not None}")

            if api_token_info:
                # Check for error conditions returned by helper
                if api_token_info.get("expired"):
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="API token expired",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                if api_token_info.get("revoked"):
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="API token has been revoked",
                        headers={"WWW-Authenticate": "Bearer"},
                    )

                # Use the email from the API token
                email = api_token_info["user_email"]
                logger.debug(f"API token authentication successful for email: {email}")

                # Set auth_method for database API tokens
                if request:
                    request.state.auth_method = "api_token"
                    request.state.user_email = api_token_info["user_email"]
                    # Store JTI for use in middleware
                    if "jti" in api_token_info:
                        request.state.jti = api_token_info["jti"]
            else:
                logger.debug("API token not found in database")
                logger.debug("No valid authentication method found")
                # Neither JWT nor API token worked
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid authentication credentials",
                    headers={"WWW-Authenticate": "Bearer"},
                )
        except HTTPException:
            # Re-raise HTTP exceptions
            raise
        except Exception as e:
            # Neither JWT nor API token validation worked
            logger.debug(f"Database API token validation failed with exception: {SecurityValidator.sanitize_log_message(str(e))}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )

    # Get user from database using fresh session in thread pool
    user = await asyncio.to_thread(_get_user_by_email_sync, email)

    if user is None:
        # Check if strict user-in-DB mode is enabled
        if settings.require_user_in_db:
            logger.warning(
                f"Authentication rejected for {email}: user not found in database. " "REQUIRE_USER_IN_DB is enabled.",
                extra={"security_event": "user_not_in_db_rejected", "user_id": email},
            )
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found in database",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Platform admin bootstrap (only when REQUIRE_USER_IN_DB=false)
        # If user doesn't exist but token is valid and email matches platform admin,
        # create a virtual admin user object
        if email == getattr(settings, "platform_admin_email", "admin@example.com"):
            logger.info(
                f"Platform admin bootstrap authentication for {email}. " "User authenticated via platform admin configuration.",
                extra={"security_event": "platform_admin_bootstrap", "user_id": email},
            )
            # Create a virtual admin user for authentication purposes
            user = EmailUser(
                email=email,
                password_hash="",  # nosec B106 - Not used for JWT authentication
                full_name=getattr(settings, "platform_admin_full_name", "Platform Administrator"),
                is_admin=True,
                is_active=True,
                auth_provider="local",
                password_change_required=False,
                email_verified_at=datetime.now(timezone.utc),
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            )
        else:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found",
                headers={"WWW-Authenticate": "Bearer"},
            )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Account disabled",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if plugin_manager and plugin_manager.config.plugin_settings.include_user_info:
        _inject_userinfo_instate(request, user)
    _propagate_tenant_id(request)

    trace_teams = getattr(request.state, "token_teams", _UNSET) if request else _UNSET
    _set_trace_for_user(user, teams=trace_teams, team_name=getattr(request.state, "trace_team_name", None) if request else None)
    return user
def _propagate_tenant_id(request: Optional[object] = None) -> None:
    """Copy ``request.state.team_id`` into the plugin global context's ``tenant_id``.

    Looks up ``plugin_global_context`` on ``request.state`` and, when that
    context exists and its ``tenant_id`` is still ``None``, fills it with a
    truthy ``request.state.team_id``. An already-set ``tenant_id`` is never
    overwritten, and nothing happens when the request, its state, the
    context, or the team ID is missing.

    Args:
        request: The incoming request object, or ``None`` if unavailable.
    """
    if not request:
        return

    state = getattr(request, "state", None)
    context = getattr(state, "plugin_global_context", None)

    # Leave the context alone when absent or when a tenant is already recorded.
    if not context or context.tenant_id is not None:
        return

    tenant = getattr(state, "team_id", None)
    if tenant:
        context.tenant_id = tenant
def _inject_userinfo_instate(request: Optional[object] = None, user: Optional[EmailUser] = None) -> None:
    """Record the user's email, admin flag and full name on the plugin global context.

    Reuses ``request.state.plugin_global_context`` when present; otherwise a
    new GlobalContext is created with a request ID taken from the correlation
    ID, then ``request.state.request_id``, then a freshly generated UUID. The
    resulting context is stored back on ``request.state`` when a request is
    available.

    Args:
        request: Optional request object for plugin hooks
        user: User related information
    """
    logger = logging.getLogger(__name__)

    # Resolve the request ID: correlation ID (set by CorrelationIDMiddleware) first,
    # then whatever the request state carries, then a random fallback.
    request_id = get_correlation_id()
    if not request_id:
        has_state_id = request and hasattr(request, "state") and hasattr(request.state, "request_id")
        if has_state_id:
            request_id = request.state.request_id
        else:
            request_id = uuid.uuid4().hex
            logger.debug(f"Generated fallback request ID in get_current_user: {request_id}")

    # Reuse the context already attached to the request, or start a new one.
    plugin_context = getattr(request.state, "plugin_global_context", None) if request else None
    if not plugin_context:
        plugin_context = GlobalContext(
            request_id=request_id,
            server_id=None,
            tenant_id=None,
        )

    if user:
        if not plugin_context.user:
            plugin_context.user = {}
        for attribute in ("email", "is_admin", "full_name"):
            plugin_context.user[attribute] = getattr(user, attribute)

    if request and plugin_context:
        request.state.plugin_global_context = plugin_context