Coverage for mcpgateway / observability.py: 96%
238 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/observability.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

Vendor-agnostic OpenTelemetry instrumentation for MCP Gateway.
Supports any OTLP-compatible backend (Jaeger, Zipkin, Tempo, Phoenix, etc.).
"""
11# Standard
12from contextlib import nullcontext
13from importlib import import_module as _im
14import logging
15import os
16from typing import Any, Callable, cast, Dict, Optional
# Third-Party - OpenTelemetry is an optional dependency. When the import fails
# every name below is replaced by a placeholder so the rest of the module can
# still be imported and degrade gracefully.
OTEL_AVAILABLE = False
try:
    # Third-Party
    from opentelemetry import trace
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor
    from opentelemetry.trace import Status, StatusCode
except ImportError:

    class _ResourceShim:
        """Stand-in for ``Resource`` when the OpenTelemetry SDK is missing.

        Only the static ``create`` entry point is provided; it hands back the
        attribute mapping it was given so tests can patch it and inspect the
        inputs without the real OpenTelemetry classes.
        """

        @staticmethod
        def create(attrs: Dict[str, Any]) -> Dict[str, Any]:  # type: ignore[override]
            """Echo the attribute mapping, mimicking ``Resource.create``.

            Args:
                attrs: Resource attribute dictionary.

            Returns:
                Dict[str, Any]: The very same mapping that was passed in.
            """
            return attrs

    # Placeholders for every name the successful import would have bound.
    trace = None
    Resource = cast(Any, _ResourceShim)
    TracerProvider = BatchSpanProcessor = ConsoleSpanExporter = SimpleSpanProcessor = None
    Status = StatusCode = None

    # Best-effort module shims so tests can patch the
    # ``opentelemetry.sdk.trace.export.ConsoleSpanExporter`` import path.
    try:
        # Standard
        import sys
        import types

        if os.getenv("MCP_TESTING") == "1" or "pytest" in sys.modules:

            class _ConsoleSpanExporterStub:  # pragma: no cover - test patch replaces this
                """Placeholder ``ConsoleSpanExporter`` for test environments.

                Exists only so unit tests have something to patch at
                `opentelemetry.sdk.trace.export.ConsoleSpanExporter` when the
                OpenTelemetry SDK is not installed.
                """

            # Build the package chain leaf-first and link each child to its parent.
            otel_export = types.ModuleType("opentelemetry.sdk.trace.export")
            otel_export.ConsoleSpanExporter = _ConsoleSpanExporterStub  # type: ignore[attr-defined]
            otel_trace = types.ModuleType("opentelemetry.sdk.trace")
            otel_trace.export = otel_export  # type: ignore[attr-defined]
            otel_sdk = types.ModuleType("opentelemetry.sdk")
            otel_sdk.trace = otel_trace  # type: ignore[attr-defined]
            otel_root = types.ModuleType("opentelemetry")
            otel_root.sdk = otel_sdk  # type: ignore[attr-defined]

            # Register only the exact chain the tests need; setdefault never
            # replaces a module that is already present in sys.modules.
            sys.modules.setdefault(otel_root.__name__, otel_root)
            sys.modules.setdefault(otel_sdk.__name__, otel_sdk)
            sys.modules.setdefault(otel_trace.__name__, otel_trace)
            sys.modules.setdefault(otel_export.__name__, otel_export)
    except Exception as exc:  # nosec B110 - best-effort optional shim
        # The shim is a non-critical convenience for tests; log and carry on.
        logging.getLogger(__name__).debug("Skipping OpenTelemetry shim setup: %s", exc)
else:
    OTEL_AVAILABLE = True
96# First-Party
97from mcpgateway.utils.correlation_id import get_correlation_id # noqa: E402 # pylint: disable=wrong-import-position
# Optional exporters: each constant holds the exporter class when its package
# can be imported, and None otherwise.
def _optional_exporter(module_path: str, class_name: str) -> Optional[Any]:
    """Look up ``class_name`` in ``module_path``, tolerating any failure.

    Args:
        module_path: Dotted path of the module expected to provide the exporter.
        class_name: Attribute to fetch from that module.

    Returns:
        The attribute when both the import and the lookup succeed, else None.
    """
    try:
        return getattr(_im(module_path), class_name)
    except Exception:
        return None


# OTLP: try the gRPC flavour first and fall back to the HTTP flavour.
OTLP_SPAN_EXPORTER = None
for _otlp_module in (
    "opentelemetry.exporter.otlp.proto.grpc.trace_exporter",
    "opentelemetry.exporter.otlp.proto.http.trace_exporter",
):
    try:
        OTLP_SPAN_EXPORTER = getattr(_im(_otlp_module), "OTLPSpanExporter")
        break
    except Exception:
        continue

JAEGER_EXPORTER = _optional_exporter("opentelemetry.exporter.jaeger.thrift", "JaegerExporter")

ZIPKIN_EXPORTER = _optional_exporter("opentelemetry.exporter.zipkin.json", "ZipkinExporter")

HTTP_EXPORTER = _optional_exporter("opentelemetry.exporter.otlp.proto.http.trace_exporter", "OTLPSpanExporter")
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)


# Global tracer instance - using UPPER_CASE for module-level constant.
# It stays None until init_telemetry() succeeds; trace_operation() and
# create_span() treat a falsy value as "tracing disabled" and become no-ops.
# pylint: disable=invalid-name
_TRACER = None
def _parse_key_value_pairs(raw: str) -> Dict[str, str]:
    """Parse a comma-separated ``key=value`` list into a dictionary.

    Items without an ``=`` are ignored; keys and values are stripped of
    surrounding whitespace, and only the first ``=`` splits an item.

    Args:
        raw: Text such as ``"a=1, b=2"`` (may be empty).

    Returns:
        Dict[str, str]: Parsed pairs; later duplicates override earlier ones.
    """
    pairs: Dict[str, str] = {}
    for item in raw.split(","):
        key, separator, value = item.partition("=")
        if separator:
            pairs[key.strip()] = value.strip()
    return pairs


def init_telemetry() -> Optional[Any]:
    """Initialize OpenTelemetry with configurable backend.

    Supports multiple backends via environment variables:
        - OTEL_ENABLE_OBSERVABILITY: Set to 'true' to enable (disabled by default;
          'false', '0', 'no', 'off' and an empty value all keep it disabled)
        - OTEL_TRACES_EXPORTER: Exporter type (otlp, jaeger, zipkin, console, none);
          an unknown value falls back to the console exporter
        - OTEL_EXPORTER_OTLP_ENDPOINT: OTLP endpoint (required for the otlp exporter)
        - OTEL_EXPORTER_OTLP_PROTOCOL: 'grpc' (default) or anything else for HTTP
        - OTEL_EXPORTER_OTLP_HEADERS: Comma-separated key=value exporter headers
        - OTEL_EXPORTER_JAEGER_ENDPOINT / OTEL_EXPORTER_JAEGER_USER /
          OTEL_EXPORTER_JAEGER_PASSWORD: Jaeger collector settings
        - OTEL_EXPORTER_ZIPKIN_ENDPOINT: Zipkin endpoint (for zipkin exporter)
        - OTEL_SERVICE_NAME, DEPLOYMENT_ENV, OTEL_RESOURCE_ATTRIBUTES: Resource attributes
        - OTEL_COPY_RESOURCE_ATTRS_TO_SPANS: 'true' copies selected resource
          attributes onto every span (disabled by default)
        - OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
          OTEL_BSP_SCHEDULE_DELAY: Batch span processor tuning

    Returns:
        The initialized tracer instance or None if disabled, unavailable,
        misconfigured, or if initialization raised (the error is logged).
    """
    # pylint: disable=global-statement
    global _TRACER

    # Check if observability is disabled (default: disabled). Common "off"
    # spellings are honoured so that e.g. OTEL_ENABLE_OBSERVABILITY=0 does not
    # accidentally switch tracing on.
    enabled_flag = os.getenv("OTEL_ENABLE_OBSERVABILITY", "false").strip().lower()
    if enabled_flag in ("", "false", "0", "no", "off"):
        logger.info("Observability disabled via OTEL_ENABLE_OBSERVABILITY=false")
        return None

    # If OpenTelemetry isn't installed, return early with graceful degradation
    if not OTEL_AVAILABLE:
        logger.warning("OpenTelemetry not installed - telemetry disabled")
        logger.info("To enable telemetry, install: pip install mcp-contextforge-gateway[observability]")
        return None

    # Get exporter type from environment
    exporter_type = os.getenv("OTEL_TRACES_EXPORTER", "otlp").strip().lower()

    # Handle 'none' exporter (tracing disabled)
    if exporter_type == "none":
        logger.info("Tracing disabled via OTEL_TRACES_EXPORTER=none")
        return None

    # The otlp exporter is useless without an endpoint, so bail out early
    if exporter_type == "otlp" and not os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"):
        logger.info("OTLP endpoint not configured, skipping telemetry init")
        return None

    service_version = "1.0.0-BETA-2"

    try:
        # Create resource attributes; custom ones from the environment win
        resource_attributes: Dict[str, Any] = {
            "service.name": os.getenv("OTEL_SERVICE_NAME", "mcp-gateway"),
            "service.version": service_version,
            "deployment.environment": os.getenv("DEPLOYMENT_ENV", "development"),
        }
        resource_attributes.update(_parse_key_value_pairs(os.getenv("OTEL_RESOURCE_ATTRIBUTES", "")))

        # Create resource if available, else skip
        resource: Optional[Any] = None
        if Resource is not None and hasattr(Resource, "create"):
            resource = cast(Any, Resource).create(resource_attributes)

        # Initialize tracer provider (with resource if available)
        if resource is not None:
            provider = cast(Any, TracerProvider)(resource=resource)
        else:
            provider = cast(Any, TracerProvider)()

        # Register provider if trace API is present
        if trace is not None and hasattr(trace, "set_tracer_provider"):
            cast(Any, trace).set_tracer_provider(provider)

        # Create a custom span processor to copy resource attributes to span attributes
        # This is needed because Arize requires arize.project.name as a span attribute
        class ResourceAttributeSpanProcessor:
            """Span processor that copies specific resource attributes to span attributes.

            Implemented by duck typing, so it provides every hook the tracer
            provider invokes on a registered processor: ``on_start``,
            ``on_end``, ``shutdown`` and ``force_flush``.
            """

            def __init__(self, attributes_to_copy=None):
                """Store the list of resource attribute names to copy.

                Args:
                    attributes_to_copy: Attribute names to copy; defaults to
                        ``["arize.project.name", "model_id"]``.
                """
                self.attributes_to_copy = attributes_to_copy or ["arize.project.name", "model_id"]
                logger.info(f"ResourceAttributeSpanProcessor will copy: {self.attributes_to_copy}")

            def on_start(self, span, parent_context=None):  # pylint: disable=unused-argument
                """Copy specified resource attributes to span attributes when span starts.

                The second parameter must be named ``parent_context`` because
                the SDK passes it as a keyword argument.

                Args:
                    span: The span being started.
                    parent_context: The parent context (unused, required by interface).
                """
                if not hasattr(span, "resource") or span.resource is None:
                    return

                # Get resource attributes
                span_resource_attributes = getattr(span.resource, "attributes", {})

                # Copy specified attributes from resource to span
                for attr in self.attributes_to_copy:
                    if attr in span_resource_attributes:
                        value = span_resource_attributes[attr]
                        span.set_attribute(attr, value)
                        logger.debug(f"Copied resource attribute to span: {attr}={value}")

            def on_end(self, span):
                """Handle span end event.

                Required by the SpanProcessor interface but not used.

                Args:
                    span: The span being ended.
                """

            def _on_ending(self, span):
                """Handle the pre-end hook as a no-op.

                NOTE(review): recent SDK releases appear to call an
                experimental ``_on_ending`` hook on each processor; kept as a
                harmless no-op - confirm against the pinned SDK version.

                Args:
                    span: The span that is about to end.
                """

            def shutdown(self):
                """Release resources on provider shutdown (nothing to release)."""

            def force_flush(self, timeout_millis=30000):  # pylint: disable=unused-argument
                """Flush pending work; nothing is buffered, so this always succeeds.

                Args:
                    timeout_millis: Maximum time to wait (unused).

                Returns:
                    bool: Always True.
                """
                return True

        # Add the custom span processor to copy resource attributes to spans
        # This is needed for Arize which requires certain attributes as span attributes
        # Enable via OTEL_COPY_RESOURCE_ATTRS_TO_SPANS=true (disabled by default)
        copy_resource_attrs = os.getenv("OTEL_COPY_RESOURCE_ATTRS_TO_SPANS", "false").lower() == "true"
        if resource is not None and copy_resource_attrs:
            logger.info("Adding ResourceAttributeSpanProcessor to copy resource attributes to spans")
            provider.add_span_processor(ResourceAttributeSpanProcessor())

        # Configure the appropriate exporter based on type
        exporter: Optional[Any] = None

        if exporter_type == "otlp":
            endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
            protocol = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc").lower()
            # Note: some versions of OTLP exporters may not accept 'insecure' kwarg; avoid passing it.
            # Use endpoint scheme or env to control TLS externally.
            header_dict = _parse_key_value_pairs(os.getenv("OTEL_EXPORTER_OTLP_HEADERS", ""))

            # OTLP_SPAN_EXPORTER silently falls back to the HTTP class when the
            # gRPC package is missing; in that case take the HTTP branch so the
            # endpoint is rewritten for HTTP instead of being used verbatim.
            grpc_available = bool(OTLP_SPAN_EXPORTER) and OTLP_SPAN_EXPORTER is not HTTP_EXPORTER
            if protocol == "grpc" and grpc_available:
                exporter = cast(Any, OTLP_SPAN_EXPORTER)(endpoint=endpoint, headers=header_dict or None)
            elif HTTP_EXPORTER:
                # Use HTTP exporter as fallback: map the gRPC port to the HTTP
                # port and make sure the traces path is present exactly once.
                http_ep = str(endpoint) if endpoint is not None else ""
                if ":4317" in http_ep:
                    http_ep = http_ep.replace(":4317", ":4318").rstrip("/")
                    if not http_ep.endswith("/v1/traces"):
                        http_ep += "/v1/traces"
                exporter = cast(Any, HTTP_EXPORTER)(endpoint=http_ep, headers=header_dict or None)
            else:
                logger.error("No OTLP exporter available")
                return None

        elif exporter_type == "jaeger":
            if JAEGER_EXPORTER:
                endpoint = os.getenv("OTEL_EXPORTER_JAEGER_ENDPOINT", "http://localhost:14268/api/traces")
                exporter = JAEGER_EXPORTER(collector_endpoint=endpoint, username=os.getenv("OTEL_EXPORTER_JAEGER_USER"), password=os.getenv("OTEL_EXPORTER_JAEGER_PASSWORD"))
            else:
                logger.error("Jaeger exporter not available. Install with: pip install opentelemetry-exporter-jaeger")
                return None

        elif exporter_type == "zipkin":
            if ZIPKIN_EXPORTER:
                endpoint = os.getenv("OTEL_EXPORTER_ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans")
                exporter = ZIPKIN_EXPORTER(endpoint=endpoint)
            else:
                logger.error("Zipkin exporter not available. Install with: pip install opentelemetry-exporter-zipkin")
                return None

        elif exporter_type == "console":
            # Console exporter for debugging
            exporter = cast(Any, ConsoleSpanExporter)()

        else:
            logger.warning(f"Unknown exporter type: {exporter_type}. Using console exporter.")
            # From here on behave exactly like an explicit "console" choice so
            # the processor type and the final log line match what is used.
            exporter_type = "console"
            exporter = cast(Any, ConsoleSpanExporter)()

        if exporter:
            # Add batch processor for better performance (except for console)
            if exporter_type == "console":
                span_processor = cast(Any, SimpleSpanProcessor)(exporter)
            else:
                span_processor = cast(Any, BatchSpanProcessor)(
                    exporter,
                    max_queue_size=int(os.getenv("OTEL_BSP_MAX_QUEUE_SIZE", "2048")),
                    max_export_batch_size=int(os.getenv("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "512")),
                    schedule_delay_millis=int(os.getenv("OTEL_BSP_SCHEDULE_DELAY", "5000")),
                )
            provider.add_span_processor(span_processor)

        # Obtain a tracer if trace API available; otherwise create a no-op tracer
        if trace is not None and hasattr(trace, "get_tracer"):
            _TRACER = cast(Any, trace).get_tracer("mcp-gateway", service_version, schema_url="https://opentelemetry.io/schemas/1.11.0")
        else:

            class _NoopTracer:
                """No-op tracer used when OpenTelemetry API isn't available."""

                def start_as_current_span(self, _name: str):  # type: ignore[override]
                    """Return a no-op context manager for span creation.

                    Args:
                        _name: Span name (ignored in no-op implementation).

                    Returns:
                        contextlib.AbstractContextManager: A null context.
                    """
                    return nullcontext()

            _TRACER = _NoopTracer()

        logger.info(f"✅ OpenTelemetry initialized with {exporter_type} exporter")
        if exporter_type == "otlp":
            logger.info(f"   Endpoint: {os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT')}")
        elif exporter_type == "jaeger":
            logger.info(f"   Endpoint: {os.getenv('OTEL_EXPORTER_JAEGER_ENDPOINT', 'default')}")
        elif exporter_type == "zipkin":
            logger.info(f"   Endpoint: {os.getenv('OTEL_EXPORTER_ZIPKIN_ENDPOINT', 'default')}")

        return _TRACER

    except Exception as e:
        # Telemetry is optional: never let a setup failure break the gateway
        logger.error(f"Failed to initialize OpenTelemetry: {e}")
        return None
def trace_operation(operation_name: str, attributes: Optional[Dict[str, Any]] = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Simple decorator to trace any async operation.

    The wrapper preserves the decorated coroutine's metadata (``__name__``,
    ``__doc__``, ``__wrapped__``) and runs the function untraced when no
    tracer is configured or when the tracer does not yield a span object.

    Args:
        operation_name: Name of the operation to trace (e.g., "tool.invoke").
        attributes: Optional dictionary of attributes to add to the span.

    Returns:
        Decorator function that wraps the target function with tracing.

    Usage:
        @trace_operation("tool.invoke", {"tool.name": "calculator"})
        async def invoke_tool():
            ...
    """
    # Standard
    import functools  # pylint: disable=import-outside-toplevel

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        """Decorator that wraps the function with tracing.

        Args:
            func: The async function to wrap with tracing.

        Returns:
            The wrapped function with tracing capabilities.
        """

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            """Async wrapper that adds tracing to the decorated function.

            Args:
                *args: Positional arguments passed to the wrapped function.
                **kwargs: Keyword arguments passed to the wrapped function.

            Returns:
                The result of the wrapped function.

            Raises:
                Exception: Any exception raised by the wrapped function.
            """
            if not _TRACER:
                # No tracing configured, just run the function
                return await func(*args, **kwargs)

            # Create span for this operation
            with _TRACER.start_as_current_span(operation_name) as span:
                if span is None:
                    # A no-op tracer built on nullcontext() yields None, so
                    # there is nothing to annotate - just run the function.
                    return await func(*args, **kwargs)

                # Add attributes if provided
                for key, value in (attributes or {}).items():
                    span.set_attribute(key, value)

                try:
                    # Run the actual function
                    result = await func(*args, **kwargs)
                except Exception as e:
                    # Record error in span, then let the caller see it
                    span.set_attribute("status", "error")
                    span.set_attribute("error.message", str(e))
                    span.record_exception(e)
                    raise
                else:
                    span.set_attribute("status", "success")
                    return result

        return wrapper

    return decorator
def create_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Any:
    """
    Create a span for manual instrumentation.

    When a correlation ID is available it is added to the span as both
    ``correlation_id`` and ``request_id`` unless the caller already supplied
    those keys. The caller's ``attributes`` dictionary is never modified.

    Args:
        name: Name of the span to create (e.g., "database.query").
        attributes: Optional dictionary of attributes to add to the span.
            Entries whose value is None are skipped.

    Returns:
        Context manager that creates and manages the span lifecycle. A no-op
        context manager is returned when tracing is not configured.

    Usage:
        with create_span("database.query", {"db.statement": "SELECT * FROM tools"}):
            # Your code here
            pass
    """
    if not _TRACER:
        # Return a no-op context manager if tracing is not configured or available
        return nullcontext()

    # Work on a private copy: injecting the correlation ID into the caller's
    # dictionary would leak a stale ID into later spans that reuse it.
    span_attributes: Dict[str, Any] = dict(attributes) if attributes else {}

    # Auto-inject correlation ID into all spans for request tracing
    try:
        correlation_id = get_correlation_id()
        if correlation_id:
            # Explicit values from the caller take precedence
            span_attributes.setdefault("correlation_id", correlation_id)
            span_attributes.setdefault("request_id", correlation_id)  # Alias for compatibility
    except Exception as exc:
        # Correlation ID not available or error getting it, continue without it
        logger.debug("Failed to add correlation_id to span: %s", exc)

    # Start span and return the context manager
    span_context = _TRACER.start_as_current_span(name)

    if not span_attributes:
        # Nothing to attach: hand back the tracer's own context manager
        return span_context

    # Attributes can only be set once the span exists, i.e. after the
    # context has been entered, so wrap the context manager.
    class SpanWithAttributes:
        """Context manager wrapper that adds attributes to a span.

        This class wraps an OpenTelemetry span context and adds attributes
        when entering the context. It also handles exception recording when
        exiting the context.
        """

        def __init__(self, span_context: Any, attrs: Dict[str, Any]):
            """Initialize the span wrapper.

            Args:
                span_context: The OpenTelemetry span context to wrap.
                attrs: Dictionary of attributes to add to the span.
            """
            self.span_context: Any = span_context
            self.attrs: Dict[str, Any] = attrs
            self.span: Any = None

        def __enter__(self) -> Any:
            """Enter the context and set span attributes.

            Returns:
                The OpenTelemetry span with attributes set.
            """
            self.span = self.span_context.__enter__()
            if self.attrs and self.span:
                for key, value in self.attrs.items():
                    if value is not None:  # Skip None values
                        self.span.set_attribute(key, value)
            return self.span

        def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Any) -> Any:
            """Exit the context and record any exceptions.

            Args:
                exc_type: The exception type if an exception occurred.
                exc_val: The exception value if an exception occurred.
                exc_tb: The exception traceback if an exception occurred.

            Returns:
                The result of the wrapped span context's __exit__ method.
            """
            if self.span:
                # Status objects only exist when the OpenTelemetry API loaded
                status_api_ready = bool(OTEL_AVAILABLE and Status and StatusCode)
                if exc_type is not None:
                    # Record exception if one occurred
                    self.span.record_exception(exc_val)
                    if status_api_ready:
                        self.span.set_status(Status(StatusCode.ERROR, str(exc_val)))
                    self.span.set_attribute("error", True)
                    self.span.set_attribute("error.type", exc_type.__name__)
                    self.span.set_attribute("error.message", str(exc_val))
                elif status_api_ready:
                    self.span.set_status(Status(StatusCode.OK))
            return self.span_context.__exit__(exc_type, exc_val, exc_tb)

    return SpanWithAttributes(span_context, span_attributes)
# Initialize on module import: init_telemetry() reads its configuration from
# the environment and returns None when tracing is disabled or unavailable,
# in which case trace_operation() and create_span() act as no-ops.
_TRACER = init_telemetry()