Coverage for mcpgateway / services / metrics.py: 87%
102 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""
3Location: ./mcpgateway/services/metrics.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
7ContextForge Metrics Service.
9This module provides comprehensive Prometheus metrics instrumentation for ContextForge.
10It configures and exposes HTTP metrics including request counts, latencies, response sizes,
11and custom application metrics.
13The service automatically instruments FastAPI applications with standard HTTP metrics
14and provides configurable exclusion patterns for endpoints that should not be monitored.
15Metrics are exposed at the `/metrics/prometheus` endpoint in Prometheus format.
17Supported Metrics:
18- http_requests_total: Counter for total HTTP requests by method, endpoint, and status
19- http_request_duration_seconds: Histogram of request processing times
20- http_request_size_bytes: Histogram of incoming request payload sizes
21- http_response_size_bytes: Histogram of outgoing response payload sizes
22- app_info: Gauge with custom static labels for application metadata
24Environment Variables:
25- ENABLE_METRICS: Enable/disable metrics collection (default: "false")
26- METRICS_EXCLUDED_HANDLERS: Comma-separated regex patterns for excluded endpoints
27- METRICS_CUSTOM_LABELS: Custom labels for app_info gauge (format: "key1=value1,key2=value2")
29Usage:
30 from mcpgateway.services.metrics import setup_metrics
32 app = FastAPI()
33 setup_metrics(app) # Automatically instruments the app
35 # Metrics available at: GET /metrics/prometheus
37Functions:
38- setup_metrics: Configure Prometheus instrumentation for FastAPI app
39"""
41# Standard
42import gzip
43import os
44import re
46# Third-Party
47from fastapi import Depends, Request, Response, status
48from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest, REGISTRY
49from prometheus_fastapi_instrumentator import Instrumentator
51# First-Party
52from mcpgateway.config import settings
def _get_registry_collector(metric_name: str):
    """Return the collector registered under ``metric_name``, if any.

    ``prometheus_client`` offers no public lookup-by-name API, so this reads
    the registry's private ``_names_to_collectors`` mapping. The access is
    guarded: if the attribute is missing or is not a ``dict``, the lookup
    simply reports "not found" instead of raising. Callers use this to avoid
    registering the same metric name twice when several apps are created in
    one process (e.g. in tests).

    Args:
        metric_name (str): Metric name to look up.

    Returns:
        Any: The registered collector, or ``None`` when the name is unknown
        or the internal mapping is unavailable.
    """
    collectors_by_name = getattr(REGISTRY, "_names_to_collectors", None)
    if isinstance(collectors_by_name, dict):
        return collectors_by_name.get(metric_name)
    return None
# Global Metrics
# Module-level counters, created once at import time so that services and
# plugins can import them and call ``.labels(...).inc()``.

# Tool invocation timeouts, labelled by tool name.
tool_timeout_counter = Counter(
    name="tool_timeout_total",
    documentation="Total number of tool invocation timeouts",
    labelnames=["tool_name"],
)

# Circuit-breaker "open" transitions, labelled by tool name.
circuit_breaker_open_counter = Counter(
    name="circuit_breaker_open_total",
    documentation="Total number of times circuit breaker opened",
    labelnames=["tool_name"],
)

# Password reset requests, labelled by outcome.
password_reset_requests_counter = Counter(
    name="password_reset_requests_total",
    documentation="Total number of password reset requests",
    labelnames=["outcome"],
)

# Password reset completion attempts, labelled by outcome.
password_reset_completions_counter = Counter(
    name="password_reset_completions_total",
    documentation="Total number of password reset completion attempts",
    labelnames=["outcome"],
)
def _detect_db_engine(database_url):
    """Map a lower-cased database URL to the engine name used in metric labels.

    Args:
        database_url (str): Database URL, already lower-cased by the caller.

    Returns:
        str: One of "mariadb", "postgresql", "sqlite", "mongodb" or "unknown".
    """
    if database_url.startswith("mysql+pymysql://") or "mariadb" in database_url:
        return "mariadb"
    # Accept driver-qualified URLs (e.g. postgresql+psycopg://,
    # sqlite+aiosqlite://) in addition to the bare schemes; previously these
    # were reported as "unknown".
    if database_url.startswith(("postgresql://", "postgres://", "postgresql+")):
        return "postgresql"
    if database_url.startswith(("sqlite://", "sqlite+")):
        return "sqlite"
    if database_url.startswith(("mongodb://", "mongodb+srv://")):
        return "mongodb"
    return "unknown"


def _parse_custom_labels(raw):
    """Parse a "key1=value1,key2=value2" string into a label dict.

    Only the first "=" of each pair separates key from value, so values may
    themselves contain "=". Surrounding whitespace is stripped from keys and
    values; pairs without "=" or with an empty key are ignored.

    Args:
        raw (str): Raw label specification; ``None`` or "" yields no labels.

    Returns:
        dict: Mapping of label name to label value.
    """
    labels = {}
    for pair in (raw or "").split(","):
        key, separator, value = pair.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        labels[key] = value.strip()
    return labels


def _get_or_create_gauge(name, documentation, labelnames=()):
    """Return the Gauge registered as ``name``, creating it when absent.

    setup_metrics may run several times in one process (tests build multiple
    FastAPI apps) and the Prometheus registry rejects duplicate metric names,
    so an existing collector is re-used when present.

    Args:
        name (str): Metric name.
        documentation (str): Help text used if the gauge has to be created.
        labelnames: Label names used if the gauge has to be created.

    Returns:
        Any: The existing or newly created collector, or ``None`` if creation
        failed with ``ValueError`` and no collector is registered under
        ``name``.
    """
    gauge = _get_registry_collector(name)
    if gauge is None:
        try:
            gauge = Gauge(name, documentation, labelnames=labelnames, registry=REGISTRY)
        except ValueError:
            # Either another app instance registered it first (re-use it) or
            # the definition was rejected (lookup then yields None).
            gauge = _get_registry_collector(name)
    return gauge


def setup_metrics(app):
    """
    Configure Prometheus metrics instrumentation for a FastAPI application.

    When ``settings.ENABLE_METRICS`` is truthy this registers the ``app_info``,
    ``database_info`` and HTTP pool gauges, instruments the app with
    ``prometheus_fastapi_instrumentator`` and exposes an authenticated
    ``/metrics/prometheus`` endpoint. Otherwise only an authenticated
    ``/metrics/prometheus`` endpoint that answers HTTP 503 is registered.

    Args:
        app: FastAPI application instance to instrument

    Configuration Used:
        settings.ENABLE_METRICS: Truthy to enable metrics collection.
        settings.database_url: Used to derive the ``engine`` / ``url_scheme`` labels.
        settings.METRICS_EXCLUDED_HANDLERS: Comma-separated regex patterns for
            endpoints to exclude from metrics collection.
        METRICS_CUSTOM_LABELS (environment variable): Custom labels in
            "key1=value1,key2=value2" format for the app_info gauge metric.
            A custom ``engine`` label is overridden by the detected engine.

    Side Effects:
        - Registers Prometheus metrics collectors with the global registry
        - Adds middleware to the FastAPI app for request instrumentation
        - Exposes /metrics/prometheus endpoint for Prometheus scraping
        - Sets ``app.state.update_http_pool_metrics`` (enabled case only)
        - Prints status messages to stdout

    Example:
        >>> from fastapi import FastAPI
        >>> from mcpgateway.services.metrics import setup_metrics
        >>> app = FastAPI()
        >>> # setup_metrics(app) # Configures Prometheus metrics
        >>> # Metrics available at GET /metrics/prometheus
    """
    if not settings.ENABLE_METRICS:
        print("⚠️ Metrics instrumentation disabled")

        # First-Party
        from mcpgateway.utils.verify_credentials import require_auth

        @app.get("/metrics/prometheus", tags=["Metrics"])
        async def metrics_disabled(_user=Depends(require_auth)):  # pylint: disable=unused-argument
            """Returns 503 when metrics collection is disabled (requires authentication).

            Args:
                _user: Authenticated user from require_auth dependency.

            Returns:
                Response: HTTP 503 response indicating metrics are disabled.
            """
            return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

        return

    # Detect database engine from DATABASE_URL
    database_url = settings.database_url.lower()
    db_engine = _detect_db_engine(database_url)

    # Custom labels for the app_info gauge; the detected database engine is
    # always included and takes precedence over a user-supplied "engine".
    custom_labels = _parse_custom_labels(os.getenv("METRICS_CUSTOM_LABELS", ""))
    custom_labels["engine"] = db_engine

    # Use a deterministic label order for stable registration:
    # `engine` first, then any custom labels sorted.
    extra_label_names = sorted(label for label in custom_labels if label != "engine")
    desired_label_names = ["engine", *extra_label_names]

    app_info_gauge = _get_or_create_gauge("app_info", "Static labels for the application", desired_label_names)
    if app_info_gauge is not None:
        # A re-used gauge may have been created with a different label set;
        # fill exactly the labels it declares, defaulting missing ones to "".
        labelnames = getattr(app_info_gauge, "_labelnames", ())
        if labelnames:
            labels = {name: custom_labels.get(name, "") for name in labelnames}
            app_info_gauge.labels(**labels).set(1)
        else:
            app_info_gauge.set(1)

    # Add database metrics gauge
    db_info_gauge = _get_or_create_gauge("database_info", "Database engine information", ["engine", "url_scheme"])

    # Extract URL scheme for additional context
    url_scheme = database_url.split("://", maxsplit=1)[0] if "://" in database_url else "unknown"
    if db_info_gauge is not None:
        db_info_gauge.labels(engine=db_engine, url_scheme=url_scheme).set(1)

    # HTTP connection pool gauges. They are only created here; values are
    # filled in later through update_http_pool_metrics().
    http_pool_max_connections = _get_or_create_gauge(
        "http_pool_max_connections",
        "Maximum allowed HTTP connections in the pool",
    )
    http_pool_max_keepalive = _get_or_create_gauge(
        "http_pool_max_keepalive_connections",
        "Maximum idle keepalive connections to retain",
    )

    def update_http_pool_metrics():
        """Update HTTP connection pool metrics from SharedHttpClient stats."""
        try:
            # First-Party
            from mcpgateway.services.http_client_service import SharedHttpClient  # pylint: disable=import-outside-toplevel

            # Only update if client is initialized
            if SharedHttpClient._instance and SharedHttpClient._instance._initialized:  # pylint: disable=protected-access
                stats = SharedHttpClient._instance.get_pool_stats()  # pylint: disable=protected-access
                if http_pool_max_connections is not None:
                    http_pool_max_connections.set(stats.get("max_connections", 0))
                if http_pool_max_keepalive is not None:
                    http_pool_max_keepalive.set(stats.get("max_keepalive", 0))
                # Note: httpx doesn't expose current connection count, only limits
        except Exception:  # nosec B110
            pass  # Deliberate best-effort: skip if client not initialized or an error occurs

    # Expose the update function on app.state so the application lifespan can
    # call it once SharedHttpClient has been initialized.
    app.state.update_http_pool_metrics = update_http_pool_metrics

    # Create instrumentator instance, skipping any excluded handler patterns
    excluded = [pattern.strip() for pattern in (settings.METRICS_EXCLUDED_HANDLERS or "").split(",") if pattern.strip()]
    instrumentator = Instrumentator(
        should_group_status_codes=False,
        should_ignore_untemplated=True,
        excluded_handlers=[re.compile(p) for p in excluded],
    )

    # Instrument FastAPI app
    instrumentator.instrument(app)

    # Expose Prometheus metrics at /metrics/prometheus with auth.
    # We define the endpoint manually (instead of instrumentator.expose)
    # so we can gate it behind require_auth.
    # First-Party
    from mcpgateway.utils.verify_credentials import require_auth

    @app.get("/metrics/prometheus", include_in_schema=True, tags=["Metrics"])
    def prometheus_metrics(request: Request, _user=Depends(require_auth)):
        """Prometheus metrics endpoint (requires authentication).

        Args:
            request: The incoming HTTP request (used to check Accept-Encoding).
            _user: Authenticated user from require_auth dependency.

        Returns:
            Response: Prometheus metrics in text exposition format.
        """
        registry = REGISTRY
        if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
            # Third-Party
            from prometheus_client import CollectorRegistry, multiprocess

            # In multiprocess mode, collect into a fresh registry per request.
            registry = CollectorRegistry()
            multiprocess.MultiProcessCollector(registry)

        payload = generate_latest(registry)
        if "gzip" in request.headers.get("Accept-Encoding", ""):
            resp = Response(content=gzip.compress(payload))
            resp.headers["Content-Type"] = CONTENT_TYPE_LATEST
            resp.headers["Content-Encoding"] = "gzip"
        else:
            resp = Response(content=payload)
            resp.headers["Content-Type"] = CONTENT_TYPE_LATEST
        return resp

    print("✅ Metrics instrumentation enabled")