Coverage for mcpgateway / observability.py: 96%
559 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/observability.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Vendor-agnostic OpenTelemetry instrumentation for ContextForge.
8Supports any OTLP-compatible backend (Jaeger, Zipkin, Tempo, Phoenix, etc.).
9"""
11# Standard
12import base64
13from contextlib import nullcontext
14from importlib import import_module as _im
15import logging
16import os
17from typing import Any, Callable, cast, Dict, Mapping, Optional
18from urllib.parse import urlparse
# Third-Party - Try to import OpenTelemetry core components - make them truly optional
# OTEL_AVAILABLE is only flipped to True after every import in the ``try`` succeeds.
# Any ImportError (including a partial install, e.g. the API present but the SDK
# missing) rebinds all of these names to None or to the shims defined below.
OTEL_AVAILABLE = False
try:
    # Third-Party
    from opentelemetry import trace
    from opentelemetry.propagate import extract as otel_extract
    from opentelemetry.propagate import inject as otel_inject
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor
    from opentelemetry.trace import SpanKind, Status, StatusCode

    OTEL_AVAILABLE = True
except ImportError:
    # OpenTelemetry not installed - set to None for graceful degradation
    trace = None
    otel_extract = None
    otel_inject = None

    class _SpanKindShim:
        """Minimal SpanKind shim used when OpenTelemetry isn't installed.

        Only ``SERVER`` is defined; code asking for any other SpanKind member
        would raise AttributeError while the shim is active.
        """

        SERVER = "server"

    # Provide a lightweight shim so tests can patch Resource.create
    class _ResourceShim:
        """Minimal Resource shim used when OpenTelemetry SDK isn't installed.

        Exposes a static ``create`` method that simply returns the provided
        attributes mapping, enabling tests to patch and inspect the inputs
        without requiring the real OpenTelemetry classes.
        """

        @staticmethod
        def create(attrs: Dict[str, Any]) -> Dict[str, Any]:  # type: ignore[override]
            """Return attributes unchanged to mimic ``Resource.create``.

            Args:
                attrs: Resource attribute dictionary.

            Returns:
                Dict[str, Any]: The same mapping passed in.
            """
            return attrs

    # ``cast`` only informs type checkers; at runtime these names are the shim
    # classes above or None.
    Resource = cast(Any, _ResourceShim)
    TracerProvider = None
    BatchSpanProcessor = None
    ConsoleSpanExporter = None
    SimpleSpanProcessor = None
    SpanKind = cast(Any, _SpanKindShim)
    Status = None
    StatusCode = None

    # Provide minimal module shims so tests can patch ConsoleSpanExporter path
    try:
        # Standard
        import sys
        import types

        # Only done under pytest (already imported) or when MCP_TESTING=1, so
        # production processes never see fake ``opentelemetry`` modules.
        if ("pytest" in sys.modules) or (os.getenv("MCP_TESTING") == "1"):
            otel_root = types.ModuleType("opentelemetry")
            otel_sdk = types.ModuleType("opentelemetry.sdk")
            otel_trace = types.ModuleType("opentelemetry.sdk.trace")
            otel_export = types.ModuleType("opentelemetry.sdk.trace.export")

            class _ConsoleSpanExporterStub:  # pragma: no cover - test patch replaces this
                """Lightweight stub for ConsoleSpanExporter used in tests.

                Provides a placeholder class so unit tests can patch
                `opentelemetry.sdk.trace.export.ConsoleSpanExporter` even when
                the OpenTelemetry SDK is not installed in the environment.
                """

            # Link the fake modules as attributes so dotted attribute access
            # (opentelemetry.sdk.trace.export) resolves through the chain.
            setattr(otel_export, "ConsoleSpanExporter", _ConsoleSpanExporterStub)
            setattr(otel_trace, "export", otel_export)
            setattr(otel_sdk, "trace", otel_trace)
            setattr(otel_root, "sdk", otel_sdk)

            # Only register the exact chain needed by tests
            # ``setdefault`` leaves any module already registered under these names
            # (e.g. a partially installed real package) untouched.
            sys.modules.setdefault("opentelemetry", otel_root)
            sys.modules.setdefault("opentelemetry.sdk", otel_sdk)
            sys.modules.setdefault("opentelemetry.sdk.trace", otel_trace)
            sys.modules.setdefault("opentelemetry.sdk.trace.export", otel_export)
    except Exception as exc:  # nosec B110 - best-effort optional shim
        # Shimming is a non-critical, best-effort step for tests; log and continue.
        logging.getLogger(__name__).debug("Skipping OpenTelemetry shim setup: %s", exc)
108# First-Party
109from mcpgateway.config import get_settings # noqa: E402 # pylint: disable=wrong-import-position
110from mcpgateway.utils.correlation_id import get_correlation_id # noqa: E402 # pylint: disable=wrong-import-position
111from mcpgateway.utils.log_sanitizer import sanitize_for_log # noqa: E402 # pylint: disable=wrong-import-position
112from mcpgateway.utils.trace_context import ( # noqa: E402 # pylint: disable=wrong-import-position
113 get_trace_auth_method,
114 get_trace_session_id,
115 get_trace_team_name,
116 get_trace_team_scope,
117 get_trace_user_email,
118 get_trace_user_is_admin,
119 primary_team_from_scope,
120)
121from mcpgateway.utils.trace_redaction import sanitize_trace_attribute_value, sanitize_trace_text # noqa: E402 # pylint: disable=wrong-import-position
logger = logging.getLogger(__name__)


def _load_optional_attr(module_path: str, attr_name: str) -> Optional[Any]:
    """Import ``module_path`` and return its ``attr_name`` attribute, or ``None``.

    Used for optional exporter packages: any failure while importing the module
    or resolving the attribute is treated as "not installed".

    Args:
        module_path: Dotted module path to import.
        attr_name: Attribute to fetch from the imported module.

    Returns:
        The attribute value, or ``None`` when the module/attribute is unavailable.
    """
    try:
        return getattr(_im(module_path), attr_name)
    except Exception as exc:  # optional dependency: import may fail for many reasons
        logger.debug("Optional OpenTelemetry component %s.%s unavailable: %s", module_path, attr_name, exc)
        return None


# Try to import optional exporters.
# OTLP_SPAN_EXPORTER is deliberately gRPC-only: init_telemetry uses it for
# protocol == "grpc" and passes the endpoint through unchanged. If it silently
# fell back to the HTTP exporter, a gRPC-style endpoint (":4317", no
# "/v1/traces") would be handed to the HTTP exporter. With it left as None, the
# HTTP_EXPORTER fallback (which rewrites the endpoint) is used instead.
OTLP_SPAN_EXPORTER = _load_optional_attr("opentelemetry.exporter.otlp.proto.grpc.trace_exporter", "OTLPSpanExporter")
JAEGER_EXPORTER = _load_optional_attr("opentelemetry.exporter.jaeger.thrift", "JaegerExporter")
ZIPKIN_EXPORTER = _load_optional_attr("opentelemetry.exporter.zipkin.json", "ZipkinExporter")
HTTP_EXPORTER = _load_optional_attr("opentelemetry.exporter.otlp.proto.http.trace_exporter", "OTLPSpanExporter")

# URL path fragment identifying Langfuse's public OTLP ingestion endpoint.
_LANGFUSE_OTEL_PATH_FRAGMENT = "/api/public/otel"
# Upper bound (including the "..." suffix) for exception text attached to spans.
_MAX_SPAN_EXCEPTION_MESSAGE_LENGTH = 1024
# Span attributes that carry user/team identity and are gated by the identity-capture policy.
_IDENTITY_ATTRIBUTE_KEYS = frozenset({"user.email", "user.is_admin", "team.scope", "team.name", "langfuse.user.id"})


# Global tracer instance - using UPPER_CASE for module-level constant
# pylint: disable=invalid-name
_TRACER = None
def _sanitize_span_exception_message(exc_val: Optional[BaseException]) -> str:
    """Turn an exception into short, redacted text suitable for span attributes.

    The exception text is passed through trace redaction and log sanitization.
    If nothing is left, the exception class name is used instead. Results longer
    than ``_MAX_SPAN_EXCEPTION_MESSAGE_LENGTH`` are cut and suffixed with ``...``
    so the total length equals the limit.

    Args:
        exc_val: Exception instance captured by the span lifecycle, or ``None``.

    Returns:
        Sanitized, bounded message; an empty string when ``exc_val`` is ``None``.
    """
    if exc_val is None:
        return ""

    cleaned = sanitize_for_log(sanitize_trace_text(str(exc_val))).strip() or exc_val.__class__.__name__
    limit = _MAX_SPAN_EXCEPTION_MESSAGE_LENGTH
    if len(cleaned) > limit:
        # Reserve three characters for the ellipsis marker.
        return cleaned[: limit - 3] + "..."
    return cleaned
def _get_deployment_environment() -> str:
    """Look up the deployment environment label from the gateway settings.

    Returns:
        The ``deployment_env`` value of the current settings object.
    """
    settings = get_settings()
    return settings.deployment_env
def _is_langfuse_otlp_endpoint(endpoint: Optional[str]) -> bool:
    """Decide whether an OTLP endpoint targets Langfuse's ingestion path.

    The URL path component is searched for ``_LANGFUSE_OTEL_PATH_FRAGMENT``.
    If the URL cannot be parsed, the raw endpoint string is searched instead.

    Args:
        endpoint: OTLP endpoint URL to inspect (may be empty or ``None``).

    Returns:
        ``True`` when the Langfuse OTLP path fragment is present.
    """
    if not endpoint:
        return False

    try:
        haystack = urlparse(endpoint).path
    except Exception:
        haystack = endpoint
    return _LANGFUSE_OTEL_PATH_FRAGMENT in haystack
def _resolve_langfuse_basic_auth() -> str:
    """Work out the Langfuse OTLP basic-auth credential.

    An explicit ``langfuse_otel_auth`` setting wins. Otherwise the public and
    secret project keys are joined as ``public:secret`` and base64-encoded,
    but only when both keys are non-empty after stripping whitespace.

    Returns:
        Base64-encoded credential string, or ``""`` when Langfuse auth is not configured.
    """
    settings = get_settings()

    def _unwrap(secret: Any) -> str:
        """Return a stripped secret value, or ``""`` for an unset secret."""
        return secret.get_secret_value().strip() if secret else ""

    explicit = _unwrap(settings.langfuse_otel_auth)
    if explicit:
        return explicit

    public_key = _unwrap(settings.langfuse_public_key)
    secret_key = _unwrap(settings.langfuse_secret_key)
    if public_key and secret_key:
        raw_pair = f"{public_key}:{secret_key}".encode("utf-8")
        return base64.b64encode(raw_pair).decode("ascii")
    return ""
def _resolve_otlp_endpoint() -> Optional[str]:
    """Pick the OTLP endpoint to export to.

    Returns:
        ``langfuse_otel_endpoint`` when it is set (truthy), otherwise the
        generic ``otel_exporter_otlp_endpoint`` setting.
    """
    settings = get_settings()
    langfuse_endpoint = settings.langfuse_otel_endpoint
    return langfuse_endpoint if langfuse_endpoint else settings.otel_exporter_otlp_endpoint
def _parse_otlp_headers(headers: Optional[str]) -> Dict[str, str]:
    """Parse OTLP headers from a comma-separated ``key=value`` string.

    Mirrors the OpenTelemetry SDK handling of ``OTEL_EXPORTER_OTLP_HEADERS``:
    - Entries are comma-separated, and the first ``=`` splits the name from the value.
    - Name and value are percent-decoded before surrounding whitespace is stripped.
      Decoding is required for values such as ``Authorization=Basic%20<b64>``,
      whose space must be URL-encoded in the environment variable.
    - Entries without ``=``, or with an empty name, are ignored.
    - Original key casing is preserved; lookups elsewhere are case-insensitive.

    Args:
        headers: Raw header string from configuration.

    Returns:
        Parsed OTLP headers mapping.
    """
    # Standard
    from urllib.parse import unquote  # pylint: disable=import-outside-toplevel

    parsed: Dict[str, str] = {}
    if not headers:
        return parsed

    for header in headers.split(","):
        key, separator, value = header.partition("=")
        if not separator:
            continue
        key = unquote(key).strip()
        if key:
            parsed[key] = unquote(value).strip()
    return parsed
def _get_header_case_insensitive(headers: Dict[str, str], name: str) -> Optional[str]:
    """Look up a header value, ignoring the case of the header name.

    Args:
        headers: Header mapping to search.
        name: Header name to find.

    Returns:
        The value of the first key (in mapping order) that matches ``name``
        case-insensitively, or ``None`` if no key matches.
    """
    wanted = name.lower()
    return next((candidate for key, candidate in headers.items() if key.lower() == wanted), None)
def _set_header_case_insensitive(headers: Dict[str, str], name: str, value: str) -> None:
    """Store a header value, reusing an existing key whose name matches case-insensitively.

    If a matching key exists, its original spelling is kept and only the value
    changes; otherwise ``name`` is inserted as given.

    Args:
        headers: Header mapping to mutate in place.
        name: Header name to set.
        value: Header value to store.
    """
    wanted = name.lower()
    existing_key = next((key for key in headers if key.lower() == wanted), None)
    target_key = name if existing_key is None else existing_key
    headers[target_key] = value
def _resolve_otlp_headers(endpoint: Optional[str]) -> Dict[str, str]:
    """Build the OTLP exporter headers, adding Langfuse basic auth when applicable.

    Headers come from ``otel_exporter_otlp_headers``. For a Langfuse endpoint
    with no ``Authorization`` header already configured, a ``Basic`` header is
    added from the resolved Langfuse credentials (if any).

    Args:
        endpoint: Resolved OTLP endpoint URL.

    Returns:
        OTLP header mapping suitable for exporter configuration.
    """
    settings = get_settings()
    resolved = _parse_otlp_headers(settings.otel_exporter_otlp_headers)

    if _is_langfuse_otlp_endpoint(endpoint):
        credential = _resolve_langfuse_basic_auth()
        needs_auth = bool(credential) and not _get_header_case_insensitive(resolved, "Authorization")
        if needs_auth:
            _set_header_case_insensitive(resolved, "Authorization", f"Basic {credential}")

    return resolved
def _validate_langfuse_configuration(endpoint: Optional[str], headers: Dict[str, str]) -> None:
    """Fail closed when Langfuse OTLP is configured without usable credentials.

    Non-Langfuse endpoints are accepted unconditionally. For a Langfuse endpoint,
    the ``Authorization`` header (matched case-insensitively) must meet all of:
    - the scheme is ``Basic`` (any case);
    - the credential is strictly valid base64;
    - it decodes as UTF-8 to ``public_key:secret_key``, with both parts non-empty.

    Args:
        endpoint: OTLP endpoint URL configured for export.
        headers: OTLP exporter header mapping; only its ``Authorization`` entry is inspected.

    Raises:
        RuntimeError: If a Langfuse endpoint is configured without valid Basic credentials.
    """
    if not _is_langfuse_otlp_endpoint(endpoint):
        return

    authorization = _get_header_case_insensitive(headers, "Authorization")
    if authorization:
        parts = authorization.strip().split(None, 1)
        if len(parts) == 2 and parts[0].lower() == "basic":
            try:
                decoded = base64.b64decode(parts[1].strip(), validate=True).decode("utf-8")
            except ValueError:
                # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
                decoded = ""
            public_key, separator, secret_key = decoded.partition(":")
            if separator and public_key and secret_key:
                return

    message = (
        "Langfuse OTLP endpoint configured without valid Basic Authorization credentials. "
        "Set OTEL_EXPORTER_OTLP_HEADERS, LANGFUSE_OTEL_AUTH, "
        "or LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY."
    )
    raise RuntimeError(message)
def _should_emit_langfuse_attributes() -> bool:
    """Decide whether ``langfuse.*`` span attributes are exported.

    Returns:
        The explicit ``otel_emit_langfuse_attributes`` setting when it is not
        ``None``; otherwise whether the resolved OTLP endpoint is a Langfuse one.
    """
    override = get_settings().otel_emit_langfuse_attributes
    if override is None:
        return _is_langfuse_otlp_endpoint(_resolve_otlp_endpoint())
    return override
def _should_capture_identity_attributes() -> bool:
    """Decide whether user/team identity attributes are attached to spans.

    Returns:
        The explicit ``otel_capture_identity_attributes`` setting when it is not
        ``None``; otherwise the Langfuse attribute policy.
    """
    override = get_settings().otel_capture_identity_attributes
    if override is None:
        return _should_emit_langfuse_attributes()
    return override
def _should_emit_span_attribute(attribute_name: str) -> bool:
    """Check a span attribute key against the current export policy.

    ``langfuse.*`` keys require the Langfuse attribute policy, and identity keys
    (``_IDENTITY_ATTRIBUTE_KEYS``) require the identity-capture policy. All
    other keys are always allowed.

    Args:
        attribute_name: Span attribute key.

    Returns:
        ``True`` when the attribute may be emitted.
    """
    blocked_langfuse = attribute_name.startswith("langfuse.") and not _should_emit_langfuse_attributes()
    blocked = blocked_langfuse or (attribute_name in _IDENTITY_ATTRIBUTE_KEYS and not _should_capture_identity_attributes())
    return not blocked
def set_span_attribute(span: Any, attribute_name: str, value: Any) -> None:
    """Attach an attribute to ``span`` if policy allows, sanitizing the value first.

    Nothing happens when ``span`` is falsy, ``value`` is ``None``, or the key is
    filtered out by the Langfuse/identity export policy.

    Args:
        span: Active span object.
        attribute_name: Span attribute key.
        value: Attribute value to set.
    """
    if span and value is not None and _should_emit_span_attribute(attribute_name):
        cleaned = sanitize_trace_attribute_value(attribute_name, value)
        span.set_attribute(attribute_name, cleaned)
def _set_pre_sanitized_span_attribute(span: Any, attribute_name: str, value: Any) -> None:
    """Attach an already-sanitized attribute to ``span`` if policy allows.

    Same gating as ``set_span_attribute`` (falsy span, ``None`` value and the
    export policy are all no-ops), but the value is stored as-is.

    Args:
        span: Active span object.
        attribute_name: Span attribute key.
        value: Pre-sanitized attribute value to set.
    """
    skip = not span or value is None or not _should_emit_span_attribute(attribute_name)
    if skip:
        return
    span.set_attribute(attribute_name, value)
def _record_sanitized_exception_event(span: Any, exc_type: Optional[type], error_message: str) -> None:
    """Add an ``exception`` event built only from sanitized text.

    Spans exposing ``add_event`` get a hand-built event carrying
    ``exception.type``, ``exception.message`` and ``exception.escaped``.
    Otherwise, spans exposing ``record_exception`` receive a fresh exception
    built from ``error_message``. If ``exc_type`` cannot be built from a single
    message argument, a plain ``Exception`` is used, so the original exception
    text is never exported.

    Args:
        span: Active span object; falsy values make this a no-op.
        exc_type: Exception class associated with the failure; ``None`` makes this a no-op.
        error_message: Sanitized exception message to attach to the event.
    """
    if not span or exc_type is None:
        return

    event_attributes = {
        "exception.type": exc_type.__name__,
        "exception.message": error_message,
        "exception.escaped": True,
    }

    if hasattr(span, "add_event"):
        span.add_event("exception", attributes=event_attributes)
    elif hasattr(span, "record_exception"):
        try:
            surrogate = exc_type(error_message)
        except Exception:
            surrogate = Exception(error_message)
        span.record_exception(surrogate)
446def set_span_error(
447 span: Any,
448 error: str | BaseException,
449 *,
450 exc_type: Optional[type] = None,
451 record_exception: bool = False,
452) -> None:
453 """Mark a span as failed with a sanitized error message.
455 Args:
456 span: Active span object.
457 error: Exception instance or message text describing the failure.
458 exc_type: Optional explicit exception type for the failure.
459 record_exception: Whether to add a sanitized exception event to the span.
460 """
461 if not span:
462 return
464 if isinstance(error, BaseException):
465 error_message = _sanitize_span_exception_message(error)
466 resolved_exc_type = exc_type or type(error)
467 else:
468 error_message = sanitize_for_log(sanitize_trace_text(str(error))).strip() or "Error"
469 resolved_exc_type = exc_type
471 if record_exception:
472 _record_sanitized_exception_event(span, resolved_exc_type, error_message)
474 if OTEL_AVAILABLE and Status and StatusCode:
475 span.set_status(Status(StatusCode.ERROR, error_message))
477 set_span_attribute(span, "error", True)
478 if resolved_exc_type is not None:
479 set_span_attribute(span, "error.type", resolved_exc_type.__name__)
480 _set_pre_sanitized_span_attribute(span, "error.message", error_message)
481 set_span_attribute(span, "langfuse.observation.level", "ERROR")
482 _set_pre_sanitized_span_attribute(span, "langfuse.observation.status_message", error_message)
def _derive_langfuse_trace_name(name: str, attributes: Dict[str, Any]) -> str:
    """Map a raw span name to a readable Langfuse trace label.

    Listing spans map to fixed labels. Operations on a named entity become
    ``"<Prefix>: <value>"`` when the identifying attribute is truthy. Any other
    span keeps its raw name.

    Args:
        name: Raw span name.
        attributes: Span attributes used to derive a more readable Langfuse trace label.

    Returns:
        Human-readable trace name suitable for Langfuse dashboards.
    """

    def _clean(fragment: Any) -> Any:
        """Redact string fragments before they become part of the trace name.

        Args:
            fragment: Candidate trace-name fragment.

        Returns:
            Sanitized string values, or the original non-string value unchanged.
        """
        return sanitize_trace_text(fragment) if isinstance(fragment, str) else fragment

    fixed_labels = {
        "tool.list": "Tools",
        "prompt.list": "Prompts",
        "resource.list": "Resources",
        "resource_template.list": "Resource Templates",
        "root.list": "Roots",
    }
    # span name -> (label prefix, attribute holding the entity identifier)
    entity_labels = {
        "tool.invoke": ("Tool", "tool.name"),
        "prompt.render": ("Prompt", "prompt.id"),
        "resource.read": ("Resource", "resource.uri"),
        "llm.proxy": ("LLM Proxy", "gen_ai.request.model"),
        "llm.chat": ("LLM Chat", "gen_ai.request.model"),
    }

    if name in fixed_labels:
        return fixed_labels[name]

    if name in entity_labels:
        prefix, attribute_key = entity_labels[name]
        identifier = attributes.get(attribute_key)
        return f"{prefix}: {_clean(identifier)}" if identifier else name

    agent_name = attributes.get("a2a.agent.name")
    if name.startswith("a2a.") and agent_name:
        return f"A2A: {_clean(agent_name)}"

    return name
def otel_tracing_enabled() -> bool:
    """Tell whether a module-level tracer has been initialised.

    Returns:
        bool: ``False`` while ``_TRACER`` is ``None``, ``True`` otherwise.
    """
    tracer = _TRACER
    return tracer is not None
def otel_context_active() -> bool:
    """Check whether the current context holds a span with a valid span context.

    Returns:
        bool: ``True`` only when OpenTelemetry is importable and the current span's
        context reports ``is_valid``. Any error while inspecting yields ``False``.
    """
    if OTEL_AVAILABLE and trace is not None:
        try:
            active_span = trace.get_current_span()
            return active_span is not None and bool(getattr(active_span.get_span_context(), "is_valid", False))
        except Exception:
            return False
    return False
def inject_trace_context_headers(headers: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy outbound headers and add the active W3C trace context to the copy.

    Entries with a falsy name or value are dropped, and the rest are converted
    to ``str``. Trace context is injected only when a valid span is active and
    the propagator is available. Injection failures are logged at debug level
    and otherwise ignored.

    Args:
        headers: Existing outbound headers to copy into the carrier before trace injection.

    Returns:
        Dict[str, str]: A new header mapping, possibly including trace context.
    """
    carrier: Dict[str, str] = {}
    for header_name, header_value in (headers or {}).items():
        if header_name and header_value:
            carrier[str(header_name)] = str(header_value)

    if otel_context_active() and otel_inject is not None:
        try:
            otel_inject(carrier=carrier)
        except Exception as exc:
            logger.debug("Failed to inject W3C trace context into outbound headers: %s", exc)
    return carrier
def _scope_headers_to_carrier(scope_headers: list[tuple[bytes, bytes]]) -> Dict[str, str]:
    """Decode raw ASGI header pairs into a text mapping for trace propagation.

    Header names are lower-cased, and names and values are decoded as latin-1.
    Pairs that cannot be decoded (e.g. non-bytes entries) are skipped. A
    repeated header name keeps its last value.

    Args:
        scope_headers: Raw ASGI header tuples from the request scope.

    Returns:
        Dict[str, str]: Lower-cased header name mapped to its decoded value.
    """
    decoded: Dict[str, str] = {}
    for raw_name, raw_value in scope_headers:
        try:
            header_name, header_value = raw_name.decode("latin-1").lower(), raw_value.decode("latin-1")
        except (AttributeError, TypeError, UnicodeDecodeError):
            continue
        decoded[header_name] = header_value
    return decoded
def _should_trace_request_path(path: str) -> bool:
    """Decide whether a request path gets a Phase 1 OTEL request span.

    Trailing slashes are ignored. A path is traced when any of these holds:
    - it is one of the transport endpoints (``/rpc``, ``/mcp``, ``/mcp/sse``,
      ``/mcp/message``, ``/message``, ``/sse``);
    - it is a ``/servers/...`` path ending in ``/mcp``, ``/message`` or ``/sse``;
    - it lives under ``/_internal/mcp/``.

    Args:
        path: Incoming request path.

    Returns:
        bool: ``True`` when the path should be wrapped in a request span.
    """
    trimmed = path.rstrip("/") or "/"
    transport_paths = {"/rpc", "/mcp", "/mcp/sse", "/mcp/message", "/message", "/sse"}
    is_server_transport = trimmed.startswith("/servers/") and trimmed.endswith(("/mcp", "/message", "/sse"))
    return trimmed in transport_paths or is_server_transport or trimmed.startswith("/_internal/mcp/")
class OpenTelemetryRequestMiddleware:
    """Raw ASGI middleware that creates request-root spans for gateway transport flows.

    HTTP requests whose path passes ``should_trace_request_path`` are wrapped in a
    server span once a tracer has been initialised. Everything else (non-HTTP
    scopes, untraced paths, tracing disabled) is passed straight through.
    """

    def __init__(self, app: Any, should_trace_request_path: Optional[Callable[[str], bool]] = None):
        """Initialize the middleware wrapper.

        Args:
            app: Wrapped ASGI application.
            should_trace_request_path: Optional predicate that decides whether a request path
                should be instrumented. Defaults to ``_should_trace_request_path``.
        """
        self.app = app
        self.should_trace_request_path = should_trace_request_path or _should_trace_request_path

    def __getattr__(self, name: str) -> Any:
        """Proxy attribute access to the wrapped ASGI app.

        This preserves access to app-specific attributes such as ``routes`` when
        the middleware is used as the top-level object passed to uvicorn.

        Args:
            name: Attribute name to resolve on the wrapped ASGI app.

        Returns:
            Any: The proxied attribute value from ``self.app``.

        Raises:
            AttributeError: If ``app`` is not set on this instance (for example while
                ``copy``/``pickle`` reconstructs it) or the wrapped app lacks ``name``.
        """
        # Read ``app`` from the instance dict directly. ``self.app`` would re-enter
        # __getattr__ when it is missing and recurse until RecursionError, which
        # ``hasattr`` (used by copy/pickle) does not swallow.
        try:
            app = self.__dict__["app"]
        except KeyError:
            raise AttributeError(name) from None
        return getattr(app, name)

    @staticmethod
    def _build_span_attributes(scope: Mapping[str, Any], method: str, path: str, carrier: Mapping[str, str]) -> Dict[str, Any]:
        """Collect request-root span attributes from the ASGI scope.

        Args:
            scope: ASGI connection scope.
            method: Upper-cased HTTP method.
            path: Request path (may be empty).
            carrier: Lower-cased request header mapping.

        Returns:
            Dict[str, Any]: Attribute mapping. Entries whose value is ``None`` are
            skipped by the caller.
        """
        server = scope.get("server") or ("", None)
        client = scope.get("client") or ("", None)
        query_string = scope.get("query_string", b"") or b""
        query_text = query_string.decode("latin-1") if isinstance(query_string, bytes) else str(query_string)
        route = path or "/"
        return {
            "http.request.method": method,
            "http.route": route,
            "url.path": route,
            # Sanitize query string to prevent leaking session_id and other sensitive parameters
            "url.query": sanitize_trace_text(query_text) if query_text else None,
            "network.protocol.version": scope.get("http_version"),
            "server.address": server[0] if len(server) > 0 else None,
            "server.port": server[1] if len(server) > 1 else None,
            "client.address": client[0] if len(client) > 0 else None,
            "client.port": client[1] if len(client) > 1 else None,
            "user_agent.original": carrier.get("user-agent"),
            "correlation_id": carrier.get("x-correlation-id"),
        }

    async def __call__(self, scope: Mapping[str, Any], receive: Any, send: Any) -> None:
        """Handle an ASGI request and create a request-root span when tracing applies.

        Args:
            scope: ASGI connection scope.
            receive: ASGI receive callable.
            send: ASGI send callable.

        Raises:
            Exception: Any exception raised by the wrapped ASGI application.
        """
        if scope.get("type") != "http" or _TRACER is None:
            await self.app(scope, receive, send)
            return

        path = str(scope.get("path", "") or "")
        if not self.should_trace_request_path(path):
            await self.app(scope, receive, send)
            return

        method = str(scope.get("method", "GET") or "GET").upper()
        carrier = _scope_headers_to_carrier(list(scope.get("headers", []) or []))

        parent_context = None
        if otel_extract is not None:
            try:
                parent_context = otel_extract(carrier=carrier)
            except Exception as exc:
                logger.debug("Failed to extract W3C trace context for %s %s: %s", method, path, exc)

        span_name = f"{method} {path or '/'}"
        span_attributes = self._build_span_attributes(scope, method, path, carrier)

        start_span_kwargs: Dict[str, Any] = {
            # With the SDK defaults, an exception escaping the ``with`` below (it is
            # re-raised) would be recorded raw (message + stack trace) and the status
            # description overwritten with the raw text, bypassing sanitization.
            # Errors are reported explicitly via ``set_span_error`` instead.
            "record_exception": False,
            "set_status_on_exception": False,
        }
        if parent_context is not None:
            start_span_kwargs["context"] = parent_context
        if SpanKind is not None:
            start_span_kwargs["kind"] = SpanKind.SERVER

        status_code_holder: Dict[str, int] = {}
        span: Any = None

        async def _send_with_span_status(message: Mapping[str, Any]) -> None:
            """Record the response status on the span, then forward the message.

            Args:
                message: ASGI message emitted by the wrapped app.
            """
            if message.get("type") == "http.response.start":
                status_code = int(message.get("status", 0) or 0)
                status_code_holder["status"] = status_code
                if span is not None and status_code:
                    set_span_attribute(span, "http.response.status_code", status_code)
                    # Only 5xx is marked here. OK is deferred until the app finishes,
                    # because the SDK treats an OK status as final: setting it now
                    # would hide an exception raised later while streaming the body.
                    if status_code >= 500 and OTEL_AVAILABLE and Status and StatusCode:
                        span.set_status(Status(StatusCode.ERROR))
            await send(message)

        with _TRACER.start_as_current_span(span_name, **start_span_kwargs) as span:
            if span is not None:
                for key, value in span_attributes.items():
                    if value is not None:
                        set_span_attribute(span, key, value)

            try:
                await self.app(scope, receive, _send_with_span_status)
            except Exception as exc:
                if span is not None:
                    # Sets the sanitized ERROR status, error attributes and exception event.
                    set_span_error(span, exc, record_exception=True)
                raise

            recorded_status = status_code_holder.get("status")
            succeeded = recorded_status is None or 0 < recorded_status < 500
            if span is not None and succeeded and OTEL_AVAILABLE and Status and StatusCode:
                span.set_status(Status(StatusCode.OK))
749def init_telemetry() -> Optional[Any]:
750 """Initialize OpenTelemetry with configurable backend.
752 Supports multiple backends via environment variables:
753 - OTEL_TRACES_EXPORTER: Exporter type (otlp, jaeger, zipkin, console, none)
754 - OTEL_EXPORTER_OTLP_ENDPOINT: OTLP endpoint (for otlp exporter)
755 - OTEL_EXPORTER_JAEGER_ENDPOINT: Jaeger endpoint (for jaeger exporter)
756 - OTEL_EXPORTER_ZIPKIN_ENDPOINT: Zipkin endpoint (for zipkin exporter)
757 - OTEL_ENABLE_OBSERVABILITY: Set to 'true' to enable (disabled by default)
759 Returns:
760 The initialized tracer instance or None if disabled.
761 """
762 # pylint: disable=global-statement
763 global _TRACER
764 cfg = get_settings()
766 # Check if observability is disabled (default: disabled)
767 if not cfg.otel_enable_observability:
768 logger.info("Observability disabled via OTEL_ENABLE_OBSERVABILITY=false")
769 return None
771 # If OpenTelemetry isn't installed, return early with graceful degradation
772 if not OTEL_AVAILABLE:
773 logger.warning("OpenTelemetry not installed - telemetry disabled")
774 logger.info("To enable telemetry, install: pip install mcp-contextforge-gateway[observability]")
775 return None
777 # Get exporter type from environment
778 exporter_type = cfg.otel_traces_exporter.lower()
780 # Handle 'none' exporter (tracing disabled)
781 if exporter_type == "none":
782 logger.info("Tracing disabled via OTEL_TRACES_EXPORTER=none")
783 return None
785 # Check if endpoint is configured for otlp
786 if exporter_type == "otlp":
787 endpoint = _resolve_otlp_endpoint()
788 if not endpoint:
789 logger.info("OTLP endpoint not configured, skipping telemetry init")
790 return None
791 _validate_langfuse_configuration(endpoint, _resolve_otlp_headers(endpoint))
793 try:
794 # Create resource attributes
795 resource_attributes: Dict[str, Any] = {
796 "service.name": cfg.otel_service_name,
797 "service.version": "1.0.0-RC-2",
798 "deployment.environment": _get_deployment_environment(),
799 }
801 # Add custom resource attributes from environment
802 custom_attrs = cfg.otel_resource_attributes or ""
803 if custom_attrs:
804 for attr in custom_attrs.split(","):
805 if "=" in attr:
806 key, value = attr.split("=", 1)
807 resource_attributes[key.strip()] = value.strip()
809 # Narrow types for mypy/pyrefly
810 # Create resource if available, else skip
811 resource: Optional[Any]
812 if Resource is not None and hasattr(Resource, "create"):
813 resource = cast(Any, Resource).create(resource_attributes)
814 else:
815 resource = None
817 # Set up tracer provider with optional sampling
818 # Initialize tracer provider (with resource if available)
819 if resource is not None:
820 provider = cast(Any, TracerProvider)(resource=resource)
821 else:
822 provider = cast(Any, TracerProvider)()
824 # Register provider if trace API is present
825 if trace is not None and hasattr(trace, "set_tracer_provider"):
826 cast(Any, trace).set_tracer_provider(provider)
828 # Create a custom span processor to copy resource attributes to span attributes
829 # This is needed because Arize requires arize.project.name as a span attribute
830 class ResourceAttributeSpanProcessor:
831 """Span processor that copies specific resource attributes to span attributes."""
833 def __init__(self, attributes_to_copy=None):
834 self.attributes_to_copy = attributes_to_copy or ["arize.project.name", "model_id"]
835 logger.info(f"ResourceAttributeSpanProcessor will copy: {self.attributes_to_copy}")
837 def on_start(self, span, _parent_context=None):
838 """Copy specified resource attributes to span attributes when span starts.
840 Args:
841 span: The span being started.
842 _parent_context: The parent context (unused, required by interface).
843 """
844 if not hasattr(span, "resource") or span.resource is None:
845 return
847 # Get resource attributes
848 resource_attributes = getattr(span.resource, "attributes", {})
850 # Copy specified attributes from resource to span
851 for attr in self.attributes_to_copy:
852 if attr in resource_attributes:
853 value = resource_attributes[attr]
854 span.set_attribute(attr, value)
855 logger.debug(f"Copied resource attribute to span: {attr}={value}")
857 def on_end(self, span):
858 """Handle span end event.
860 Required by the SpanProcessor interface but not used.
862 Args:
863 span: The span being ended.
864 """
865 pass # pylint: disable=unnecessary-pass
867 # Add the custom span processor to copy resource attributes to spans
868 # This is needed for Arize which requires certain attributes as span attributes
869 # Enable via OTEL_COPY_RESOURCE_ATTRS_TO_SPANS=true (disabled by default)
870 copy_resource_attrs = cfg.otel_copy_resource_attrs_to_spans
871 if resource is not None and copy_resource_attrs:
872 logger.info("Adding ResourceAttributeSpanProcessor to copy resource attributes to spans")
873 provider.add_span_processor(ResourceAttributeSpanProcessor())
875 # Configure the appropriate exporter based on type
876 exporter: Optional[Any] = None
878 if exporter_type == "otlp":
879 endpoint = _resolve_otlp_endpoint()
880 protocol = cfg.otel_exporter_otlp_protocol.lower()
881 header_dict = _resolve_otlp_headers(endpoint)
882 if _is_langfuse_otlp_endpoint(endpoint):
883 protocol = "http"
884 # Note: some versions of OTLP exporters may not accept 'insecure' kwarg; avoid passing it.
885 # Use endpoint scheme or env to control TLS externally.
887 if protocol == "grpc" and OTLP_SPAN_EXPORTER:
888 exporter = cast(Any, OTLP_SPAN_EXPORTER)(endpoint=endpoint, headers=header_dict or None)
889 elif HTTP_EXPORTER:
890 # Use HTTP exporter as fallback
891 ep = str(endpoint) if endpoint is not None else ""
892 http_ep = (ep.replace(":4317", ":4318") + "/v1/traces") if ":4317" in ep else ep
893 exporter = cast(Any, HTTP_EXPORTER)(endpoint=http_ep, headers=header_dict or None)
894 else:
895 logger.error("No OTLP exporter available")
896 return None
898 elif exporter_type == "jaeger":
899 if JAEGER_EXPORTER:
900 endpoint = cfg.otel_exporter_jaeger_endpoint or "http://localhost:14268/api/traces"
901 exporter = JAEGER_EXPORTER(
902 collector_endpoint=endpoint,
903 username=cfg.otel_exporter_jaeger_user,
904 password=cfg.otel_exporter_jaeger_password.get_secret_value() if cfg.otel_exporter_jaeger_password else None,
905 )
906 else:
907 logger.error("Jaeger exporter not available. Install with: pip install opentelemetry-exporter-jaeger")
908 return None
910 elif exporter_type == "zipkin":
911 if ZIPKIN_EXPORTER:
912 endpoint = cfg.otel_exporter_zipkin_endpoint or "http://localhost:9411/api/v2/spans"
913 exporter = ZIPKIN_EXPORTER(endpoint=endpoint)
914 else:
915 logger.error("Zipkin exporter not available. Install with: pip install opentelemetry-exporter-zipkin")
916 return None
918 elif exporter_type == "console":
919 # Console exporter for debugging
920 exporter = cast(Any, ConsoleSpanExporter)()
922 else:
923 logger.warning(f"Unknown exporter type: {exporter_type}. Using console exporter.")
924 exporter = cast(Any, ConsoleSpanExporter)()
926 if exporter:
927 # Add batch processor for better performance (except for console)
928 if exporter_type == "console":
929 span_processor = cast(Any, SimpleSpanProcessor)(exporter)
930 else:
931 span_processor = cast(Any, BatchSpanProcessor)(
932 exporter,
933 max_queue_size=cfg.otel_bsp_max_queue_size,
934 max_export_batch_size=cfg.otel_bsp_max_export_batch_size,
935 schedule_delay_millis=cfg.otel_bsp_schedule_delay,
936 )
937 provider.add_span_processor(span_processor)
939 # Get tracer
940 # Obtain a tracer if trace API available; otherwise create a no-op tracer
941 if trace is not None and hasattr(trace, "get_tracer"):
942 _TRACER = cast(Any, trace).get_tracer("mcp-gateway", "1.0.0-RC-2", schema_url="https://opentelemetry.io/schemas/1.11.0")
943 else:
945 class _NoopTracer:
946 """No-op tracer used when OpenTelemetry API isn't available."""
948 def start_as_current_span(self, _name: str): # type: ignore[override]
949 """Return a no-op context manager for span creation.
951 Args:
952 _name: Span name (ignored in no-op implementation).
954 Returns:
955 contextlib.AbstractContextManager: A null context.
956 """
957 return nullcontext()
959 _TRACER = _NoopTracer()
961 logger.info(f"✅ OpenTelemetry initialized with {exporter_type} exporter")
962 if exporter_type == "otlp":
963 logger.info(f" Endpoint: {_resolve_otlp_endpoint()}")
964 elif exporter_type == "jaeger":
965 logger.info(f" Endpoint: {cfg.otel_exporter_jaeger_endpoint or 'default'}")
966 elif exporter_type == "zipkin":
967 logger.info(f" Endpoint: {cfg.otel_exporter_zipkin_endpoint or 'default'}")
969 return _TRACER
971 except Exception as e:
972 logger.error(f"Failed to initialize OpenTelemetry: {e}")
973 return None
def trace_operation(operation_name: str, attributes: Optional[Dict[str, Any]] = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Simple decorator to trace any async operation.

    The module-level ``_TRACER`` is looked up each time the decorated function
    is called, not when the decorator is applied. When no tracer is configured
    at call time, the wrapped coroutine function is simply awaited.

    The wrapper is built with ``functools.wraps``, so the decorated function
    keeps the original's ``__name__``, ``__qualname__``, ``__doc__`` and
    ``__module__`` and exposes it through ``__wrapped__``. Without this, every
    decorated function reports itself as ``wrapper`` and introspects as
    ``(*args, **kwargs)``.

    Args:
        operation_name: Name of the operation to trace (e.g., "tool.invoke").
        attributes: Optional dictionary of attributes to add to the span.

    Returns:
        Decorator function that wraps the target function with tracing.

    Usage:
        @trace_operation("tool.invoke", {"tool.name": "calculator"})
        async def invoke_tool():
            ...
    """
    # Local import keeps this block self-contained (the module import block is untouched).
    import functools  # pylint: disable=import-outside-toplevel

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        """Decorator that wraps the function with tracing.

        Args:
            func: The async function to wrap with tracing.

        Returns:
            The wrapped function with tracing capabilities and the original's metadata.
        """

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            """Async wrapper that adds tracing to the decorated function.

            Args:
                *args: Positional arguments passed to the wrapped function.
                **kwargs: Keyword arguments passed to the wrapped function.

            Returns:
                The result of the wrapped function.

            Raises:
                Exception: Any exception raised by the wrapped function.
            """
            if not _TRACER:
                # No tracing configured, just run the function
                return await func(*args, **kwargs)

            # Create span for this operation
            with _TRACER.start_as_current_span(operation_name) as span:
                # Add attributes if provided
                if attributes:
                    for key, value in attributes.items():
                        set_span_attribute(span, key, value)

                try:
                    # Run the actual function
                    result = await func(*args, **kwargs)
                    set_span_attribute(span, "status", "success")
                    return result
                except Exception as e:
                    # Mark the span as failed, then let the caller see the original error.
                    set_span_attribute(span, "status", "error")
                    set_span_error(span, e, record_exception=True)
                    raise

        return wrapper

    return decorator
class _SpanWithAttributes:
    """Context manager that applies attributes to a span once it is entered.

    ``start_as_current_span`` only hands back the span object from
    ``__enter__``, so the attributes gathered by ``create_span()`` are applied
    at that point. On exit, the span is marked as failed (exception recorded
    via ``set_span_error``) or, when OpenTelemetry is available, given an OK
    status. The wrapped span context is always exited afterwards.

    This class lives at module level rather than inside ``create_span()`` so
    that a new class object is not built for every span.
    """

    def __init__(self, span_context: Any, attrs: Dict[str, Any]):
        """Initialize the span wrapper.

        Args:
            span_context: The span context manager to wrap (as returned by ``start_as_current_span``).
            attrs: Attributes to set on the span when entering; ``None`` values are skipped.
        """
        self.span_context: Any = span_context
        self.attrs: Dict[str, Any] = attrs
        self.span: Any = None

    def __enter__(self) -> Any:
        """Enter the wrapped span context and set the span attributes.

        If applying the attributes fails, the wrapped context is exited with
        that error before the error propagates. A ``with`` statement never
        calls ``__exit__`` when ``__enter__`` raises, so without this step the
        wrapped context would never be exited.

        Returns:
            The span yielded by the wrapped context, with attributes set.

        Raises:
            BaseException: Re-raises any error raised while setting attributes.
        """
        self.span = self.span_context.__enter__()
        try:
            if self.attrs and self.span:
                for key, value in self.attrs.items():
                    if value is not None:  # Skip None values
                        set_span_attribute(self.span, key, value)
        except BaseException as exc:
            self.span_context.__exit__(type(exc), exc, exc.__traceback__)
            raise
        return self.span

    def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Any) -> Any:
        """Record the outcome on the span, then exit the wrapped span context.

        The wrapped context's ``__exit__`` runs in a ``finally`` clause, so it
        is reached even if recording the error or status raises.

        Args:
            exc_type: The exception type if an exception occurred.
            exc_val: The exception value if an exception occurred.
            exc_tb: The exception traceback if an exception occurred.

        Returns:
            The result of the wrapped span context's __exit__ method.
        """
        try:
            # Record exception if one occurred
            if exc_type is not None and self.span:
                set_span_error(self.span, exc_val or exc_type.__name__, exc_type=exc_type, record_exception=True)
            elif self.span:
                if OTEL_AVAILABLE and Status and StatusCode:
                    self.span.set_status(Status(StatusCode.OK))
        finally:
            exit_result = self.span_context.__exit__(exc_type, exc_val, exc_tb)
        return exit_result


def create_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Any:
    """
    Create a span for manual instrumentation.

    Besides the caller's ``attributes``, the span is enriched with several
    context values. These include the current correlation ID; user, team and
    auth context when identity capture is enabled; ``auth.method``; and
    Langfuse-specific attributes when Langfuse emission is enabled. All of
    these are added with ``setdefault``, so caller-supplied keys take
    precedence. Failures while gathering this context are logged at debug
    level, and the span is still created.

    Args:
        name: Name of the span to create (e.g., "database.query").
        attributes: Optional dictionary of attributes to add to the span. The
            mapping is copied, so the caller's dict is not mutated.

    Returns:
        Context manager that creates and manages the span lifecycle, or a
        ``nullcontext()`` when no tracer is configured.

    Usage:
        with create_span("database.query", {"db.statement": "SELECT * FROM tools"}):
            # Your code here
            pass
    """
    if not _TRACER:
        # Return a no-op context manager if tracing is not configured or available
        return nullcontext()

    attributes = dict(attributes or {})

    # Auto-inject correlation ID into all spans for request tracing
    try:
        correlation_id = get_correlation_id()
        if correlation_id:
            attributes.setdefault("correlation_id", correlation_id)
            attributes.setdefault("request_id", correlation_id)
    except Exception as exc:
        # Correlation ID not available or error getting it, continue without it
        logger.debug("Failed to add correlation_id to span: %s", exc)

    try:
        user_email = get_trace_user_email()
        user_is_admin = get_trace_user_is_admin()
        team_scope = get_trace_team_scope()
        team_name = get_trace_team_name()
        auth_method = get_trace_auth_method()
        session_id = get_trace_session_id()
        environment = _get_deployment_environment()
        # Evaluated once and reused for every identity-gated attribute below.
        capture_identity = _should_capture_identity_attributes()

        if capture_identity:
            if user_email:
                attributes.setdefault("user.email", user_email)
            if user_email or user_is_admin:
                attributes.setdefault("user.is_admin", user_is_admin)
            if team_scope:
                attributes.setdefault("team.scope", team_scope)
            if team_name:
                attributes.setdefault("team.name", team_name)

        if auth_method:
            attributes.setdefault("auth.method", auth_method)

        if _should_emit_langfuse_attributes():
            if capture_identity and user_email:
                attributes.setdefault("langfuse.user.id", user_email)
            if session_id:
                attributes.setdefault("langfuse.session.id", session_id)
            attributes.setdefault("langfuse.environment", environment)

            tags: list[str] = []
            primary_team = primary_team_from_scope(team_scope)
            if capture_identity and primary_team:
                tags.append(f"team:{primary_team}")
            if auth_method:
                tags.append(f"auth:{auth_method}")
            if environment:
                tags.append(f"env:{environment}")
            if tags:
                attributes.setdefault("langfuse.trace.tags", tags)

            trace_name_attributes = {key: sanitize_trace_attribute_value(key, value) for key, value in attributes.items() if _should_emit_span_attribute(key)}
            attributes.setdefault("langfuse.trace.name", _derive_langfuse_trace_name(name, trace_name_attributes))
            attributes.setdefault("langfuse.observation.level", "DEFAULT")
    except Exception as exc:
        logger.debug("Failed to auto-inject trace context into span: %s", exc)

    # Start span and return the context manager
    span_context = _TRACER.start_as_current_span(name)

    # Attributes can only be set once the span exists (i.e. after entering the
    # context), so wrap the span context when there is anything to apply.
    if attributes:
        return _SpanWithAttributes(span_context, attributes)

    return span_context
def create_child_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Any:
    """Open a span nested under the currently active trace context.

    This is a thin alias for ``create_span()``. Call sites use it to make the
    parent/child relationship explicit where the span is opened.

    Args:
        name: Span name.
        attributes: Optional attributes to attach to the created span.

    Returns:
        The span context manager produced by ``create_span()``.
    """
    child_span_cm = create_span(name, attributes)
    return child_span_cm
# Initialize on module import: importing this module runs init_telemetry() once
# and stores its result in the module-level _TRACER. The result is a tracer, or
# None when init_telemetry() fails to set one up (e.g. no exporter available,
# or an initialization error). trace_operation() and create_span() read
# _TRACER at call time and fall back to no-op behavior when it is falsy.
_TRACER = init_telemetry()