Coverage for mcpgateway / plugins / observability_adapter.py: 100%
48 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/observability_adapter.py
3Copyright 2026
4SPDX-License-Identifier: Apache-2.0
5Authors: Fred Araujo
7Adapter bridging ObservabilityService to the plugin framework's
8ObservabilityProvider protocol.
10The plugin framework defines a protocol-based ObservabilityProvider
11interface so that it stays decoupled from gateway internals. This
12adapter lives on the gateway side, wrapping ObservabilityService
13with its own database sessions so the executor can call
14start_span/end_span without needing a db parameter.
15"""
17# Standard
18import logging
19from typing import Any, Dict, Optional
21# Third-Party
22from sqlalchemy.orm import Session
24# First-Party
25from mcpgateway.db import SessionLocal
26from mcpgateway.services.observability_service import ObservabilityService
28logger = logging.getLogger(__name__)
class ObservabilityServiceAdapter:
    """Bridges ObservabilityService to the ObservabilityProvider protocol.

    Satisfies the ObservabilityProvider protocol via duck typing (no explicit
    inheritance needed). Each call creates its own short-lived DB session,
    matching the pattern used in observability_middleware.py.

    Neither ``start_span`` nor ``end_span`` propagates exceptions: a failure
    is logged as a warning, the session is rolled back, and the session is
    always closed afterwards. Errors raised by the rollback/close themselves
    are suppressed and only logged at debug level.
    """

    def __init__(self, service: Optional["ObservabilityService"] = None):
        """Initialize the adapter.

        Args:
            service: ObservabilityService instance to wrap (creates one if not provided).
        """
        # Compare against None explicitly so a caller-supplied service is
        # never replaced merely because it happens to evaluate as falsy.
        self._service = service if service is not None else ObservabilityService()

    def _make_session(self) -> "Session":
        """Create a fresh DB session for observability writes.

        Returns:
            A new SQLAlchemy session.
        """
        return SessionLocal()

    @staticmethod
    def _rollback_quietly(db: Optional["Session"]) -> None:
        """Roll back ``db`` if one was created, suppressing any error.

        Args:
            db: The session to roll back, or None if session creation failed.
        """
        if db is None:
            return
        try:
            db.rollback()
        except Exception as exc:
            # Best-effort cleanup: the original failure was already reported
            # by the caller, so this one is only recorded for debugging.
            logger.debug("ObservabilityServiceAdapter rollback failed: %s", exc)

    @staticmethod
    def _close_quietly(db: Optional["Session"]) -> None:
        """Close ``db`` if one was created, suppressing any error.

        Args:
            db: The session to close, or None if session creation failed.
        """
        if db is None:
            return
        try:
            db.close()
        except Exception as exc:
            # Best-effort cleanup: a failing close must not surface to the
            # code being traced.
            logger.debug("ObservabilityServiceAdapter close failed: %s", exc)

    def start_span(
        self,
        trace_id: str,
        name: str,
        kind: str = "internal",
        resource_type: Optional[str] = None,
        resource_name: Optional[str] = None,
        attributes: Optional[Dict[str, Any]] = None,
    ) -> Optional[str]:
        """Start a span by delegating to ObservabilityService with a fresh DB session.

        Args:
            trace_id: The trace identifier.
            name: The span name.
            kind: The span kind (e.g. "internal", "client", "server").
            resource_type: Optional resource type being traced.
            resource_name: Optional resource name being traced.
            attributes: Optional key-value attributes for the span.

        Returns:
            The span identifier, or None on failure.
        """
        # Stays None when _make_session() itself raises, so the cleanup
        # helpers know there is nothing to roll back or close.
        db: Optional["Session"] = None
        try:
            db = self._make_session()
            return self._service.start_span(
                db=db,
                trace_id=trace_id,
                name=name,
                kind=kind,
                resource_type=resource_type,
                resource_name=resource_name,
                attributes=attributes,
            )
        except Exception as exc:
            logger.warning("ObservabilityServiceAdapter.start_span failed: %s", exc)
            self._rollback_quietly(db)
            return None
        finally:
            self._close_quietly(db)

    def end_span(
        self,
        span_id: Optional[str],
        status: str = "ok",
        attributes: Optional[Dict[str, Any]] = None,
    ) -> None:
        """End a span by delegating to ObservabilityService with a fresh DB session.

        Does nothing (and opens no session) when ``span_id`` is None, which is
        what ``start_span`` returns on failure.

        Args:
            span_id: The span identifier returned by start_span.
            status: The span status (e.g. "ok", "error").
            attributes: Optional additional attributes to attach.
        """
        if span_id is None:
            return
        # Stays None when _make_session() itself raises, so the cleanup
        # helpers know there is nothing to roll back or close.
        db: Optional["Session"] = None
        try:
            db = self._make_session()
            self._service.end_span(
                db=db,
                span_id=span_id,
                status=status,
                attributes=attributes,
            )
        except Exception as exc:
            logger.warning("ObservabilityServiceAdapter.end_span failed: %s", exc)
            self._rollback_quietly(db)
        finally:
            self._close_quietly(db)