Coverage for mcpgateway / utils / trace_context.py: 97%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Trace context helpers for OpenTelemetry span enrichment.""" 

3 

4# Standard 

5from contextvars import ContextVar 

6import re 

7from typing import Any, Iterable, Optional 

8 

9_trace_user_email: ContextVar[Optional[str]] = ContextVar("trace_user_email", default=None) 

10_trace_user_is_admin: ContextVar[bool] = ContextVar("trace_user_is_admin", default=False) 

11_trace_team_scope: ContextVar[Optional[str]] = ContextVar("trace_team_scope", default=None) 

12_trace_team_name: ContextVar[Optional[str]] = ContextVar("trace_team_name", default=None) 

13_trace_auth_method: ContextVar[Optional[str]] = ContextVar("trace_auth_method", default=None) 

14_trace_session_id: ContextVar[Optional[str]] = ContextVar("trace_session_id", default=None) 

15 

16_TEAM_SCOPE_SEPARATOR = "," 

17_ELLIPSIS_MARKER = "..." 

18 

19 

def get_trace_user_email() -> Optional[str]:
    """Look up the user email bound to the active trace context.

    Returns:
        The stored email address, or ``None`` when none has been recorded.
    """
    current_email = _trace_user_email.get()
    return current_email

27 

28 

def set_trace_user_email(value: Optional[str]) -> None:
    """Bind a user email to the active trace context.

    Args:
        value: Email address to remember; pass ``None`` to store no email.
    """
    new_email = value
    _trace_user_email.set(new_email)

36 

37 

def get_trace_user_is_admin() -> bool:
    """Report the admin flag stored on the active trace context.

    Returns:
        ``True`` if the trace context has been marked as belonging to an admin
        user, otherwise ``False``.
    """
    admin_flag = _trace_user_is_admin.get()
    return admin_flag

45 

46 

def set_trace_user_is_admin(value: bool) -> None:
    """Store the admin flag on the active trace context.

    Args:
        value: Admin indicator; it is coerced with ``bool()`` before being stored.
    """
    admin_flag = bool(value)
    _trace_user_is_admin.set(admin_flag)

54 

55 

def get_trace_team_scope() -> Optional[str]:
    """Look up the team scope label bound to the active trace context.

    Returns:
        The serialized team scope label, or ``None`` when none has been recorded.
    """
    scope_label = _trace_team_scope.get()
    return scope_label

63 

64 

def set_trace_team_scope(value: Optional[str]) -> None:
    """Bind a team scope label to the active trace context.

    Args:
        value: Serialized team scope label to remember; ``None`` stores no label.
    """
    scope_label = value
    _trace_team_scope.set(scope_label)

72 

73 

def get_trace_team_name() -> Optional[str]:
    """Look up the primary team display name bound to the active trace context.

    Returns:
        The stored display name of the primary team, or ``None`` when none has
        been recorded.
    """
    display_name = _trace_team_name.get()
    return display_name

81 

82 

def set_trace_team_name(value: Optional[str]) -> None:
    """Bind a primary team display name to the active trace context.

    Args:
        value: Team display name to remember; ``None`` stores no name.
    """
    display_name = value
    _trace_team_name.set(display_name)

90 

91 

def get_trace_auth_method() -> Optional[str]:
    """Look up the authentication method bound to the active trace context.

    Returns:
        The stored authentication method label, or ``None`` when none has been
        recorded.
    """
    method_label = _trace_auth_method.get()
    return method_label

99 

100 

def set_trace_auth_method(value: Optional[str]) -> None:
    """Bind an authentication method label to the active trace context.

    Args:
        value: Authentication method label to remember; ``None`` stores no label.
    """
    method_label = value
    _trace_auth_method.set(method_label)

108 

109 

def get_trace_session_id() -> Optional[str]:
    """Look up the session identifier bound to the active trace context.

    Returns:
        The stored session identifier, or ``None`` when none has been recorded.
    """
    session_id = _trace_session_id.get()
    return session_id

117 

118 

def set_trace_session_id(value: Optional[str]) -> None:
    """Bind a session identifier to the active trace context.

    Args:
        value: Session identifier to remember; ``None`` stores no identifier.
    """
    session_id = value
    _trace_session_id.set(session_id)

126 

127 

def clear_trace_context() -> None:
    """Reset every trace context value in the current execution context.

    All optional string values are set back to ``None`` and the admin flag is
    set back to ``False``.
    """
    # The setters are independent of one another, so the order is irrelevant.
    optional_value_setters = (
        set_trace_user_email,
        set_trace_team_scope,
        set_trace_team_name,
        set_trace_auth_method,
        set_trace_session_id,
    )
    for reset_value in optional_value_setters:
        reset_value(None)
    set_trace_user_is_admin(False)

136 

137 

138def _normalize_team_id(team: Any) -> Optional[str]: 

139 """Normalize a team identifier from mixed token formats. 

140 

141 Args: 

142 team: Team identifier in string, object, or mapping form. 

143 

144 Returns: 

145 Normalized team identifier string, or ``None`` when no usable value exists. 

146 """ 

147 if isinstance(team, dict): 

148 team = team.get("id") 

149 if team is None: 

150 return None 

151 team_id = str(team).strip() 

152 return team_id or None 

153 

154 

155def _normalize_team_name(team: Any) -> Optional[str]: 

156 """Normalize a team display name from mixed token formats. 

157 

158 Args: 

159 team: Team value in mapping or scalar form. 

160 

161 Returns: 

162 Normalized team display name string, or ``None`` when unavailable. 

163 """ 

164 if not isinstance(team, dict): 

165 return None 

166 

167 team_name = team.get("name") 

168 if team_name is None: 

169 return None 

170 

171 normalized_name = str(team_name).strip() 

172 return normalized_name or None 

173 

174 

def format_trace_team_scope(token_teams: Optional[Iterable[Any]], *, max_teams: int = 5) -> str:
    """Format token team scope for trace labels.

    Args:
        token_teams: Iterable of token team values, or ``None`` to represent admin scope.
        max_teams: Maximum number of explicit team identifiers to include before
            truncating. Negative values are treated as ``0``.

    Returns:
        Serialized team scope label for use in span attributes: ``"admin"`` when
        ``token_teams`` is ``None``, ``"public"`` when no usable team identifier
        is present, otherwise the team identifiers joined by the scope separator,
        followed by the ellipsis marker when the list was truncated.
    """
    if token_teams is None:
        return "admin"

    # A negative limit would make the slice below count from the END of the
    # list (e.g. -1 would keep all but the last id), so clamp it at zero.
    limit = max(max_teams, 0)

    # Keep only entries that yield a non-empty identifier.
    candidate_ids = (_normalize_team_id(team) for team in token_teams)
    team_ids = [team_id for team_id in candidate_ids if team_id]

    if not team_ids:
        return "public"

    if len(team_ids) <= limit:
        return _TEAM_SCOPE_SEPARATOR.join(team_ids)

    # Too many teams: keep the first ``limit`` and flag the truncation.
    return _TEAM_SCOPE_SEPARATOR.join([*team_ids[:limit], _ELLIPSIS_MARKER])

203 

204 

def primary_team_from_scope(team_scope: Optional[str]) -> Optional[str]:
    """Return the first team id from a formatted trace team scope label.

    Whitespace around the label as a whole, and around each comma, is ignored.

    Args:
        team_scope: Serialized team scope label produced by ``format_trace_team_scope``.

    Returns:
        First concrete team identifier in the scope, or ``None`` for empty,
        blank, admin, or public scopes and for labels with no concrete id.
    """
    if not team_scope:
        return None

    # The split below only absorbs whitespace next to commas. Strip the label
    # itself so padded sentinels and padded ids are handled the same way.
    scope = team_scope.strip()
    if not scope or scope in {"admin", "public"}:
        return None

    for candidate in re.split(r"\s*,\s*", scope):
        # Skip empty segments (e.g. from ",,") and the truncation marker.
        if candidate and candidate != _ELLIPSIS_MARKER:
            return candidate
    return None

221 

222 

def primary_team_name_from_teams(token_teams: Optional[Iterable[Any]]) -> Optional[str]:
    """Resolve the display name of the primary team from raw team values.

    The primary team is the first entry that yields a usable team identifier,
    i.e. the first one that would appear in ``team.scope``. Its name is
    returned only when that same entry also carries a non-empty display name;
    later entries are never consulted.

    Args:
        token_teams: Iterable of raw token team values, or ``None`` for admin scope.

    Returns:
        Display name of the primary concrete team, or ``None`` when there is no
        concrete team or it has no usable name.
    """
    if token_teams is None:
        return None

    # Entries without an id are skipped. ``None`` is safe as the "not found"
    # default because a ``None`` entry never produces an id.
    primary_entry = next(
        (entry for entry in token_teams if _normalize_team_id(entry)),
        None,
    )
    if primary_entry is None:
        return None
    return _normalize_team_name(primary_entry)

245 

246 

def set_trace_context_from_teams(
    token_teams: Optional[Iterable[Any]],
    *,
    user_email: Optional[str] = None,
    is_admin: bool = False,
    auth_method: Optional[str] = None,
    team_name: Optional[str] = None,
    max_teams: int = 5,
) -> None:
    """Populate trace context using the canonical token-teams model.

    The admin flag, team name and team scope are always written. The user email
    and auth method are only written when a non-``None`` value is supplied, so
    existing values are otherwise left untouched.

    Args:
        token_teams: Iterable of team identifiers, or ``None`` for admin scope.
            One-shot iterators (e.g. generators) are supported.
        user_email: Optional user email to record on the trace.
        is_admin: Whether the trace context should be marked as admin.
        auth_method: Optional authentication method label to record.
        team_name: Optional display name for the primary concrete team. When
            omitted or empty, the name is derived from ``token_teams``.
        max_teams: Maximum number of team identifiers to include in the scope label.
    """
    # ``token_teams`` is walked twice below (name lookup, then scope label).
    # Snapshot it first so a generator is not already partly consumed by the
    # time the scope label is built.
    teams = None if token_teams is None else list(token_teams)

    if user_email is not None:
        set_trace_user_email(user_email)
    set_trace_user_is_admin(is_admin)
    if auth_method is not None:
        set_trace_auth_method(auth_method)
    set_trace_team_name(team_name or primary_team_name_from_teams(teams))
    set_trace_team_scope(format_trace_team_scope(teams, max_teams=max_teams))

272 set_trace_team_scope(format_trace_team_scope(token_teams, max_teams=max_teams))