Coverage for mcpgateway / observability.py: 96%

238 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/observability.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Vendor-agnostic OpenTelemetry instrumentation for MCP Gateway. 

8Supports any OTLP-compatible backend (Jaeger, Zipkin, Tempo, Phoenix, etc.). 

9""" 

10 

11# Standard 

12from contextlib import nullcontext 

13from importlib import import_module as _im 

14import logging 

15import os 

16from typing import Any, Callable, cast, Dict, Optional 

17 

# Third-Party - OpenTelemetry core components are optional; fall back to shims when absent
OTEL_AVAILABLE = False
try:
    # Third-Party
    from opentelemetry import trace
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor
    from opentelemetry.trace import Status, StatusCode

    OTEL_AVAILABLE = True
except ImportError:
    # OpenTelemetry is not importable: bind every SDK symbol to None so the
    # rest of the module can test for availability and degrade gracefully.
    trace = None
    TracerProvider = None
    BatchSpanProcessor = None
    ConsoleSpanExporter = None
    SimpleSpanProcessor = None
    Status = None
    StatusCode = None

    class _ResourceShim:
        """Stand-in for ``Resource`` when the OpenTelemetry SDK is missing.

        Only a static ``create`` method is provided. It hands back the
        attribute mapping it receives, which lets tests patch
        ``Resource.create`` and inspect its inputs without the real SDK.
        """

        @staticmethod
        def create(attrs: Dict[str, Any]) -> Dict[str, Any]:  # type: ignore[override]
            """Mimic ``Resource.create`` by returning the input untouched.

            Args:
                attrs: Resource attribute dictionary.

            Returns:
                Dict[str, Any]: The very same mapping that was passed in.
            """
            return attrs

    Resource = cast(Any, _ResourceShim)

    # Register minimal stub modules so tests can patch the
    # ``opentelemetry.sdk.trace.export.ConsoleSpanExporter`` dotted path.
    try:
        # Standard
        import sys
        import types

        if "pytest" in sys.modules or os.getenv("MCP_TESTING") == "1":
            otel_root = types.ModuleType("opentelemetry")
            otel_sdk = types.ModuleType("opentelemetry.sdk")
            otel_trace = types.ModuleType("opentelemetry.sdk.trace")
            otel_export = types.ModuleType("opentelemetry.sdk.trace.export")

            class _ConsoleSpanExporterStub:  # pragma: no cover - test patch replaces this
                """Placeholder ConsoleSpanExporter for test environments.

                Exists only so unit tests can patch
                `opentelemetry.sdk.trace.export.ConsoleSpanExporter` when the
                OpenTelemetry SDK is not installed.
                """

            # Wire the chain leaf-first: export -> trace -> sdk -> root.
            for _parent, _attr, _child in (
                (otel_export, "ConsoleSpanExporter", _ConsoleSpanExporterStub),
                (otel_trace, "export", otel_export),
                (otel_sdk, "trace", otel_trace),
                (otel_root, "sdk", otel_sdk),
            ):
                setattr(_parent, _attr, _child)

            # Only register the exact chain needed by tests; setdefault never
            # replaces a module that is already present in sys.modules.
            for _shim in (otel_root, otel_sdk, otel_trace, otel_export):
                sys.modules.setdefault(_shim.__name__, _shim)
    except Exception as exc:  # nosec B110 - best-effort optional shim
        # Shimming is a non-critical convenience for tests; log and carry on.
        logging.getLogger(__name__).debug("Skipping OpenTelemetry shim setup: %s", exc)

95 

96# First-Party 

97from mcpgateway.utils.correlation_id import get_correlation_id # noqa: E402 # pylint: disable=wrong-import-position 

98 

# Optional exporters: each name is bound to the exporter class when its
# package can be imported, and to None otherwise.


def _load_optional_exporter(module_path: str, class_name: str) -> Optional[Any]:
    """Return ``class_name`` from ``module_path``, or None if it cannot be loaded.

    Args:
        module_path: Dotted path of the module to import.
        class_name: Attribute to fetch from the imported module.

    Returns:
        The attribute value, or None when the import or lookup raises.
    """
    try:
        return getattr(_im(module_path), class_name)
    except Exception:
        return None


# OTLP: prefer the gRPC exporter and fall back to the HTTP one.
OTLP_SPAN_EXPORTER = None
for _otlp_module in (
    "opentelemetry.exporter.otlp.proto.grpc.trace_exporter",
    "opentelemetry.exporter.otlp.proto.http.trace_exporter",
):
    try:
        OTLP_SPAN_EXPORTER = getattr(_im(_otlp_module), "OTLPSpanExporter")
    except Exception:
        continue
    break

JAEGER_EXPORTER = _load_optional_exporter("opentelemetry.exporter.jaeger.thrift", "JaegerExporter")
ZIPKIN_EXPORTER = _load_optional_exporter("opentelemetry.exporter.zipkin.json", "ZipkinExporter")
HTTP_EXPORTER = _load_optional_exporter("opentelemetry.exporter.otlp.proto.http.trace_exporter", "OTLPSpanExporter")

logger = logging.getLogger(__name__)


# Module-wide tracer handle (UPPER_CASE because it is a module-level name).
# It starts out as None; init_telemetry() rebinds it once tracing is set up.
# pylint: disable=invalid-name
_TRACER = None

129 

130 

def _parse_key_value_pairs(raw: str) -> Dict[str, str]:
    """Parse a comma-separated ``key=value`` list into a dictionary.

    Items without ``=`` and items whose key is empty after stripping are
    skipped. Only the first ``=`` splits an item, so values may contain ``=``.
    Keys and values are stripped of surrounding whitespace; a later duplicate
    key overrides an earlier one.

    Args:
        raw: Text such as ``"a=1, b=2"``; may be empty.

    Returns:
        Dict[str, str]: The parsed pairs (empty when nothing usable is found).
    """
    pairs: Dict[str, str] = {}
    for item in raw.split(","):
        if "=" not in item:
            continue
        key, value = item.split("=", 1)
        key = key.strip()
        if key:
            pairs[key] = value.strip()
    return pairs


def init_telemetry() -> Optional[Any]:
    """Initialize OpenTelemetry tracing from environment configuration.

    Environment variables read here:
    - OTEL_ENABLE_OBSERVABILITY: defaults to 'false'; the literal value
      'false' (any case) disables telemetry, any other value enables it
    - OTEL_TRACES_EXPORTER: otlp (default), jaeger, zipkin, console or none;
      an unrecognised value falls back to the console exporter
    - OTEL_EXPORTER_OTLP_ENDPOINT / OTEL_EXPORTER_OTLP_PROTOCOL /
      OTEL_EXPORTER_OTLP_HEADERS: OTLP exporter settings
    - OTEL_EXPORTER_JAEGER_ENDPOINT / OTEL_EXPORTER_JAEGER_USER /
      OTEL_EXPORTER_JAEGER_PASSWORD: Jaeger exporter settings
    - OTEL_EXPORTER_ZIPKIN_ENDPOINT: Zipkin exporter endpoint
    - OTEL_SERVICE_NAME, DEPLOYMENT_ENV, OTEL_RESOURCE_ATTRIBUTES: resource
      attributes attached to the tracer provider
    - OTEL_COPY_RESOURCE_ATTRS_TO_SPANS: 'true' adds a span processor that
      copies selected resource attributes onto each span
    - OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
      OTEL_BSP_SCHEDULE_DELAY: batch span processor tuning

    Any exception raised while building the provider or exporter is logged
    and swallowed, so initialization failures never propagate to the caller.

    Returns:
        The initialized tracer instance, or None when telemetry is disabled,
        unavailable, not configured, or failed to initialize.
    """
    # pylint: disable=global-statement
    global _TRACER

    # Check if observability is disabled (default: disabled)
    if os.getenv("OTEL_ENABLE_OBSERVABILITY", "false").lower() == "false":
        logger.info("Observability disabled via OTEL_ENABLE_OBSERVABILITY=false")
        return None

    # If OpenTelemetry isn't installed, return early with graceful degradation
    if not OTEL_AVAILABLE:
        logger.warning("OpenTelemetry not installed - telemetry disabled")
        logger.info("To enable telemetry, install: pip install mcp-contextforge-gateway[observability]")
        return None

    exporter_type = os.getenv("OTEL_TRACES_EXPORTER", "otlp").lower()

    # Handle 'none' exporter (tracing disabled)
    if exporter_type == "none":
        logger.info("Tracing disabled via OTEL_TRACES_EXPORTER=none")
        return None

    # The OTLP exporter is pointless without an endpoint, so bail out early
    if exporter_type == "otlp":
        endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
        if not endpoint:
            logger.info("OTLP endpoint not configured, skipping telemetry init")
            return None

    # Reported both as the service.version resource attribute and as the
    # instrumentation version passed to get_tracer().
    service_version = "1.0.0-BETA-2"

    try:
        resource_attributes: Dict[str, Any] = {
            "service.name": os.getenv("OTEL_SERVICE_NAME", "mcp-gateway"),
            "service.version": service_version,
            "deployment.environment": os.getenv("DEPLOYMENT_ENV", "development"),
        }

        # Custom attributes from the environment override the defaults above
        resource_attributes.update(_parse_key_value_pairs(os.getenv("OTEL_RESOURCE_ATTRIBUTES", "")))

        # Create resource if available, else skip
        resource: Optional[Any]
        if Resource is not None and hasattr(Resource, "create"):
            resource = cast(Any, Resource).create(resource_attributes)
        else:
            resource = None

        # Initialize tracer provider (with resource if available)
        if resource is not None:
            provider = cast(Any, TracerProvider)(resource=resource)
        else:
            provider = cast(Any, TracerProvider)()

        # Register provider if trace API is present
        if trace is not None and hasattr(trace, "set_tracer_provider"):
            cast(Any, trace).set_tracer_provider(provider)

        # Custom span processor that copies resource attributes to span attributes.
        # This is needed because Arize requires arize.project.name as a span attribute.
        class ResourceAttributeSpanProcessor:
            """Span processor that copies specific resource attributes to span attributes.

            Implements the four methods the SDK invokes on registered span
            processors: ``on_start``, ``on_end``, ``shutdown`` and ``force_flush``.
            """

            def __init__(self, attributes_to_copy=None):
                """Store the attribute names to copy.

                Args:
                    attributes_to_copy: Resource attribute names to mirror onto
                        spans; defaults to ``["arize.project.name", "model_id"]``.
                """
                self.attributes_to_copy = attributes_to_copy or ["arize.project.name", "model_id"]
                logger.info(f"ResourceAttributeSpanProcessor will copy: {self.attributes_to_copy}")

            def on_start(self, span, parent_context=None):  # pylint: disable=unused-argument
                """Copy specified resource attributes to span attributes when span starts.

                The second parameter must be named ``parent_context``: the SDK
                passes it by keyword, so any other name raises ``TypeError``
                on every span start.

                Args:
                    span: The span being started.
                    parent_context: The parent context (unused, required by interface).
                """
                if not hasattr(span, "resource") or span.resource is None:
                    return

                span_resource_attrs = getattr(span.resource, "attributes", {})

                for attr in self.attributes_to_copy:
                    if attr in span_resource_attrs:
                        value = span_resource_attrs[attr]
                        span.set_attribute(attr, value)
                        # Runs for every span: keep formatting lazy
                        logger.debug("Copied resource attribute to span: %s=%s", attr, value)

            def on_end(self, span):
                """Handle span end event.

                Required by the SpanProcessor interface but not used.

                Args:
                    span: The span being ended.
                """

            def shutdown(self):
                """Release resources on provider shutdown (nothing to release here)."""

            def force_flush(self, timeout_millis: int = 30000) -> bool:  # pylint: disable=unused-argument
                """Report a successful flush; this processor buffers nothing.

                Args:
                    timeout_millis: Flush timeout (unused, required by interface).

                Returns:
                    bool: Always True.
                """
                return True

        # Enable via OTEL_COPY_RESOURCE_ATTRS_TO_SPANS=true (disabled by default)
        copy_resource_attrs = os.getenv("OTEL_COPY_RESOURCE_ATTRS_TO_SPANS", "false").lower() == "true"
        if resource is not None and copy_resource_attrs:
            logger.info("Adding ResourceAttributeSpanProcessor to copy resource attributes to spans")
            provider.add_span_processor(ResourceAttributeSpanProcessor())

        # Configure the appropriate exporter based on type
        exporter: Optional[Any] = None

        if exporter_type == "otlp":
            endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
            protocol = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc").lower()
            # Note: some versions of OTLP exporters may not accept 'insecure' kwarg; avoid passing it.
            # Use endpoint scheme or env to control TLS externally.
            header_dict = _parse_key_value_pairs(os.getenv("OTEL_EXPORTER_OTLP_HEADERS", ""))

            if protocol == "grpc" and OTLP_SPAN_EXPORTER:
                exporter = cast(Any, OTLP_SPAN_EXPORTER)(endpoint=endpoint, headers=header_dict or None)
            elif HTTP_EXPORTER:
                # Use HTTP exporter as fallback; an endpoint on the gRPC port
                # (:4317) is rewritten to the HTTP port (:4318) plus /v1/traces.
                ep = str(endpoint) if endpoint is not None else ""
                http_ep = (ep.replace(":4317", ":4318") + "/v1/traces") if ":4317" in ep else ep
                exporter = cast(Any, HTTP_EXPORTER)(endpoint=http_ep, headers=header_dict or None)
            else:
                logger.error("No OTLP exporter available")
                return None

        elif exporter_type == "jaeger":
            if JAEGER_EXPORTER:
                endpoint = os.getenv("OTEL_EXPORTER_JAEGER_ENDPOINT", "http://localhost:14268/api/traces")
                exporter = JAEGER_EXPORTER(collector_endpoint=endpoint, username=os.getenv("OTEL_EXPORTER_JAEGER_USER"), password=os.getenv("OTEL_EXPORTER_JAEGER_PASSWORD"))
            else:
                logger.error("Jaeger exporter not available. Install with: pip install opentelemetry-exporter-jaeger")
                return None

        elif exporter_type == "zipkin":
            if ZIPKIN_EXPORTER:
                endpoint = os.getenv("OTEL_EXPORTER_ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans")
                exporter = ZIPKIN_EXPORTER(endpoint=endpoint)
            else:
                logger.error("Zipkin exporter not available. Install with: pip install opentelemetry-exporter-zipkin")
                return None

        elif exporter_type == "console":
            # Console exporter for debugging
            exporter = cast(Any, ConsoleSpanExporter)()

        else:
            logger.warning(f"Unknown exporter type: {exporter_type}. Using console exporter.")
            exporter = cast(Any, ConsoleSpanExporter)()

        if exporter:
            # Batch processing for real backends; the console exporter gets a
            # simple processor instead.
            if exporter_type == "console":
                span_processor = cast(Any, SimpleSpanProcessor)(exporter)
            else:
                span_processor = cast(Any, BatchSpanProcessor)(
                    exporter,
                    max_queue_size=int(os.getenv("OTEL_BSP_MAX_QUEUE_SIZE", "2048")),
                    max_export_batch_size=int(os.getenv("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "512")),
                    schedule_delay_millis=int(os.getenv("OTEL_BSP_SCHEDULE_DELAY", "5000")),
                )
            provider.add_span_processor(span_processor)

        # Obtain a tracer if trace API available; otherwise create a no-op tracer
        if trace is not None and hasattr(trace, "get_tracer"):
            _TRACER = cast(Any, trace).get_tracer("mcp-gateway", service_version, schema_url="https://opentelemetry.io/schemas/1.11.0")
        else:

            class _NoopTracer:
                """No-op tracer used when OpenTelemetry API isn't available."""

                def start_as_current_span(self, _name: str):  # type: ignore[override]
                    """Return a no-op context manager for span creation.

                    Args:
                        _name: Span name (ignored in no-op implementation).

                    Returns:
                        contextlib.AbstractContextManager: A null context.
                    """
                    return nullcontext()

            _TRACER = _NoopTracer()

        logger.info(f"✅ OpenTelemetry initialized with {exporter_type} exporter")
        if exporter_type == "otlp":
            logger.info(f"   Endpoint: {os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT')}")
        elif exporter_type == "jaeger":
            logger.info(f"   Endpoint: {os.getenv('OTEL_EXPORTER_JAEGER_ENDPOINT', 'default')}")
        elif exporter_type == "zipkin":
            logger.info(f"   Endpoint: {os.getenv('OTEL_EXPORTER_ZIPKIN_ENDPOINT', 'default')}")

        return _TRACER

    except Exception as e:
        # Telemetry is optional: report the failure and continue without tracing
        logger.error(f"Failed to initialize OpenTelemetry: {e}")
        return None

356 

357 

def trace_operation(operation_name: str, attributes: Optional[Dict[str, Any]] = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Simple decorator to trace any async operation.

    The wrapped coroutine keeps the original function's metadata
    (``__name__``, ``__doc__``, ``__wrapped__``, ...). When no tracer is
    configured, the function is awaited directly with no span.

    Args:
        operation_name: Name of the operation to trace (e.g., "tool.invoke").
        attributes: Optional dictionary of attributes to add to the span.

    Returns:
        Decorator function that wraps the target function with tracing.

    Usage:
        @trace_operation("tool.invoke", {"tool.name": "calculator"})
        async def invoke_tool():
            ...
    """
    # Standard
    from functools import wraps  # pylint: disable=import-outside-toplevel

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        """Decorator that wraps the function with tracing.

        Args:
            func: The async function to wrap with tracing.

        Returns:
            The wrapped function with tracing capabilities.
        """

        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            """Async wrapper that adds tracing to the decorated function.

            Args:
                *args: Positional arguments passed to the wrapped function.
                **kwargs: Keyword arguments passed to the wrapped function.

            Returns:
                The result of the wrapped function.

            Raises:
                Exception: Any exception raised by the wrapped function.
            """
            # The tracer is looked up on every call, so tracing that is
            # initialized after decoration time still takes effect.
            if not _TRACER:
                # No tracing configured, just run the function
                return await func(*args, **kwargs)

            with _TRACER.start_as_current_span(operation_name) as span:
                if attributes:
                    for key, value in attributes.items():
                        span.set_attribute(key, value)

                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("status", "success")
                    return result
                except Exception as e:
                    # Record the failure on the span, then let it propagate
                    span.set_attribute("status", "error")
                    span.set_attribute("error.message", str(e))
                    span.record_exception(e)
                    raise

        return wrapper

    return decorator

424 

425 

def create_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Any:
    """
    Create a span for manual instrumentation.

    When a correlation ID is available it is added to the span as
    ``correlation_id`` and ``request_id``, unless the caller already supplied
    those keys. The caller's ``attributes`` dictionary is never modified; the
    span works on a private copy, so a dictionary reused across requests
    cannot carry a stale correlation ID.

    Args:
        name: Name of the span to create (e.g., "database.query").
        attributes: Optional dictionary of attributes to add to the span.
            Entries whose value is None are not set on the span.

    Returns:
        Context manager that creates and manages the span lifecycle. It is a
        no-op context when no tracer is configured.

    Usage:
        with create_span("database.query", {"db.statement": "SELECT * FROM tools"}):
            # Your code here
            pass
    """
    if not _TRACER:
        # Return a no-op context manager if tracing is not configured or available
        return nullcontext()

    # Copy before injecting IDs so the caller's mapping stays untouched
    span_attributes: Dict[str, Any] = dict(attributes) if attributes else {}

    # Auto-inject correlation ID into all spans for request tracing
    try:
        correlation_id = get_correlation_id()
        if correlation_id:
            # setdefault keeps any value the caller provided explicitly
            span_attributes.setdefault("correlation_id", correlation_id)
            span_attributes.setdefault("request_id", correlation_id)  # Alias for compatibility
    except Exception as exc:
        # Correlation ID not available or error getting it, continue without it
        logger.debug("Failed to add correlation_id to span: %s", exc)

    span_context = _TRACER.start_as_current_span(name)

    if not span_attributes:
        # Nothing to attach: hand back the tracer's own context manager
        return span_context

    # Attributes can only be set once the span exists, i.e. after the
    # context is entered, so wrap the context manager.
    class SpanWithAttributes:
        """Context manager wrapper that adds attributes to a span.

        This class wraps an OpenTelemetry span context and adds attributes
        when entering the context. It also handles exception recording when
        exiting the context.
        """

        def __init__(self, span_context: Any, attrs: Dict[str, Any]):
            """Initialize the span wrapper.

            Args:
                span_context: The OpenTelemetry span context to wrap.
                attrs: Dictionary of attributes to add to the span.
            """
            self.span_context: Any = span_context
            self.attrs: Dict[str, Any] = attrs
            self.span: Any = None

        def __enter__(self) -> Any:
            """Enter the context and set span attributes.

            Returns:
                The OpenTelemetry span with attributes set.
            """
            self.span = self.span_context.__enter__()
            if self.attrs and self.span:
                for key, value in self.attrs.items():
                    if value is not None:  # Skip None values
                        self.span.set_attribute(key, value)
            return self.span

        def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Any) -> Any:
            """Exit the context and record any exceptions.

            Args:
                exc_type: The exception type if an exception occurred.
                exc_val: The exception value if an exception occurred.
                exc_tb: The exception traceback if an exception occurred.

            Returns:
                The result of the wrapped span context's __exit__ method.
            """
            status_supported = bool(OTEL_AVAILABLE and Status and StatusCode)
            if exc_type is not None and self.span:
                # Failure path: record the exception and flag the span as errored
                self.span.record_exception(exc_val)
                if status_supported:
                    self.span.set_status(Status(StatusCode.ERROR, str(exc_val)))
                self.span.set_attribute("error", True)
                self.span.set_attribute("error.type", exc_type.__name__)
                self.span.set_attribute("error.message", str(exc_val))
            elif self.span:
                # Success path: mark the span OK when status types exist
                if status_supported:
                    self.span.set_status(Status(StatusCode.OK))
            return self.span_context.__exit__(exc_type, exc_val, exc_tb)

    return SpanWithAttributes(span_context, span_attributes)

527 

528 

529# Initialize on module import 

530_TRACER = init_telemetry()