Coverage for mcpgateway / observability.py: 96%

559 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/observability.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Vendor-agnostic OpenTelemetry instrumentation for ContextForge. 

8Supports any OTLP-compatible backend (Jaeger, Zipkin, Tempo, Phoenix, etc.). 

9""" 

10 

# Standard
import base64
from contextlib import nullcontext
from importlib import import_module as _im
import logging
import os
from typing import Any, Callable, cast, Dict, Mapping, Optional
from urllib.parse import unquote, urlparse

19 

# Third-Party - Try to import OpenTelemetry core components - make them truly optional
# OTEL_AVAILABLE becomes True only if every import in the ``try`` below succeeds.
OTEL_AVAILABLE = False
try:
    # Third-Party
    from opentelemetry import trace
    from opentelemetry.propagate import extract as otel_extract
    from opentelemetry.propagate import inject as otel_inject
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor
    from opentelemetry.trace import SpanKind, Status, StatusCode

    OTEL_AVAILABLE = True
except ImportError:
    # OpenTelemetry not installed - set to None for graceful degradation.
    # Names bound by imports that succeeded before the failing one (e.g. ``trace`` when only
    # the SDK is missing) are overwritten here as well, so callers see a consistent "absent" state.
    trace = None
    otel_extract = None
    otel_inject = None

    class _SpanKindShim:
        """Minimal SpanKind shim used when OpenTelemetry isn't installed."""

        # Lets code read ``SpanKind.SERVER`` without OpenTelemetry; the value is a plain string.
        SERVER = "server"

    # Provide a lightweight shim so tests can patch Resource.create
    class _ResourceShim:
        """Minimal Resource shim used when OpenTelemetry SDK isn't installed.

        Exposes a static ``create`` method that simply returns the provided
        attributes mapping, enabling tests to patch and inspect the inputs
        without requiring the real OpenTelemetry classes.
        """

        @staticmethod
        def create(attrs: Dict[str, Any]) -> Dict[str, Any]:  # type: ignore[override]
            """Return attributes unchanged to mimic ``Resource.create``.

            Args:
                attrs: Resource attribute dictionary.

            Returns:
                Dict[str, Any]: The same mapping passed in.
            """
            return attrs

    Resource = cast(Any, _ResourceShim)
    TracerProvider = None
    BatchSpanProcessor = None
    ConsoleSpanExporter = None
    SimpleSpanProcessor = None
    SpanKind = cast(Any, _SpanKindShim)
    Status = None
    StatusCode = None

    # Provide minimal module shims so tests can patch ConsoleSpanExporter path
    try:
        # Standard
        import sys
        import types

        # Only under pytest (already imported) or when MCP_TESTING=1 is set explicitly.
        if ("pytest" in sys.modules) or (os.getenv("MCP_TESTING") == "1"):
            otel_root = types.ModuleType("opentelemetry")
            otel_sdk = types.ModuleType("opentelemetry.sdk")
            otel_trace = types.ModuleType("opentelemetry.sdk.trace")
            otel_export = types.ModuleType("opentelemetry.sdk.trace.export")

            class _ConsoleSpanExporterStub:  # pragma: no cover - test patch replaces this
                """Lightweight stub for ConsoleSpanExporter used in tests.

                Provides a placeholder class so unit tests can patch
                `opentelemetry.sdk.trace.export.ConsoleSpanExporter` even when
                the OpenTelemetry SDK is not installed in the environment.
                """

            # Wire the stub modules together as attributes, mirroring the dotted import path.
            setattr(otel_export, "ConsoleSpanExporter", _ConsoleSpanExporterStub)
            setattr(otel_trace, "export", otel_export)
            setattr(otel_sdk, "trace", otel_trace)
            setattr(otel_root, "sdk", otel_sdk)

            # Only register the exact chain needed by tests.
            # ``setdefault`` never replaces a module that is already present in sys.modules.
            # NOTE(review): these stubs are plain modules rather than packages, so later imports of
            # other ``opentelemetry.*`` submodules fail while they are registered - confirm acceptable.
            sys.modules.setdefault("opentelemetry", otel_root)
            sys.modules.setdefault("opentelemetry.sdk", otel_sdk)
            sys.modules.setdefault("opentelemetry.sdk.trace", otel_trace)
            sys.modules.setdefault("opentelemetry.sdk.trace.export", otel_export)
    except Exception as exc:  # nosec B110 - best-effort optional shim
        # Shimming is a non-critical, best-effort step for tests; log and continue.
        logging.getLogger(__name__).debug("Skipping OpenTelemetry shim setup: %s", exc)

107 

108# First-Party 

109from mcpgateway.config import get_settings # noqa: E402 # pylint: disable=wrong-import-position 

110from mcpgateway.utils.correlation_id import get_correlation_id # noqa: E402 # pylint: disable=wrong-import-position 

111from mcpgateway.utils.log_sanitizer import sanitize_for_log # noqa: E402 # pylint: disable=wrong-import-position 

112from mcpgateway.utils.trace_context import ( # noqa: E402 # pylint: disable=wrong-import-position 

113 get_trace_auth_method, 

114 get_trace_session_id, 

115 get_trace_team_name, 

116 get_trace_team_scope, 

117 get_trace_user_email, 

118 get_trace_user_is_admin, 

119 primary_team_from_scope, 

120) 

121from mcpgateway.utils.trace_redaction import sanitize_trace_attribute_value, sanitize_trace_text # noqa: E402 # pylint: disable=wrong-import-position 

122 

# Try to import optional exporters.
# Each *_EXPORTER name holds the exporter class when its package imports cleanly, otherwise None.
try:
    OTLP_SPAN_EXPORTER = getattr(_im("opentelemetry.exporter.otlp.proto.grpc.trace_exporter"), "OTLPSpanExporter")
except Exception:
    # NOTE(review): without the gRPC exporter package this silently falls back to the HTTP
    # OTLPSpanExporter, so OTLP_SPAN_EXPORTER is not guaranteed to be the gRPC class. The OTLP
    # setup in init_telemetry picks it for protocol "grpc" and passes the endpoint unchanged,
    # so a ":4317" gRPC endpoint could reach the HTTP exporter - confirm this fallback is intended.
    try:
        OTLP_SPAN_EXPORTER = getattr(_im("opentelemetry.exporter.otlp.proto.http.trace_exporter"), "OTLPSpanExporter")
    except Exception:
        OTLP_SPAN_EXPORTER = None

try:
    JAEGER_EXPORTER = getattr(_im("opentelemetry.exporter.jaeger.thrift"), "JaegerExporter")
except Exception:
    JAEGER_EXPORTER = None

try:
    ZIPKIN_EXPORTER = getattr(_im("opentelemetry.exporter.zipkin.json"), "ZipkinExporter")
except Exception:
    ZIPKIN_EXPORTER = None

try:
    HTTP_EXPORTER = getattr(_im("opentelemetry.exporter.otlp.proto.http.trace_exporter"), "OTLPSpanExporter")
except Exception:
    HTTP_EXPORTER = None

logger = logging.getLogger(__name__)

# URL path fragment that identifies Langfuse's public OTLP ingestion endpoint.
_LANGFUSE_OTEL_PATH_FRAGMENT = "/api/public/otel"
# Maximum length (characters) of exception text attached to spans, including the "..." suffix.
_MAX_SPAN_EXCEPTION_MESSAGE_LENGTH = 1024
# Span attribute keys carrying user/team identity; only emitted when identity capture is enabled.
_IDENTITY_ATTRIBUTE_KEYS = frozenset({"user.email", "user.is_admin", "team.scope", "team.name", "langfuse.user.id"})


# Global tracer instance - using UPPER_CASE for module-level constant
# (stays None until telemetry initialisation assigns a tracer).
# pylint: disable=invalid-name
_TRACER = None

157 

158 

def _sanitize_span_exception_message(exc_val: Optional[BaseException]) -> str:
    """Render an exception as redacted, length-bounded text for span attributes.

    The message is passed through trace redaction and log sanitization, then stripped.
    An empty result falls back to the exception's class name. Text longer than
    ``_MAX_SPAN_EXCEPTION_MESSAGE_LENGTH`` is cut and suffixed with ``...`` so the
    total length stays within that limit.

    Args:
        exc_val: Exception instance captured by the span lifecycle, or ``None``.

    Returns:
        The sanitized message, or ``""`` when ``exc_val`` is ``None``.
    """
    if exc_val is None:
        return ""

    limit = _MAX_SPAN_EXCEPTION_MESSAGE_LENGTH
    message = sanitize_for_log(sanitize_trace_text(str(exc_val))).strip() or exc_val.__class__.__name__
    if len(message) > limit:
        # Reserve three characters for the ellipsis marker.
        message = message[: limit - 3] + "..."
    return message

181 

182 

def _get_deployment_environment() -> str:
    """Look up the deployment environment label from the gateway settings.

    Returns:
        The configured ``deployment_env`` value.
    """
    settings = get_settings()
    return settings.deployment_env

190 

191 

def _is_langfuse_otlp_endpoint(endpoint: Optional[str]) -> bool:
    """Tell whether an OTLP endpoint targets Langfuse's public ingestion path.

    The URL path is inspected when the endpoint can be parsed. If ``urlparse`` fails,
    the raw endpoint string is searched instead.

    Args:
        endpoint: OTLP endpoint URL to inspect.

    Returns:
        ``True`` when the endpoint path contains Langfuse's public OTLP ingestion path.
    """
    if endpoint:
        try:
            searched_text = urlparse(endpoint).path
        except Exception:
            searched_text = endpoint
        return _LANGFUSE_OTEL_PATH_FRAGMENT in searched_text
    return False

208 

209 

def _resolve_langfuse_basic_auth() -> str:
    """Build the Langfuse OTLP basic-auth credential.

    An explicit ``langfuse_otel_auth`` secret takes precedence. Otherwise the public and
    secret project keys are joined as ``public:secret`` and base64-encoded.

    Returns:
        Base64-encoded Langfuse basic-auth credential string, or an empty string when Langfuse auth is not configured.
    """
    settings = get_settings()

    def _unwrap(secret: Any) -> str:
        """Return a stripped secret value, or ``""`` when the setting is unset.

        Args:
            secret: Secret-wrapper setting exposing ``get_secret_value``, or a falsy value.

        Returns:
            The stripped secret text, or an empty string.
        """
        if not secret:
            return ""
        return secret.get_secret_value().strip()

    explicit_auth = _unwrap(settings.langfuse_otel_auth)
    if explicit_auth:
        return explicit_auth

    public_key = _unwrap(settings.langfuse_public_key)
    secret_key = _unwrap(settings.langfuse_secret_key)
    if not (public_key and secret_key):
        return ""

    credential = f"{public_key}:{secret_key}".encode("utf-8")
    return base64.b64encode(credential).decode("ascii")

227 

228 

def _resolve_otlp_endpoint() -> Optional[str]:
    """Pick the OTLP endpoint, letting the Langfuse-specific setting override the generic one.

    Returns:
        ``langfuse_otel_endpoint`` when it is set (truthy), otherwise ``otel_exporter_otlp_endpoint``.
    """
    settings = get_settings()
    if settings.langfuse_otel_endpoint:
        return settings.langfuse_otel_endpoint
    return settings.otel_exporter_otlp_endpoint

237 

238 

239def _parse_otlp_headers(headers: Optional[str]) -> Dict[str, str]: 

240 """Parse OTLP headers from a comma-separated key=value string. 

241 

242 Args: 

243 headers: Raw header string from configuration. 

244 

245 Returns: 

246 Parsed OTLP headers mapping. 

247 """ 

248 parsed: Dict[str, str] = {} 

249 if not headers: 

250 return parsed 

251 

252 for header in headers.split(","): 

253 if "=" not in header: 

254 continue 

255 key, value = header.split("=", 1) 

256 key = key.strip() 

257 if key: 

258 parsed[key] = value.strip() 

259 return parsed 

260 

261 

262def _get_header_case_insensitive(headers: Dict[str, str], name: str) -> Optional[str]: 

263 """Return a header value using case-insensitive name matching. 

264 

265 Args: 

266 headers: Header mapping to inspect. 

267 name: Header name to resolve. 

268 

269 Returns: 

270 Matching header value, or ``None`` when not present. 

271 """ 

272 normalized_name = name.lower() 

273 for key, value in headers.items(): 

274 if key.lower() == normalized_name: 

275 return value 

276 return None 

277 

278 

279def _set_header_case_insensitive(headers: Dict[str, str], name: str, value: str) -> None: 

280 """Set a header value while preserving an existing key's original casing when present. 

281 

282 Args: 

283 headers: Header mapping to mutate. 

284 name: Header name to set. 

285 value: Header value to store. 

286 """ 

287 normalized_name = name.lower() 

288 for key in list(headers.keys()): 

289 if key.lower() == normalized_name: 

290 headers[key] = value 

291 return 

292 headers[name] = value 

293 

294 

def _resolve_otlp_headers(endpoint: Optional[str]) -> Dict[str, str]:
    """Build the OTLP exporter headers, adding Langfuse basic auth when appropriate.

    Headers are parsed from ``otel_exporter_otlp_headers``. For Langfuse endpoints, a
    ``Basic`` Authorization header derived from the Langfuse credentials is added only
    when no Authorization header (any casing) is already configured.

    Args:
        endpoint: Resolved OTLP endpoint URL.

    Returns:
        OTLP header mapping suitable for exporter configuration.
    """
    resolved = _parse_otlp_headers(get_settings().otel_exporter_otlp_headers)

    if _is_langfuse_otlp_endpoint(endpoint):
        credential = _resolve_langfuse_basic_auth()
        if credential and not _get_header_case_insensitive(resolved, "Authorization"):
            _set_header_case_insensitive(resolved, "Authorization", f"Basic {credential}")

    return resolved

315 

316 

def _validate_langfuse_configuration(endpoint: Optional[str], headers: Dict[str, str]) -> None:
    """Fail closed when Langfuse OTLP is configured without usable credentials.

    Non-Langfuse endpoints are accepted as-is. For a Langfuse endpoint, the Authorization
    header must use the ``Basic`` scheme with strict base64 that decodes (UTF-8) to
    ``public_key:secret_key``, where both parts are non-empty.

    Args:
        endpoint: OTLP endpoint URL configured for export.
        headers: Resolved OTLP exporter header mapping. Its ``Authorization`` entry is
            looked up case-insensitively.

    Raises:
        RuntimeError: If a Langfuse endpoint is configured without valid Basic credentials.
    """
    if not _is_langfuse_otlp_endpoint(endpoint):
        return

    authorization = _get_header_case_insensitive(headers, "Authorization")
    if authorization:
        try:
            scheme, encoded = authorization.strip().split(None, 1)
            if scheme.lower() == "basic":
                decoded = base64.b64decode(encoded.strip(), validate=True).decode("utf-8")
                public_key, secret_key = decoded.split(":", 1)
                if public_key and secret_key:
                    return
        except (ValueError, UnicodeDecodeError):
            # A missing scheme/payload or a missing ":" fails tuple unpacking with ValueError.
            # Invalid base64 raises binascii.Error, which is also a ValueError subclass.
            pass

    message = (
        "Langfuse OTLP endpoint configured without valid Basic Authorization credentials. " + "Set OTEL_EXPORTER_OTLP_HEADERS, LANGFUSE_OTEL_AUTH, " + "or LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY."
    )
    raise RuntimeError(message)

345 

346 

def _should_emit_langfuse_attributes() -> bool:
    """Decide whether Langfuse-specific span attributes are attached.

    An explicit ``otel_emit_langfuse_attributes`` setting wins. When it is unset (``None``),
    emission follows whether the resolved OTLP endpoint is a Langfuse endpoint.

    Returns:
        ``True`` when Langfuse-specific span attributes should be attached.
    """
    override = get_settings().otel_emit_langfuse_attributes
    if override is None:
        return _is_langfuse_otlp_endpoint(_resolve_otlp_endpoint())
    return override

357 

358 

def _should_capture_identity_attributes() -> bool:
    """Decide whether user/team identity attributes are attached to spans.

    An explicit ``otel_capture_identity_attributes`` setting wins. When it is unset (``None``),
    identity capture follows the Langfuse attribute-emission decision.

    Returns:
        ``True`` when user/team identity metadata should be attached to spans.
    """
    override = get_settings().otel_capture_identity_attributes
    if override is None:
        return _should_emit_langfuse_attributes()
    return override

369 

370 

def _should_emit_span_attribute(attribute_name: str) -> bool:
    """Apply the export policy to a single span attribute key.

    ``langfuse.*`` keys require Langfuse attribute emission. Identity keys require identity
    capture. All other keys are always allowed.

    Args:
        attribute_name: Span attribute key.

    Returns:
        ``True`` when the attribute passes the current export policy.
    """
    langfuse_blocked = attribute_name.startswith("langfuse.") and not _should_emit_langfuse_attributes()
    if langfuse_blocked:
        return False
    identity_blocked = attribute_name in _IDENTITY_ATTRIBUTE_KEYS and not _should_capture_identity_attributes()
    return not identity_blocked

385 

386 

def set_span_attribute(span: Any, attribute_name: str, value: Any) -> None:
    """Write a span attribute after the export policy and value sanitization are applied.

    Nothing happens when ``span`` is falsy, ``value`` is ``None``, or the policy rejects the key.

    Args:
        span: Active span object.
        attribute_name: Span attribute key.
        value: Attribute value to set.
    """
    if span and value is not None and _should_emit_span_attribute(attribute_name):
        span.set_attribute(attribute_name, sanitize_trace_attribute_value(attribute_name, value))

400 

401 

def _set_pre_sanitized_span_attribute(span: Any, attribute_name: str, value: Any) -> None:
    """Write an already sanitized and bounded value as a span attribute, subject to the export policy.

    Nothing happens when ``span`` is falsy, ``value`` is ``None``, or the policy rejects the key.

    Args:
        span: Active span object.
        attribute_name: Span attribute key.
        value: Pre-sanitized attribute value to set.
    """
    if span and value is not None and _should_emit_span_attribute(attribute_name):
        span.set_attribute(attribute_name, value)

415 

416 

417def _record_sanitized_exception_event(span: Any, exc_type: Optional[type], error_message: str) -> None: 

418 """Record a sanitized exception event on a span without exporting raw exception text. 

419 

420 Args: 

421 span: Active span object. 

422 exc_type: Exception class associated with the failure. 

423 error_message: Sanitized exception message to attach to the event. 

424 """ 

425 if not span or exc_type is None: 

426 return 

427 

428 attrs = { 

429 "exception.type": exc_type.__name__, 

430 "exception.message": error_message, 

431 "exception.escaped": True, 

432 } 

433 

434 if hasattr(span, "add_event"): 

435 span.add_event("exception", attributes=attrs) 

436 return 

437 

438 if hasattr(span, "record_exception"): 

439 try: 

440 sanitized_exc = exc_type(error_message) 

441 except Exception: 

442 sanitized_exc = Exception(error_message) 

443 span.record_exception(sanitized_exc) 

444 

445 

def set_span_error(
    span: Any,
    error: str | BaseException,
    *,
    exc_type: Optional[type] = None,
    record_exception: bool = False,
) -> None:
    """Mark a span as failed with a sanitized, length-bounded error message.

    Exception instances are rendered with ``_sanitize_span_exception_message``. Plain strings
    get the same redaction and are truncated to the same ``_MAX_SPAN_EXCEPTION_MESSAGE_LENGTH``
    bound. Both messages are later written as pre-sanitized attributes, so neither may
    exceed that limit.

    Args:
        span: Active span object.
        error: Exception instance or message text describing the failure.
        exc_type: Optional explicit exception type for the failure.
        record_exception: Whether to add a sanitized exception event to the span.
    """
    if not span:
        return

    if isinstance(error, BaseException):
        error_message = _sanitize_span_exception_message(error)
        resolved_exc_type = exc_type or type(error)
    else:
        error_message = sanitize_for_log(sanitize_trace_text(str(error))).strip() or "Error"
        # Bound free-form text exactly like exception messages: it is passed below to
        # _set_pre_sanitized_span_attribute, which performs no truncation of its own.
        if len(error_message) > _MAX_SPAN_EXCEPTION_MESSAGE_LENGTH:
            error_message = f"{error_message[: _MAX_SPAN_EXCEPTION_MESSAGE_LENGTH - 3]}..."
        resolved_exc_type = exc_type

    if record_exception:
        _record_sanitized_exception_event(span, resolved_exc_type, error_message)

    if OTEL_AVAILABLE and Status and StatusCode:
        span.set_status(Status(StatusCode.ERROR, error_message))

    set_span_attribute(span, "error", True)
    if resolved_exc_type is not None:
        set_span_attribute(span, "error.type", resolved_exc_type.__name__)
    _set_pre_sanitized_span_attribute(span, "error.message", error_message)
    set_span_attribute(span, "langfuse.observation.level", "ERROR")
    _set_pre_sanitized_span_attribute(span, "langfuse.observation.status_message", error_message)

483 

484 

485def _derive_langfuse_trace_name(name: str, attributes: Dict[str, Any]) -> str: 

486 """Derive a human-readable Langfuse trace name from span context. 

487 

488 Args: 

489 name: Raw span name. 

490 attributes: Span attributes used to derive a more readable Langfuse trace label. 

491 

492 Returns: 

493 Human-readable trace name suitable for Langfuse dashboards. 

494 """ 

495 

496 def _display_value(value: Any) -> Any: 

497 """Sanitize string display values before promoting them into a trace name. 

498 

499 Args: 

500 value: Candidate trace-name fragment. 

501 

502 Returns: 

503 Sanitized string values, or the original non-string value unchanged. 

504 """ 

505 if isinstance(value, str): 

506 return sanitize_trace_text(value) 

507 return value 

508 

509 if name == "tool.invoke" and attributes.get("tool.name"): 

510 return f"Tool: {_display_value(attributes['tool.name'])}" 

511 if name == "tool.list": 

512 return "Tools" 

513 if name == "prompt.render" and attributes.get("prompt.id"): 

514 return f"Prompt: {_display_value(attributes['prompt.id'])}" 

515 if name == "prompt.list": 

516 return "Prompts" 

517 if name == "resource.read" and attributes.get("resource.uri"): 

518 return f"Resource: {_display_value(attributes['resource.uri'])}" 

519 if name == "resource.list": 

520 return "Resources" 

521 if name == "resource_template.list": 

522 return "Resource Templates" 

523 if name == "root.list": 

524 return "Roots" 

525 if name in {"llm.proxy", "llm.chat"} and attributes.get("gen_ai.request.model"): 

526 prefix = "LLM Proxy" if name == "llm.proxy" else "LLM Chat" 

527 return f"{prefix}: {_display_value(attributes['gen_ai.request.model'])}" 

528 if name.startswith("a2a.") and attributes.get("a2a.agent.name"): 

529 return f"A2A: {_display_value(attributes['a2a.agent.name'])}" 

530 return name 

531 

532 

def otel_tracing_enabled() -> bool:
    """Report whether this process has an initialised OpenTelemetry tracer.

    Returns:
        bool: ``True`` once the module-level tracer is set, ``False`` while it is ``None``.
    """
    tracer = _TRACER
    return tracer is not None

541 

542 

def otel_context_active() -> bool:
    """Report whether the current async context carries a valid OTEL span.

    Any error raised while inspecting the current span is treated as "no active span".

    Returns:
        bool: ``True`` when the current context has a valid OTEL span.
    """
    if not (OTEL_AVAILABLE and trace is not None):
        return False

    try:
        active_span = trace.get_current_span()
        if active_span is None:
            return False
        span_context = active_span.get_span_context()
        return bool(getattr(span_context, "is_valid", False))
    except Exception:
        return False

560 

561 

def inject_trace_context_headers(headers: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy outbound headers into a carrier and add the active W3C trace context.

    Entries with a falsy key or value are dropped, and the rest are coerced to ``str``.
    Injection only runs when an OTEL span is active and the propagator is available.
    Injection failures are logged at debug level and otherwise ignored.

    Args:
        headers: Existing outbound headers to copy into the carrier before trace injection.

    Returns:
        Dict[str, str]: Header mapping including any injected trace context.
    """
    carrier: Dict[str, str] = {}
    for key, value in (headers or {}).items():
        if not (key and value):
            continue
        header_name, header_value = str(key), str(value)
        carrier[header_name] = header_value

    if otel_context_active() and otel_inject is not None:
        try:
            otel_inject(carrier=carrier)
        except Exception as exc:
            logger.debug("Failed to inject W3C trace context into outbound headers: %s", exc)
    return carrier

580 

581 

582def _scope_headers_to_carrier(scope_headers: list[tuple[bytes, bytes]]) -> Dict[str, str]: 

583 """Convert ASGI scope headers to a text carrier for propagation/extraction. 

584 

585 Args: 

586 scope_headers: Raw ASGI header tuples from the request scope. 

587 

588 Returns: 

589 Dict[str, str]: Decoded lower-cased header carrier suitable for OTEL propagation. 

590 """ 

591 

592 carrier: Dict[str, str] = {} 

593 for key, value in scope_headers: 

594 try: 

595 decoded_key = key.decode("latin-1").lower() 

596 decoded_value = value.decode("latin-1") 

597 except (AttributeError, TypeError, UnicodeDecodeError): 

598 continue 

599 carrier[decoded_key] = decoded_value 

600 return carrier 

601 

602 

603def _should_trace_request_path(path: str) -> bool: 

604 """Return whether Phase 1 OTEL request tracing should instrument the path. 

605 

606 Args: 

607 path: Incoming request path. 

608 

609 Returns: 

610 bool: ``True`` when the path should be wrapped in a request span. 

611 """ 

612 

613 normalized = path.rstrip("/") or "/" 

614 if normalized in {"/rpc", "/mcp", "/mcp/sse", "/mcp/message", "/message", "/sse"}: 

615 return True 

616 if normalized.startswith("/servers/") and (normalized.endswith("/mcp") or normalized.endswith("/message") or normalized.endswith("/sse")): 

617 return True 

618 if normalized.startswith("/_internal/mcp/"): 

619 return True 

620 return False 

621 

622 

class OpenTelemetryRequestMiddleware:
    """Raw ASGI middleware that creates request-root spans for gateway transport flows.

    HTTP requests whose path passes the configured predicate are wrapped in a span, but only
    while a module tracer is initialised. Every other scope is passed straight through to the
    wrapped application.
    """

    def __init__(self, app: Any, should_trace_request_path: Optional[Callable[[str], bool]] = None):
        """Initialize the middleware wrapper.

        Args:
            app: Wrapped ASGI application.
            should_trace_request_path: Optional predicate that decides whether a request path
                should be instrumented. Defaults to ``_should_trace_request_path``.
        """
        self.app = app
        self.should_trace_request_path = should_trace_request_path or _should_trace_request_path

    def __getattr__(self, name: str) -> Any:
        """Proxy attribute access to the wrapped ASGI app.

        This preserves access to app-specific attributes such as ``routes`` when
        the middleware is used as the top-level object passed to uvicorn.

        Args:
            name: Attribute name to resolve on the wrapped ASGI app.

        Returns:
            Any: The proxied attribute value from ``self.app``.

        Raises:
            AttributeError: If the wrapped app lacks ``name``, or if no app is stored on this
                instance yet (for example while ``copy``/``pickle`` rebuilds the object).
        """
        # ``__getattr__`` only runs for names that normal lookup cannot find. Reading ``self.app``
        # on an instance without ``app`` would re-enter ``__getattr__("app")`` until RecursionError.
        # ``object.__getattribute__`` raises AttributeError instead, without re-entering this hook.
        app = object.__getattribute__(self, "app")
        return getattr(app, name)

    async def __call__(self, scope: Mapping[str, Any], receive: Any, send: Any) -> None:
        """Handle an ASGI request and create a request-root span when tracing applies.

        Failures reach the exporter only through this module's sanitizing helpers. The span is
        started with OpenTelemetry's automatic exception recording and exception status
        disabled, so raw exception text never reaches it.

        Args:
            scope: ASGI connection scope.
            receive: ASGI receive callable.
            send: ASGI send callable.

        Raises:
            BaseException: Any exception raised by the wrapped ASGI application (including
                cancellation) is recorded on the span in sanitized form and re-raised unchanged.
        """
        if scope.get("type") != "http" or _TRACER is None:
            await self.app(scope, receive, send)
            return

        path = str(scope.get("path", "") or "")
        if not self.should_trace_request_path(path):
            await self.app(scope, receive, send)
            return

        method = str(scope.get("method", "GET") or "GET").upper()
        scope_headers = list(scope.get("headers", []) or [])
        carrier = _scope_headers_to_carrier(scope_headers)

        parent_context = None
        if otel_extract is not None:
            try:
                parent_context = otel_extract(carrier=carrier)
            except Exception as exc:
                logger.debug("Failed to extract W3C trace context for %s %s: %s", method, path, exc)

        server = scope.get("server") or ("", None)
        client = scope.get("client") or ("", None)
        query_string = scope.get("query_string", b"") or b""
        if isinstance(query_string, bytes):
            query_text = query_string.decode("latin-1")
        else:
            query_text = str(query_string)

        # Sanitize query string to prevent leaking session_id and other sensitive parameters
        sanitized_query = sanitize_trace_text(query_text) if query_text else None

        span_name = f"{method} {path or '/'}"
        span_attributes: Dict[str, Any] = {
            "http.request.method": method,
            "http.route": path or "/",
            "url.path": path or "/",
            "url.query": sanitized_query,
            "network.protocol.version": scope.get("http_version"),
            "server.address": server[0] if len(server) > 0 else None,
            "server.port": server[1] if len(server) > 1 else None,
            "client.address": client[0] if len(client) > 0 else None,
            "client.port": client[1] if len(client) > 1 else None,
            "user_agent.original": carrier.get("user-agent"),
            "correlation_id": carrier.get("x-correlation-id"),
        }

        start_span_kwargs: Dict[str, Any] = {
            # Left at their defaults, OpenTelemetry's context manager would record the raw
            # exception (message and stack trace) as an event. It would also overwrite the
            # sanitized ERROR status with f"{type(exc).__name__}: {exc}". Failures are instead
            # recorded below via set_span_error.
            "record_exception": False,
            "set_status_on_exception": False,
        }
        if parent_context is not None:
            start_span_kwargs["context"] = parent_context
        if SpanKind is not None:
            start_span_kwargs["kind"] = SpanKind.SERVER

        status_code_holder: Dict[str, int] = {}
        span: Any = None

        async def _send_with_span_status(message: Mapping[str, Any]) -> None:
            """Record the response status on the span, then forward ``message`` to ``send``.

            Args:
                message: ASGI message emitted by the wrapped application.
            """
            if message.get("type") == "http.response.start":
                status_code = int(message.get("status", 0) or 0)
                status_code_holder["status"] = status_code
                if span is not None and status_code:
                    set_span_attribute(span, "http.response.status_code", status_code)
                    # Only failures are marked here; OK is deferred until the app finishes.
                    # Once a span is OK the SDK ignores later set_status calls, so setting it now
                    # would hide errors raised after the response started (SSE / streaming).
                    if status_code >= 500 and OTEL_AVAILABLE and Status and StatusCode:
                        span.set_status(Status(StatusCode.ERROR))
            await send(message)

        with _TRACER.start_as_current_span(span_name, **start_span_kwargs) as span:
            if span is not None:
                for key, value in span_attributes.items():
                    if value is not None:
                        set_span_attribute(span, key, value)

            try:
                await self.app(scope, receive, _send_with_span_status)
            except BaseException as exc:
                # BaseException also covers asyncio.CancelledError. OpenTelemetry's own exception
                # handling is disabled above, so without this it would no longer be marked.
                if span is not None:
                    # Sets a sanitized ERROR status, error attributes and exception event.
                    set_span_error(span, exc, record_exception=True)
                raise

            # Successful completion: OK unless a zero or 5xx response status was recorded.
            recorded_status = status_code_holder.get("status")
            if span is not None and OTEL_AVAILABLE and Status and StatusCode:
                if recorded_status is None or 0 < recorded_status < 500:
                    span.set_status(Status(StatusCode.OK))

747 

748 

749def init_telemetry() -> Optional[Any]: 

750 """Initialize OpenTelemetry with configurable backend. 

751 

752 Supports multiple backends via environment variables: 

753 - OTEL_TRACES_EXPORTER: Exporter type (otlp, jaeger, zipkin, console, none) 

754 - OTEL_EXPORTER_OTLP_ENDPOINT: OTLP endpoint (for otlp exporter) 

755 - OTEL_EXPORTER_JAEGER_ENDPOINT: Jaeger endpoint (for jaeger exporter) 

756 - OTEL_EXPORTER_ZIPKIN_ENDPOINT: Zipkin endpoint (for zipkin exporter) 

757 - OTEL_ENABLE_OBSERVABILITY: Set to 'true' to enable (disabled by default) 

758 

759 Returns: 

760 The initialized tracer instance or None if disabled. 

761 """ 

762 # pylint: disable=global-statement 

763 global _TRACER 

764 cfg = get_settings() 

765 

766 # Check if observability is disabled (default: disabled) 

767 if not cfg.otel_enable_observability: 

768 logger.info("Observability disabled via OTEL_ENABLE_OBSERVABILITY=false") 

769 return None 

770 

771 # If OpenTelemetry isn't installed, return early with graceful degradation 

772 if not OTEL_AVAILABLE: 

773 logger.warning("OpenTelemetry not installed - telemetry disabled") 

774 logger.info("To enable telemetry, install: pip install mcp-contextforge-gateway[observability]") 

775 return None 

776 

777 # Get exporter type from environment 

778 exporter_type = cfg.otel_traces_exporter.lower() 

779 

780 # Handle 'none' exporter (tracing disabled) 

781 if exporter_type == "none": 

782 logger.info("Tracing disabled via OTEL_TRACES_EXPORTER=none") 

783 return None 

784 

785 # Check if endpoint is configured for otlp 

786 if exporter_type == "otlp": 

787 endpoint = _resolve_otlp_endpoint() 

788 if not endpoint: 

789 logger.info("OTLP endpoint not configured, skipping telemetry init") 

790 return None 

791 _validate_langfuse_configuration(endpoint, _resolve_otlp_headers(endpoint)) 

792 

793 try: 

794 # Create resource attributes 

795 resource_attributes: Dict[str, Any] = { 

796 "service.name": cfg.otel_service_name, 

797 "service.version": "1.0.0-RC-2", 

798 "deployment.environment": _get_deployment_environment(), 

799 } 

800 

801 # Add custom resource attributes from environment 

802 custom_attrs = cfg.otel_resource_attributes or "" 

803 if custom_attrs: 

804 for attr in custom_attrs.split(","): 

805 if "=" in attr: 

806 key, value = attr.split("=", 1) 

807 resource_attributes[key.strip()] = value.strip() 

808 

809 # Narrow types for mypy/pyrefly 

810 # Create resource if available, else skip 

811 resource: Optional[Any] 

812 if Resource is not None and hasattr(Resource, "create"): 

813 resource = cast(Any, Resource).create(resource_attributes) 

814 else: 

815 resource = None 

816 

817 # Set up tracer provider with optional sampling 

818 # Initialize tracer provider (with resource if available) 

819 if resource is not None: 

820 provider = cast(Any, TracerProvider)(resource=resource) 

821 else: 

822 provider = cast(Any, TracerProvider)() 

823 

824 # Register provider if trace API is present 

825 if trace is not None and hasattr(trace, "set_tracer_provider"): 

826 cast(Any, trace).set_tracer_provider(provider) 

827 

828 # Create a custom span processor to copy resource attributes to span attributes 

829 # This is needed because Arize requires arize.project.name as a span attribute 

class ResourceAttributeSpanProcessor:
    """Span processor that copies specific resource attributes to span attributes.

    Implements the OpenTelemetry SDK span-processor interface by duck typing
    (``on_start``/``on_end``/``shutdown``/``force_flush``). The SDK invokes
    ``on_start(span, parent_context=...)`` with a keyword argument and calls
    ``shutdown()``/``force_flush()`` on every registered processor, so all of
    them must exist with these exact names.
    """

    def __init__(self, attributes_to_copy: Optional[list[str]] = None):
        """Initialize the processor.

        Args:
            attributes_to_copy: Resource attribute keys to copy onto each span.
                An empty or missing value falls back to
                ``["arize.project.name", "model_id"]``.
        """
        self.attributes_to_copy = attributes_to_copy or ["arize.project.name", "model_id"]
        logger.info(f"ResourceAttributeSpanProcessor will copy: {self.attributes_to_copy}")

    def on_start(self, span: Any, parent_context: Any = None) -> None:  # pylint: disable=unused-argument
        """Copy specified resource attributes to span attributes when span starts.

        Args:
            span: The span being started.
            parent_context: The parent context (unused). Named ``parent_context``
                because the SDK passes it by keyword.
        """
        if not hasattr(span, "resource") or span.resource is None:
            return

        # Treat a missing or None ``attributes`` mapping as empty.
        resource_attributes = getattr(span.resource, "attributes", None) or {}

        # Copy specified attributes from resource to span
        for attr in self.attributes_to_copy:
            if attr in resource_attributes:
                value = resource_attributes[attr]
                span.set_attribute(attr, value)
                # Lazy %-formatting: this runs for every span start.
                logger.debug("Copied resource attribute to span: %s=%s", attr, value)

    def _on_ending(self, span: Any) -> None:
        """No-op hook kept for SDK compatibility.

        NOTE(review): assumed to be invoked by newer SDK multi-processors just
        before ``on_end``; harmless if never called.

        Args:
            span: The span that is ending.
        """

    def on_end(self, span: Any) -> None:
        """Handle span end event (nothing to do for this processor).

        Args:
            span: The span being ended.
        """

    def shutdown(self) -> None:
        """Release resources on provider shutdown (none are held)."""

    def force_flush(self, timeout_millis: int = 30000) -> bool:  # pylint: disable=unused-argument
        """Flush pending work; nothing is buffered, so this always succeeds.

        Args:
            timeout_millis: Maximum time to wait (unused).

        Returns:
            bool: Always ``True``.
        """
        return True

866 

867 # Add the custom span processor to copy resource attributes to spans 

868 # This is needed for Arize which requires certain attributes as span attributes 

869 # Enable via OTEL_COPY_RESOURCE_ATTRS_TO_SPANS=true (disabled by default) 

870 copy_resource_attrs = cfg.otel_copy_resource_attrs_to_spans 

871 if resource is not None and copy_resource_attrs: 

872 logger.info("Adding ResourceAttributeSpanProcessor to copy resource attributes to spans") 

873 provider.add_span_processor(ResourceAttributeSpanProcessor()) 

874 

875 # Configure the appropriate exporter based on type 

876 exporter: Optional[Any] = None 

877 

878 if exporter_type == "otlp": 

879 endpoint = _resolve_otlp_endpoint() 

880 protocol = cfg.otel_exporter_otlp_protocol.lower() 

881 header_dict = _resolve_otlp_headers(endpoint) 

882 if _is_langfuse_otlp_endpoint(endpoint): 

883 protocol = "http" 

884 # Note: some versions of OTLP exporters may not accept 'insecure' kwarg; avoid passing it. 

885 # Use endpoint scheme or env to control TLS externally. 

886 

887 if protocol == "grpc" and OTLP_SPAN_EXPORTER: 

888 exporter = cast(Any, OTLP_SPAN_EXPORTER)(endpoint=endpoint, headers=header_dict or None) 

889 elif HTTP_EXPORTER: 

890 # Use HTTP exporter as fallback 

891 ep = str(endpoint) if endpoint is not None else "" 

892 http_ep = (ep.replace(":4317", ":4318") + "/v1/traces") if ":4317" in ep else ep 

893 exporter = cast(Any, HTTP_EXPORTER)(endpoint=http_ep, headers=header_dict or None) 

894 else: 

895 logger.error("No OTLP exporter available") 

896 return None 

897 

898 elif exporter_type == "jaeger": 

899 if JAEGER_EXPORTER: 

900 endpoint = cfg.otel_exporter_jaeger_endpoint or "http://localhost:14268/api/traces" 

901 exporter = JAEGER_EXPORTER( 

902 collector_endpoint=endpoint, 

903 username=cfg.otel_exporter_jaeger_user, 

904 password=cfg.otel_exporter_jaeger_password.get_secret_value() if cfg.otel_exporter_jaeger_password else None, 

905 ) 

906 else: 

907 logger.error("Jaeger exporter not available. Install with: pip install opentelemetry-exporter-jaeger") 

908 return None 

909 

910 elif exporter_type == "zipkin": 

911 if ZIPKIN_EXPORTER: 

912 endpoint = cfg.otel_exporter_zipkin_endpoint or "http://localhost:9411/api/v2/spans" 

913 exporter = ZIPKIN_EXPORTER(endpoint=endpoint) 

914 else: 

915 logger.error("Zipkin exporter not available. Install with: pip install opentelemetry-exporter-zipkin") 

916 return None 

917 

918 elif exporter_type == "console": 

919 # Console exporter for debugging 

920 exporter = cast(Any, ConsoleSpanExporter)() 

921 

922 else: 

923 logger.warning(f"Unknown exporter type: {exporter_type}. Using console exporter.") 

924 exporter = cast(Any, ConsoleSpanExporter)() 

925 

926 if exporter: 

927 # Add batch processor for better performance (except for console) 

928 if exporter_type == "console": 

929 span_processor = cast(Any, SimpleSpanProcessor)(exporter) 

930 else: 

931 span_processor = cast(Any, BatchSpanProcessor)( 

932 exporter, 

933 max_queue_size=cfg.otel_bsp_max_queue_size, 

934 max_export_batch_size=cfg.otel_bsp_max_export_batch_size, 

935 schedule_delay_millis=cfg.otel_bsp_schedule_delay, 

936 ) 

937 provider.add_span_processor(span_processor) 

938 

939 # Get tracer 

940 # Obtain a tracer if trace API available; otherwise create a no-op tracer 

941 if trace is not None and hasattr(trace, "get_tracer"): 

942 _TRACER = cast(Any, trace).get_tracer("mcp-gateway", "1.0.0-RC-2", schema_url="https://opentelemetry.io/schemas/1.11.0") 

943 else: 

944 

945 class _NoopTracer: 

946 """No-op tracer used when OpenTelemetry API isn't available.""" 

947 

948 def start_as_current_span(self, _name: str): # type: ignore[override] 

949 """Return a no-op context manager for span creation. 

950 

951 Args: 

952 _name: Span name (ignored in no-op implementation). 

953 

954 Returns: 

955 contextlib.AbstractContextManager: A null context. 

956 """ 

957 return nullcontext() 

958 

959 _TRACER = _NoopTracer() 

960 

961 logger.info(f"✅ OpenTelemetry initialized with {exporter_type} exporter") 

962 if exporter_type == "otlp": 

963 logger.info(f" Endpoint: {_resolve_otlp_endpoint()}") 

964 elif exporter_type == "jaeger": 

965 logger.info(f" Endpoint: {cfg.otel_exporter_jaeger_endpoint or 'default'}") 

966 elif exporter_type == "zipkin": 

967 logger.info(f" Endpoint: {cfg.otel_exporter_zipkin_endpoint or 'default'}") 

968 

969 return _TRACER 

970 

971 except Exception as e: 

972 logger.error(f"Failed to initialize OpenTelemetry: {e}") 

973 return None 

974 

975 

def trace_operation(operation_name: str, attributes: Optional[Dict[str, Any]] = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Simple decorator to trace an async operation.

    The decorated function must be a coroutine function: the wrapper awaits it.
    Function metadata (``__name__``, ``__doc__``, ``__wrapped__``, signature)
    is preserved via ``functools.wraps``.

    Args:
        operation_name: Name of the operation to trace (e.g., "tool.invoke").
        attributes: Optional dictionary of attributes to add to the span.

    Returns:
        Decorator function that wraps the target function with tracing.

    Usage:
        @trace_operation("tool.invoke", {"tool.name": "calculator"})
        async def invoke_tool():
            ...
    """
    # Standard
    from functools import wraps  # pylint: disable=import-outside-toplevel

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        """Decorator that wraps the function with tracing.

        Args:
            func: The async function to wrap with tracing.

        Returns:
            The wrapped function with tracing capabilities.
        """

        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            """Async wrapper that adds tracing to the decorated function.

            Args:
                *args: Positional arguments passed to the wrapped function.
                **kwargs: Keyword arguments passed to the wrapped function.

            Returns:
                The result of the wrapped function.

            Raises:
                Exception: Any exception raised by the wrapped function.
            """
            # _TRACER is read at call time, so tracing configured after decoration is honoured.
            if not _TRACER:
                # No tracing configured, just run the function
                return await func(*args, **kwargs)

            # Create span for this operation
            with _TRACER.start_as_current_span(operation_name) as span:
                # Add attributes if provided
                if attributes:
                    for key, value in attributes.items():
                        set_span_attribute(span, key, value)

                try:
                    # Run the actual function
                    result = await func(*args, **kwargs)
                    set_span_attribute(span, "status", "success")
                    return result
                except Exception as e:
                    set_span_attribute(span, "status", "error")
                    set_span_error(span, e, record_exception=True)
                    raise

        return wrapper

    return decorator

1040 

1041 

class _SpanWithAttributes:
    """Context manager wrapper that adds attributes to a span.

    Wraps the context manager returned by ``start_as_current_span``. It sets
    attributes when the context is entered. On exit it records an exception
    (or, when OpenTelemetry is available, an ``OK`` status) before delegating
    to the wrapped context manager.

    Defined once at module level for ``create_span()``, rather than inside it,
    so that a new class object is not built on every call.
    """

    def __init__(self, span_context: Any, attrs: Dict[str, Any]):
        """Initialize the span wrapper.

        Args:
            span_context: The OpenTelemetry span context to wrap.
            attrs: Dictionary of attributes to add to the span.
        """
        self.span_context: Any = span_context
        self.attrs: Dict[str, Any] = attrs
        self.span: Any = None

    def __enter__(self) -> Any:
        """Enter the context and set span attributes (``None`` values are skipped).

        Returns:
            The OpenTelemetry span with attributes set.
        """
        self.span = self.span_context.__enter__()
        if self.attrs and self.span:
            for key, value in self.attrs.items():
                if value is not None:  # Skip None values
                    set_span_attribute(self.span, key, value)
        return self.span

    def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Any) -> Any:
        """Exit the context and record any exceptions.

        Args:
            exc_type: The exception type if an exception occurred.
            exc_val: The exception value if an exception occurred.
            exc_tb: The exception traceback if an exception occurred.

        Returns:
            The result of the wrapped span context's __exit__ method.
        """
        # Record exception if one occurred
        if exc_type is not None and self.span:
            set_span_error(self.span, exc_val or exc_type.__name__, exc_type=exc_type, record_exception=True)
        elif self.span:
            if OTEL_AVAILABLE and Status and StatusCode:
                self.span.set_status(Status(StatusCode.OK))
        return self.span_context.__exit__(exc_type, exc_val, exc_tb)


def create_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Any:
    """
    Create a span for manual instrumentation.

    Caller-supplied attributes take precedence. Correlation, identity, auth and
    (when enabled) Langfuse attributes are only added via ``setdefault``.
    Failures while gathering that context are logged at debug level and
    otherwise ignored.

    Args:
        name: Name of the span to create (e.g., "database.query").
        attributes: Optional dictionary of attributes to add to the span.

    Returns:
        Context manager that creates and manages the span lifecycle. This is
        ``contextlib.nullcontext()`` when no tracer is configured.

    Usage:
        with create_span("database.query", {"db.statement": "SELECT * FROM tools"}):
            # Your code here
            pass
    """
    if not _TRACER:
        # Return a no-op context manager if tracing is not configured or available
        return nullcontext()

    # Copy so the caller's mapping is never mutated.
    attributes = dict(attributes or {})

    # Auto-inject correlation ID into all spans for request tracing
    try:
        correlation_id = get_correlation_id()
        if correlation_id:
            attributes.setdefault("correlation_id", correlation_id)
            attributes.setdefault("request_id", correlation_id)
    except Exception as exc:
        # Correlation ID not available or error getting it, continue without it
        logger.debug("Failed to add correlation_id to span: %s", exc)

    try:
        user_email = get_trace_user_email()
        user_is_admin = get_trace_user_is_admin()
        team_scope = get_trace_team_scope()
        team_name = get_trace_team_name()
        auth_method = get_trace_auth_method()
        session_id = get_trace_session_id()
        environment = _get_deployment_environment()
        # Evaluate the identity-capture policy once; it gates several attributes below.
        capture_identity = _should_capture_identity_attributes()

        if capture_identity:
            if user_email:
                attributes.setdefault("user.email", user_email)
            if user_email or user_is_admin:
                attributes.setdefault("user.is_admin", user_is_admin)
            if team_scope:
                attributes.setdefault("team.scope", team_scope)
            if team_name:
                attributes.setdefault("team.name", team_name)

        if auth_method:
            attributes.setdefault("auth.method", auth_method)

        if _should_emit_langfuse_attributes():
            if capture_identity and user_email:
                attributes.setdefault("langfuse.user.id", user_email)
            if session_id:
                attributes.setdefault("langfuse.session.id", session_id)
            attributes.setdefault("langfuse.environment", environment)

            tags: list[str] = []
            primary_team = primary_team_from_scope(team_scope)
            if capture_identity and primary_team:
                tags.append(f"team:{primary_team}")
            if auth_method:
                tags.append(f"auth:{auth_method}")
            if environment:
                tags.append(f"env:{environment}")
            if tags:
                attributes.setdefault("langfuse.trace.tags", tags)

            trace_name_attributes = {key: sanitize_trace_attribute_value(key, value) for key, value in attributes.items() if _should_emit_span_attribute(key)}
            attributes.setdefault("langfuse.trace.name", _derive_langfuse_trace_name(name, trace_name_attributes))
            attributes.setdefault("langfuse.observation.level", "DEFAULT")
    except Exception as exc:
        logger.debug("Failed to auto-inject trace context into span: %s", exc)

    # Start span and return the context manager
    span_context = _TRACER.start_as_current_span(name)

    # Attributes can only be set once the span context has been entered,
    # so wrap it in a context manager that applies them on entry.
    if attributes:
        return _SpanWithAttributes(span_context, attributes)

    return span_context

1181 

1182 

def create_child_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Any:
    """Open a span nested beneath whichever span is currently active.

    Behaves exactly like ``create_span()``; the separate name only documents,
    at the call site, that the span is intended as a child.

    Args:
        name: Span name.
        attributes: Optional attributes to attach to the created span.

    Returns:
        The span context manager produced by ``create_span()``.
    """
    child_span_cm = create_span(name, attributes)
    return child_span_cm

1197 

1198 

# Initialize on module import.
# init_telemetry() returns a tracer, or None when initialization is skipped or
# fails. When it is None (falsy), create_span() returns a nullcontext() and
# trace_operation() runs the wrapped function without a span.
_TRACER = init_telemetry()