Coverage for mcpgateway / services / metrics.py: 87%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2""" 

3Location: ./mcpgateway/services/metrics.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6 

7ContextForge Metrics Service. 

8 

9This module provides comprehensive Prometheus metrics instrumentation for ContextForge. 

10It configures and exposes HTTP metrics including request counts, latencies, response sizes, 

11and custom application metrics. 

12 

13The service automatically instruments FastAPI applications with standard HTTP metrics 

14and provides configurable exclusion patterns for endpoints that should not be monitored. 

15Metrics are exposed at the `/metrics/prometheus` endpoint in Prometheus format. 

16 

17Supported Metrics: 

18- http_requests_total: Counter for total HTTP requests by method, endpoint, and status 

19- http_request_duration_seconds: Histogram of request processing times 

20- http_request_size_bytes: Histogram of incoming request payload sizes 

21- http_response_size_bytes: Histogram of outgoing response payload sizes 

22- app_info: Gauge with custom static labels for application metadata 

23 

24Environment Variables: 

25- ENABLE_METRICS: Enable/disable metrics collection (default: "false") 

26- METRICS_EXCLUDED_HANDLERS: Comma-separated regex patterns for excluded endpoints 

27- METRICS_CUSTOM_LABELS: Custom labels for app_info gauge (format: "key1=value1,key2=value2") 

28 

29Usage: 

30 from mcpgateway.services.metrics import setup_metrics 

31 

32 app = FastAPI() 

33 setup_metrics(app) # Automatically instruments the app 

34 

35 # Metrics available at: GET /metrics/prometheus 

36 

37Functions: 

38- setup_metrics: Configure Prometheus instrumentation for FastAPI app 

39""" 

40 

41# Standard 

42import gzip 

43import os 

44import re 

45 

46# Third-Party 

47from fastapi import Depends, Request, Response, status 

48from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest, REGISTRY 

49from prometheus_fastapi_instrumentator import Instrumentator 

50 

51# First-Party 

52from mcpgateway.config import settings 

53 

54 

def _get_registry_collector(metric_name: str):
    """Return the collector registered under ``metric_name``, if any.

    ``prometheus_client`` offers no public lookup-by-name helper, yet
    ``setup_metrics`` can run several times in one process (tests build
    multiple apps). To avoid registering the same metric twice we peek at the
    default registry's private ``_names_to_collectors`` mapping, and give up
    quietly when that attribute is missing or is not a ``dict``.

    Args:
        metric_name (str): Metric name to look up.

    Returns:
        Any: The collector registered for ``metric_name``, or ``None`` when
        the name is unknown or the internal mapping is unavailable.
    """
    collectors_by_name = getattr(REGISTRY, "_names_to_collectors", None)
    return collectors_by_name.get(metric_name) if isinstance(collectors_by_name, dict) else None

73 

74 

# Global Metrics
# Module-level counters; other services/plugins import these and increment them.
tool_timeout_counter = Counter(
    name="tool_timeout_total",
    documentation="Total number of tool invocation timeouts",
    labelnames=["tool_name"],
)

circuit_breaker_open_counter = Counter(
    name="circuit_breaker_open_total",
    documentation="Total number of times circuit breaker opened",
    labelnames=["tool_name"],
)

password_reset_requests_counter = Counter(
    name="password_reset_requests_total",
    documentation="Total number of password reset requests",
    labelnames=["outcome"],
)

password_reset_completions_counter = Counter(
    name="password_reset_completions_total",
    documentation="Total number of password reset completion attempts",
    labelnames=["outcome"],
)

# Content Security Metrics (US-2)
# The single label says which kind of content was rejected: "resource" or "prompt".
content_size_violations_counter = Counter(
    name="content_size_violations_total",
    documentation="Total number of content size limit violations",
    labelnames=["content_type"],
)

# The rejected MIME type itself is only logged; using it as a label value
# would give the metric unbounded cardinality.
content_type_violations_counter = Counter(
    name="content_type_violations_total",
    documentation="Total number of MIME type violations",
    labelnames=["content_type"],
)

# MCP Auth Cache Metrics
mcp_auth_cache_events_counter = Counter(
    name="mcp_auth_cache_events_total",
    documentation="Total number of MCP auth cache events by outcome",
    labelnames=["outcome"],
)

120 

121 

def _parse_custom_labels(raw: str) -> dict[str, str]:
    """Parse a ``"key1=value1,key2=value2"`` string into a label mapping.

    Only the first ``=`` of each pair separates name from value, so values may
    themselves contain ``=``. Surrounding whitespace is stripped from names and
    values, and pairs without ``=`` or with an empty name are ignored.

    Args:
        raw (str): Raw comma-separated ``key=value`` pairs (may be empty).

    Returns:
        dict[str, str]: Parsed labels; later duplicates override earlier ones.
    """
    labels: dict[str, str] = {}
    for pair in raw.split(","):
        key, separator, value = pair.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        labels[key] = value.strip()
    return labels


def _detect_db_engine(database_url: str) -> str:
    """Map a lower-cased database URL to ``postgresql``, ``sqlite`` or ``unknown``.

    Args:
        database_url (str): Database URL, already lower-cased by the caller.

    Returns:
        str: Coarse engine name used as a metric label value.
    """
    if database_url.startswith(("postgresql", "postgres://")):
        return "postgresql"
    if database_url.startswith("sqlite"):
        return "sqlite"
    return "unknown"


def _get_or_create_gauge(name: str, documentation: str, labelnames=()):
    """Return the gauge registered as ``name``, creating it when absent.

    ``setup_metrics`` may run several times per process and the Prometheus
    registry rejects duplicate metric names with ``ValueError``, so an existing
    collector is re-used whenever one is found.

    Args:
        name (str): Metric name.
        documentation (str): Help text used if the gauge has to be created.
        labelnames: Label names used if the gauge has to be created.

    Returns:
        Any: The existing or newly created collector, or ``None`` when creation
        failed and no collector is registered under ``name``.
    """
    gauge = _get_registry_collector(name)
    if gauge is None:
        try:
            gauge = Gauge(name, documentation, labelnames=labelnames, registry=REGISTRY)
        except ValueError:
            # Either another app instance registered it between the lookup and
            # the creation, or the arguments were rejected; re-check the registry.
            gauge = _get_registry_collector(name)
    return gauge


def setup_metrics(app):
    """
    Configure Prometheus metrics instrumentation for a FastAPI application.

    This function sets up comprehensive HTTP metrics collection including request counts,
    latencies, and payload sizes. It also handles custom application labels and endpoint
    exclusion patterns.

    Args:
        app: FastAPI application instance to instrument

    Environment Variables Used:
        ENABLE_METRICS (str): "true" to enable metrics, "false" to disable (default: "false")
        METRICS_EXCLUDED_HANDLERS (str): Comma-separated regex patterns for endpoints
            to exclude from metrics collection
        METRICS_CUSTOM_LABELS (str): Custom labels in "key1=value1,key2=value2" format
            for the app_info gauge metric. Values may contain "="; whitespace around
            names and values is ignored.
        PROMETHEUS_MULTIPROC_DIR (str): When set, the endpoint serves metrics
            aggregated through prometheus_client's multiprocess collector.

    Side Effects:
        - Registers Prometheus metrics collectors with the global registry
        - Adds middleware to the FastAPI app for request instrumentation
        - Exposes /metrics/prometheus endpoint for Prometheus scraping
          (answers 503 when metrics are disabled)
        - Stores ``update_http_pool_metrics`` on ``app.state`` when metrics are enabled
        - Prints status messages to stdout

    Example:
        >>> from fastapi import FastAPI
        >>> from mcpgateway.services.metrics import setup_metrics
        >>> app = FastAPI()
        >>> # setup_metrics(app)  # Configures Prometheus metrics
        >>> # Metrics available at GET /metrics/prometheus
    """
    # Standard
    import logging  # pylint: disable=import-outside-toplevel

    # First-Party
    from mcpgateway.utils.verify_credentials import require_auth  # pylint: disable=import-outside-toplevel

    if not settings.ENABLE_METRICS:
        print("⚠️ Metrics instrumentation disabled")

        @app.get("/metrics/prometheus", tags=["Metrics"])
        async def metrics_disabled(_user=Depends(require_auth)):  # pylint: disable=unused-argument
            """Returns 503 when metrics collection is disabled (requires authentication).

            Args:
                _user: Authenticated user from require_auth dependency.

            Returns:
                Response: HTTP 503 response indicating metrics are disabled.
            """
            return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

        return

    logger = logging.getLogger(__name__)

    # Detect database engine from DATABASE_URL
    database_url = settings.database_url.lower()
    db_engine = _detect_db_engine(database_url)

    # Custom labels for the app_info gauge; the detected database engine is
    # always included and overrides any user-supplied "engine" label.
    custom_labels = _parse_custom_labels(os.getenv("METRICS_CUSTOM_LABELS", ""))
    custom_labels["engine"] = db_engine

    # Use a deterministic label order for stable registration:
    # `engine` first, then any custom labels sorted.
    desired_label_names = ["engine", *sorted(label for label in custom_labels if label != "engine")]

    app_info_gauge = _get_or_create_gauge("app_info", "Static labels for the application", desired_label_names)
    if app_info_gauge is not None:
        # A re-used gauge may have been registered with a different label set,
        # so fill exactly the labels it declares (missing ones become "").
        labelnames = getattr(app_info_gauge, "_labelnames", ())
        if labelnames:
            labels = {name: custom_labels.get(name, "") for name in labelnames}
            app_info_gauge.labels(**labels).set(1)
        else:
            app_info_gauge.set(1)

    excluded = [pattern.strip() for pattern in (settings.METRICS_EXCLUDED_HANDLERS or "").split(",") if pattern.strip()]

    # Database info gauge, with the URL scheme for additional context
    db_info_gauge = _get_or_create_gauge("database_info", "Database engine information", ["engine", "url_scheme"])
    url_scheme = database_url.split("://", maxsplit=1)[0] if "://" in database_url else "unknown"
    if db_info_gauge is not None:
        db_info_gauge.labels(engine=db_engine, url_scheme=url_scheme).set(1)

    # HTTP connection pool gauges; their values are filled in lazily by
    # update_http_pool_metrics once SharedHttpClient is ready.
    http_pool_max_connections = _get_or_create_gauge(
        "http_pool_max_connections",
        "Maximum allowed HTTP connections in the pool",
    )
    http_pool_max_keepalive = _get_or_create_gauge(
        "http_pool_max_keepalive_connections",
        "Maximum idle keepalive connections to retain",
    )

    def update_http_pool_metrics():
        """Update HTTP connection pool metrics from SharedHttpClient stats."""
        try:
            # First-Party
            from mcpgateway.services.http_client_service import SharedHttpClient  # pylint: disable=import-outside-toplevel

            # Only update if client is initialized
            if SharedHttpClient._instance and SharedHttpClient._instance._initialized:  # pylint: disable=protected-access
                stats = SharedHttpClient._instance.get_pool_stats()  # pylint: disable=protected-access
                if http_pool_max_connections is not None:
                    http_pool_max_connections.set(stats.get("max_connections", 0))
                if http_pool_max_keepalive is not None:
                    http_pool_max_keepalive.set(stats.get("max_keepalive", 0))
                # Note: httpx doesn't expose current connection count, only limits
        except Exception:  # pylint: disable=broad-exception-caught
            # Best-effort: never let a metrics refresh break the caller, but
            # leave a trace for debugging instead of swallowing it silently.
            logger.debug("Skipping HTTP pool metrics update", exc_info=True)

    # Expose the updater on app.state so the application lifespan can call it
    # after SharedHttpClient is initialized.
    app.state.update_http_pool_metrics = update_http_pool_metrics

    # Create instrumentator instance and instrument the FastAPI app
    instrumentator = Instrumentator(
        should_group_status_codes=False,
        should_ignore_untemplated=True,
        excluded_handlers=[re.compile(p) for p in excluded],
    )
    instrumentator.instrument(app)

    # Expose Prometheus metrics at /metrics/prometheus with auth.
    # We define the endpoint manually (instead of instrumentator.expose)
    # so we can gate it behind require_auth.
    @app.get("/metrics/prometheus", include_in_schema=True, tags=["Metrics"])
    def prometheus_metrics(request: Request, _user=Depends(require_auth)):
        """Prometheus metrics endpoint (requires authentication).

        Args:
            request: The incoming HTTP request (used to check Accept-Encoding).
            _user: Authenticated user from require_auth dependency.

        Returns:
            Response: Prometheus metrics in text exposition format.
        """
        registry = REGISTRY
        if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
            # Third-Party
            from prometheus_client import CollectorRegistry, multiprocess  # pylint: disable=import-outside-toplevel

            # Aggregate samples written by all worker processes into a fresh registry.
            registry = CollectorRegistry()
            multiprocess.MultiProcessCollector(registry)
        if "gzip" in request.headers.get("Accept-Encoding", ""):
            resp = Response(content=gzip.compress(generate_latest(registry)))
            resp.headers["Content-Type"] = CONTENT_TYPE_LATEST
            resp.headers["Content-Encoding"] = "gzip"
        else:
            resp = Response(content=generate_latest(registry))
            resp.headers["Content-Type"] = CONTENT_TYPE_LATEST
        return resp

    print("✅ Metrics instrumentation enabled")