Coverage for mcpgateway / utils / trace_context.py: 97%
96 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Trace context helpers for OpenTelemetry span enrichment."""
4# Standard
5from contextvars import ContextVar
6import re
7from typing import Any, Iterable, Optional
9_trace_user_email: ContextVar[Optional[str]] = ContextVar("trace_user_email", default=None)
10_trace_user_is_admin: ContextVar[bool] = ContextVar("trace_user_is_admin", default=False)
11_trace_team_scope: ContextVar[Optional[str]] = ContextVar("trace_team_scope", default=None)
12_trace_team_name: ContextVar[Optional[str]] = ContextVar("trace_team_name", default=None)
13_trace_auth_method: ContextVar[Optional[str]] = ContextVar("trace_auth_method", default=None)
14_trace_session_id: ContextVar[Optional[str]] = ContextVar("trace_session_id", default=None)
16_TEAM_SCOPE_SEPARATOR = ","
17_ELLIPSIS_MARKER = "..."
20def get_trace_user_email() -> Optional[str]:
21 """Return the current trace user email.
23 Returns:
24 Email address recorded in the current trace context, if any.
25 """
26 return _trace_user_email.get()
29def set_trace_user_email(value: Optional[str]) -> None:
30 """Set the current trace user email.
32 Args:
33 value: Email address to store for the current trace context.
34 """
35 _trace_user_email.set(value)
38def get_trace_user_is_admin() -> bool:
39 """Return whether the current trace user is an admin.
41 Returns:
42 ``True`` when the current trace context represents an admin user.
43 """
44 return _trace_user_is_admin.get()
47def set_trace_user_is_admin(value: bool) -> None:
48 """Set whether the current trace user is an admin.
50 Args:
51 value: Admin flag to store for the current trace context.
52 """
53 _trace_user_is_admin.set(bool(value))
56def get_trace_team_scope() -> Optional[str]:
57 """Return the current trace team scope label.
59 Returns:
60 Serialized team scope label for the current trace context, if any.
61 """
62 return _trace_team_scope.get()
65def set_trace_team_scope(value: Optional[str]) -> None:
66 """Set the current trace team scope label.
68 Args:
69 value: Serialized team scope label to store for the current trace context.
70 """
71 _trace_team_scope.set(value)
74def get_trace_team_name() -> Optional[str]:
75 """Return the current primary trace team name.
77 Returns:
78 Team display name recorded for the primary team in the current trace context, if any.
79 """
80 return _trace_team_name.get()
83def set_trace_team_name(value: Optional[str]) -> None:
84 """Set the current primary trace team name.
86 Args:
87 value: Team display name to store for the current trace context.
88 """
89 _trace_team_name.set(value)
92def get_trace_auth_method() -> Optional[str]:
93 """Return the current trace auth method.
95 Returns:
96 Authentication method label for the current trace context, if any.
97 """
98 return _trace_auth_method.get()
101def set_trace_auth_method(value: Optional[str]) -> None:
102 """Set the current trace auth method.
104 Args:
105 value: Authentication method label to store for the current trace context.
106 """
107 _trace_auth_method.set(value)
110def get_trace_session_id() -> Optional[str]:
111 """Return the current trace session identifier.
113 Returns:
114 Session identifier recorded in the current trace context, if any.
115 """
116 return _trace_session_id.get()
119def set_trace_session_id(value: Optional[str]) -> None:
120 """Set the current trace session identifier.
122 Args:
123 value: Session identifier to store for the current trace context.
124 """
125 _trace_session_id.set(value)
128def clear_trace_context() -> None:
129 """Clear all trace context values for the current execution context."""
130 set_trace_user_email(None)
131 set_trace_user_is_admin(False)
132 set_trace_team_scope(None)
133 set_trace_team_name(None)
134 set_trace_auth_method(None)
135 set_trace_session_id(None)
138def _normalize_team_id(team: Any) -> Optional[str]:
139 """Normalize a team identifier from mixed token formats.
141 Args:
142 team: Team identifier in string, object, or mapping form.
144 Returns:
145 Normalized team identifier string, or ``None`` when no usable value exists.
146 """
147 if isinstance(team, dict):
148 team = team.get("id")
149 if team is None:
150 return None
151 team_id = str(team).strip()
152 return team_id or None
155def _normalize_team_name(team: Any) -> Optional[str]:
156 """Normalize a team display name from mixed token formats.
158 Args:
159 team: Team value in mapping or scalar form.
161 Returns:
162 Normalized team display name string, or ``None`` when unavailable.
163 """
164 if not isinstance(team, dict):
165 return None
167 team_name = team.get("name")
168 if team_name is None:
169 return None
171 normalized_name = str(team_name).strip()
172 return normalized_name or None
175def format_trace_team_scope(token_teams: Optional[Iterable[Any]], *, max_teams: int = 5) -> str:
176 """Format token team scope for trace labels.
178 Args:
179 token_teams: Iterable of token team values, or ``None`` to represent admin scope.
180 max_teams: Maximum number of explicit team identifiers to include before truncating.
182 Returns:
183 Serialized team scope label for use in span attributes.
184 """
185 if token_teams is None:
186 return "admin"
188 normalized: list[str] = []
189 for team in token_teams:
190 team_id = _normalize_team_id(team)
191 if team_id:
192 normalized.append(team_id)
194 if not normalized:
195 return "public"
197 if len(normalized) <= max_teams:
198 return _TEAM_SCOPE_SEPARATOR.join(normalized)
200 limited = normalized[:max_teams]
201 limited.append(_ELLIPSIS_MARKER)
202 return _TEAM_SCOPE_SEPARATOR.join(limited)
205def primary_team_from_scope(team_scope: Optional[str]) -> Optional[str]:
206 """Return the first team id from a formatted trace team scope label.
208 Args:
209 team_scope: Serialized team scope label produced by ``format_trace_team_scope``.
211 Returns:
212 First concrete team identifier in the scope, or ``None`` for admin/public scopes.
213 """
214 if not team_scope or team_scope in {"admin", "public"}:
215 return None
217 for candidate in re.split(r"\s*,\s*", team_scope):
218 if candidate and candidate != _ELLIPSIS_MARKER:
219 return candidate
220 return None
223def primary_team_name_from_teams(token_teams: Optional[Iterable[Any]]) -> Optional[str]:
224 """Return the primary team display name from raw team values.
226 The primary team is defined by the first concrete team identifier that would
227 appear in ``team.scope``. A name is only returned when that same team value
228 also includes a non-empty display name.
230 Args:
231 token_teams: Iterable of raw token team values, or ``None`` for admin scope.
233 Returns:
234 Team display name for the primary concrete team, or ``None`` when unavailable.
235 """
236 if token_teams is None:
237 return None
239 for team in token_teams:
240 team_id = _normalize_team_id(team)
241 if not team_id:
242 continue
243 return _normalize_team_name(team)
244 return None
247def set_trace_context_from_teams(
248 token_teams: Optional[Iterable[Any]],
249 *,
250 user_email: Optional[str] = None,
251 is_admin: bool = False,
252 auth_method: Optional[str] = None,
253 team_name: Optional[str] = None,
254 max_teams: int = 5,
255) -> None:
256 """Populate trace context using the canonical token-teams model.
258 Args:
259 token_teams: Iterable of team identifiers, or ``None`` for admin scope.
260 user_email: Optional user email to record on the trace.
261 is_admin: Whether the trace context should be marked as admin.
262 auth_method: Optional authentication method label to record.
263 team_name: Optional display name for the primary concrete team.
264 max_teams: Maximum number of team identifiers to include in the scope label.
265 """
266 if user_email is not None:
267 set_trace_user_email(user_email)
268 set_trace_user_is_admin(is_admin)
269 if auth_method is not None:
270 set_trace_auth_method(auth_method)
271 set_trace_team_name(team_name or primary_team_name_from_teams(token_teams))
272 set_trace_team_scope(format_trace_team_scope(token_teams, max_teams=max_teams))