Coverage for mcpgateway / services / metrics.py: 87%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""
Location: ./mcpgateway/services/metrics.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0

ContextForge Metrics Service.

This module provides comprehensive Prometheus metrics instrumentation for ContextForge.
It configures and exposes HTTP metrics including request counts, latencies, response sizes,
and custom application metrics.

The service automatically instruments FastAPI applications with standard HTTP metrics
and provides configurable exclusion patterns for endpoints that should not be monitored.
Metrics are exposed at the `/metrics/prometheus` endpoint in Prometheus format.

Supported Metrics:
- http_requests_total: Counter for total HTTP requests by method, endpoint, and status
- http_request_duration_seconds: Histogram of request processing times
- http_request_size_bytes: Histogram of incoming request payload sizes
- http_response_size_bytes: Histogram of outgoing response payload sizes
- app_info: Gauge with custom static labels for application metadata

Environment Variables:
- ENABLE_METRICS: Enable/disable metrics collection (default: "false")
- METRICS_EXCLUDED_HANDLERS: Comma-separated regex patterns for excluded endpoints
- METRICS_CUSTOM_LABELS: Custom labels for app_info gauge (format: "key1=value1,key2=value2")

Usage:
    from mcpgateway.services.metrics import setup_metrics

    app = FastAPI()
    setup_metrics(app)  # Automatically instruments the app

    # Metrics available at: GET /metrics/prometheus

Functions:
- setup_metrics: Configure Prometheus instrumentation for FastAPI app
39"""
41# Standard
42import gzip
43import os
44import re
46# Third-Party
47from fastapi import Depends, Request, Response, status
48from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest, REGISTRY
49from prometheus_fastapi_instrumentator import Instrumentator
51# First-Party
52from mcpgateway.config import settings
def _get_registry_collector(metric_name: str):
    """Return the collector registered under ``metric_name``, if any.

    The Prometheus client has no public lookup-by-name helper, yet several
    apps may be created in one process (tests do this). To avoid registering
    the same metric twice, this reads the default registry's internal
    name-to-collector mapping, and gives up quietly when that attribute is
    missing or is not a dict.

    Args:
        metric_name (str): Metric name to look up.

    Returns:
        Any: The registered collector, or None when the name is not
        registered or the internal mapping is unavailable.
    """
    collectors_by_name = getattr(REGISTRY, "_names_to_collectors", None)
    return collectors_by_name.get(metric_name) if isinstance(collectors_by_name, dict) else None
# Global Metrics
# Module-level counters, exposed so services/plugins can import and increment them.

# Tool invocation timeouts, labelled by tool name.
tool_timeout_counter = Counter("tool_timeout_total", "Total number of tool invocation timeouts", labelnames=["tool_name"])

# Circuit breaker "open" transitions, labelled by tool name.
circuit_breaker_open_counter = Counter("circuit_breaker_open_total", "Total number of times circuit breaker opened", labelnames=["tool_name"])

# Password reset flow: requests and completion attempts, each labelled by outcome.
password_reset_requests_counter = Counter("password_reset_requests_total", "Total number of password reset requests", labelnames=["outcome"])
password_reset_completions_counter = Counter("password_reset_completions_total", "Total number of password reset completion attempts", labelnames=["outcome"])

# Content Security Metrics (US-2)
# The `content_type` label is "resource" or "prompt".
content_size_violations_counter = Counter("content_size_violations_total", "Total number of content size limit violations", labelnames=["content_type"])

# The rejected MIME type itself goes to the logs, not into a label (unbounded cardinality).
content_type_violations_counter = Counter("content_type_violations_total", "Total number of MIME type violations", labelnames=["content_type"])

# MCP Auth Cache Metrics
mcp_auth_cache_events_counter = Counter("mcp_auth_cache_events_total", "Total number of MCP auth cache events by outcome", labelnames=["outcome"])
def _parse_custom_labels(raw):
    """Parse a ``key1=value1,key2=value2`` string into a label mapping.

    Each comma-separated item is split on its first ``=`` only, so a value may
    itself contain ``=``. Whitespace around keys and values is removed. Items
    without ``=`` or with an empty key are skipped. When a key repeats, the
    last occurrence wins.

    Args:
        raw (str): Raw label specification; may be empty or None.

    Returns:
        dict: Mapping of label name to label value.
    """
    labels = {}
    for item in (raw or "").split(","):
        key, separator, value = item.partition("=")
        key = key.strip()
        if separator and key:
            labels[key] = value.strip()
    return labels


def _get_or_create_gauge(name, documentation, labelnames=()):
    """Return the gauge registered as ``name``, creating it when absent.

    setup_metrics may run several times in one process (tests create multiple
    FastAPI apps) and the Prometheus registry rejects a second registration of
    the same metric name, so an existing collector is reused when present.

    Args:
        name (str): Metric name.
        documentation (str): Help text used when the gauge is created.
        labelnames: Label names used when the gauge is created.

    Returns:
        The existing or newly created collector, or None when creation raised
        ValueError and no collector is registered under ``name``.
    """
    gauge = _get_registry_collector(name)
    if gauge is not None:
        return gauge
    try:
        return Gauge(name, documentation, labelnames=labelnames, registry=REGISTRY)
    except ValueError:
        # Creation was rejected; if another app instance registered the metric
        # in the meantime, reuse that collector. Otherwise this yields None.
        return _get_registry_collector(name)


def setup_metrics(app):
    """
    Configure Prometheus metrics instrumentation for a FastAPI application.

    When metrics are enabled, this sets up HTTP metrics collection through the
    instrumentator, publishes the static ``app_info`` and ``database_info``
    gauges, prepares the HTTP connection pool gauges, and registers an
    authenticated ``/metrics/prometheus`` endpoint. When metrics are disabled,
    the same path is registered but answers with HTTP 503.

    Args:
        app: FastAPI application instance to instrument

    Settings / Environment Variables Used:
        ENABLE_METRICS: Read from settings; truthy enables metrics collection
        METRICS_EXCLUDED_HANDLERS (str): Read from settings; comma-separated regex
            patterns for endpoints to exclude from metrics collection
        METRICS_CUSTOM_LABELS (str): Read from the environment; custom labels in
            "key1=value1,key2=value2" format for the app_info gauge metric.
            Values may contain "="; whitespace around keys and values is ignored.

    Side Effects:
        - Registers Prometheus metrics collectors with the global registry
        - Adds middleware to the FastAPI app for request instrumentation
        - Exposes /metrics/prometheus endpoint for Prometheus scraping (requires auth)
        - Stores ``update_http_pool_metrics`` on ``app.state`` (enabled case only)
        - Prints status messages to stdout

    Example:
        >>> from fastapi import FastAPI
        >>> from mcpgateway.services.metrics import setup_metrics
        >>> app = FastAPI()
        >>> # setup_metrics(app) # Configures Prometheus metrics
        >>> # Metrics available at GET /metrics/prometheus
    """
    # Imported inside the function, as in the original code, and needed by both branches.
    # First-Party
    from mcpgateway.utils.verify_credentials import require_auth

    if not settings.ENABLE_METRICS:
        print("⚠️ Metrics instrumentation disabled")

        @app.get("/metrics/prometheus", tags=["Metrics"])
        async def metrics_disabled(_user=Depends(require_auth)):  # pylint: disable=unused-argument
            """Returns 503 when metrics collection is disabled (requires authentication).

            Args:
                _user: Authenticated user from require_auth dependency.

            Returns:
                Response: HTTP 503 response indicating metrics are disabled.
            """
            return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

        return

    # Detect database engine from DATABASE_URL
    database_url = settings.database_url.lower()
    if database_url.startswith(("postgresql", "postgres://")):
        db_engine = "postgresql"
    elif database_url.startswith("sqlite"):
        db_engine = "sqlite"
    else:
        db_engine = "unknown"

    # Custom labels gauge with automatic database engine detection.
    custom_labels = _parse_custom_labels(os.getenv("METRICS_CUSTOM_LABELS", ""))

    # Always include database engine in metrics (overrides a custom `engine` label).
    custom_labels["engine"] = db_engine

    # Use a deterministic label order for stable registration.
    # Keep `engine` first, then any custom labels sorted.
    desired_label_names = ["engine", *sorted(name for name in custom_labels if name != "engine")]

    app_info_gauge = _get_or_create_gauge("app_info", "Static labels for the application", desired_label_names)
    if app_info_gauge is not None:
        # A reused gauge may have been registered with a different label set, so
        # fill exactly the labels it declares; labels without a value become "".
        labelnames = getattr(app_info_gauge, "_labelnames", ())
        if labelnames:
            app_info_gauge.labels(**{name: custom_labels.get(name, "") for name in labelnames}).set(1)
        else:
            app_info_gauge.set(1)

    excluded = [pattern.strip() for pattern in (settings.METRICS_EXCLUDED_HANDLERS or "").split(",") if pattern.strip()]

    # Add database metrics gauge
    db_info_gauge = _get_or_create_gauge("database_info", "Database engine information", ["engine", "url_scheme"])

    # Extract URL scheme for additional context
    url_scheme = database_url.split("://", maxsplit=1)[0] if "://" in database_url else "unknown"
    if db_info_gauge is not None:
        db_info_gauge.labels(engine=db_engine, url_scheme=url_scheme).set(1)

    # Add HTTP connection pool metrics with lazy initialization
    # These gauges are updated from app lifespan after SharedHttpClient is ready
    http_pool_max_connections = _get_or_create_gauge("http_pool_max_connections", "Maximum allowed HTTP connections in the pool")
    http_pool_max_keepalive = _get_or_create_gauge("http_pool_max_keepalive_connections", "Maximum idle keepalive connections to retain")

    def update_http_pool_metrics():
        """Update HTTP connection pool metrics from SharedHttpClient stats."""
        try:
            # First-Party
            from mcpgateway.services.http_client_service import SharedHttpClient  # pylint: disable=import-outside-toplevel

            # Only update if client is initialized
            if SharedHttpClient._instance and SharedHttpClient._instance._initialized:  # pylint: disable=protected-access
                stats = SharedHttpClient._instance.get_pool_stats()  # pylint: disable=protected-access
                if http_pool_max_connections is not None:
                    http_pool_max_connections.set(stats.get("max_connections", 0))
                if http_pool_max_keepalive is not None:
                    http_pool_max_keepalive.set(stats.get("max_keepalive", 0))
                # Note: httpx doesn't expose current connection count, only limits
        except Exception:  # nosec B110
            pass  # Deliberately best-effort: skip if client not initialized or an error occurs

    # Attach the update function to the app state so the application lifespan
    # can call it once SharedHttpClient is initialized.
    app.state.update_http_pool_metrics = update_http_pool_metrics

    # Create instrumentator instance
    instrumentator = Instrumentator(
        should_group_status_codes=False,
        should_ignore_untemplated=True,
        excluded_handlers=[re.compile(pattern) for pattern in excluded],
    )

    # Instrument FastAPI app
    instrumentator.instrument(app)

    # Expose Prometheus metrics at /metrics/prometheus with auth.
    # We define the endpoint manually (instead of instrumentator.expose)
    # so we can gate it behind require_auth.
    @app.get("/metrics/prometheus", include_in_schema=True, tags=["Metrics"])
    def prometheus_metrics(request: Request, _user=Depends(require_auth)):
        """Prometheus metrics endpoint (requires authentication).

        Args:
            request: The incoming HTTP request (used to check Accept-Encoding).
            _user: Authenticated user from require_auth dependency.

        Returns:
            Response: Prometheus metrics in text exposition format.
        """
        registry = REGISTRY
        if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
            # Multiprocess mode: collect into a fresh registry for this scrape.
            # Third-Party
            from prometheus_client import CollectorRegistry, multiprocess

            registry = CollectorRegistry()
            multiprocess.MultiProcessCollector(registry)

        payload = generate_latest(registry)
        use_gzip = "gzip" in request.headers.get("Accept-Encoding", "")
        resp = Response(content=gzip.compress(payload) if use_gzip else payload)
        resp.headers["Content-Type"] = CONTENT_TYPE_LATEST
        if use_gzip:
            resp.headers["Content-Encoding"] = "gzip"
        return resp

    print("✅ Metrics instrumentation enabled")