Coverage for mcpgateway / services / metrics.py: 87%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Location: ./mcpgateway/services/metrics.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6 

7ContextForge Metrics Service. 

8 

9This module provides comprehensive Prometheus metrics instrumentation for ContextForge. 

10It configures and exposes HTTP metrics including request counts, latencies, response sizes, 

11and custom application metrics. 

12 

13The service automatically instruments FastAPI applications with standard HTTP metrics 

14and provides configurable exclusion patterns for endpoints that should not be monitored. 

15Metrics are exposed at the `/metrics/prometheus` endpoint in Prometheus format. 

16 

17Supported Metrics: 

18- http_requests_total: Counter for total HTTP requests by method, endpoint, and status 

19- http_request_duration_seconds: Histogram of request processing times 

20- http_request_size_bytes: Histogram of incoming request payload sizes 

21- http_response_size_bytes: Histogram of outgoing response payload sizes 

22- app_info: Gauge with custom static labels for application metadata 

23 

24Environment Variables: 

25- ENABLE_METRICS: Enable/disable metrics collection (default: "false") 

26- METRICS_EXCLUDED_HANDLERS: Comma-separated regex patterns for excluded endpoints 

27- METRICS_CUSTOM_LABELS: Custom labels for app_info gauge (format: "key1=value1,key2=value2") 

28 

29Usage: 

30 from mcpgateway.services.metrics import setup_metrics 

31 

32 app = FastAPI() 

33 setup_metrics(app) # Automatically instruments the app 

34 

35 # Metrics available at: GET /metrics/prometheus 

36 

37Functions: 

38- setup_metrics: Configure Prometheus instrumentation for FastAPI app 

39""" 

40 

41# Standard 

42import gzip 

43import os 

44import re 

45 

46# Third-Party 

47from fastapi import Depends, Request, Response, status 

48from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest, REGISTRY 

49from prometheus_fastapi_instrumentator import Instrumentator 

50 

51# First-Party 

52from mcpgateway.config import settings 

53 

54 

def _get_registry_collector(metric_name: str):
    """Return the collector registered under ``metric_name``, if any.

    The Prometheus client has no public "find collector by name" helper, so
    this peeks at the default registry's private ``_names_to_collectors``
    mapping. The access is guarded: if that attribute is missing or is not a
    dict, the lookup simply reports "not found" instead of raising.

    Args:
        metric_name (str): Metric name to look up.

    Returns:
        Any: The collector stored under ``metric_name``, or ``None`` when the
        name is not present or the internal mapping is unavailable.
    """
    collectors_by_name = getattr(REGISTRY, "_names_to_collectors", None)
    if isinstance(collectors_by_name, dict):
        return collectors_by_name.get(metric_name)
    return None

73 

74 

# Global Metrics
# Module-level counters, importable by services/plugins that need to
# increment them. Each one is created once, at import time.

# Tool invocations that ran out of time, labelled by tool name.
tool_timeout_counter = Counter(
    name="tool_timeout_total",
    documentation="Total number of tool invocation timeouts",
    labelnames=["tool_name"],
)

# Circuit breaker "open" transitions, labelled by tool name.
circuit_breaker_open_counter = Counter(
    name="circuit_breaker_open_total",
    documentation="Total number of times circuit breaker opened",
    labelnames=["tool_name"],
)

# Password reset requests, labelled by outcome.
password_reset_requests_counter = Counter(
    name="password_reset_requests_total",
    documentation="Total number of password reset requests",
    labelnames=["outcome"],
)

# Password reset completion attempts, labelled by outcome.
password_reset_completions_counter = Counter(
    name="password_reset_completions_total",
    documentation="Total number of password reset completion attempts",
    labelnames=["outcome"],
)

100 

101 

def _detect_db_engine(database_url: str) -> str:
    """Map a lower-cased database URL to the ``engine`` label value."""
    if database_url.startswith("mysql+pymysql://") or "mariadb" in database_url:
        return "mariadb"
    if database_url.startswith(("postgresql://", "postgres://")):
        return "postgresql"
    if database_url.startswith("sqlite://"):
        return "sqlite"
    if database_url.startswith("mongodb://"):
        return "mongodb"
    return "unknown"


def _parse_custom_labels(raw: str) -> dict:
    """Parse a ``"key1=value1,key2=value2"`` string into a label dict.

    Entries without ``=`` or with an empty key are skipped. Only the first
    ``=`` separates key from value, so values may themselves contain ``=``.
    Surrounding whitespace is stripped from both keys and values.
    """
    labels = {}
    for pair in raw.split(","):
        key, separator, value = pair.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        labels[key] = value.strip()
    return labels


def _get_or_create_gauge(name: str, documentation: str, labelnames=()):
    """Return the gauge registered as ``name``, creating it when absent.

    setup_metrics may run several times in one process (tests build multiple
    FastAPI apps), and the Prometheus registry rejects a second registration
    of the same metric name. An existing collector is therefore re-used; if
    creation fails with ``ValueError`` the registry is consulted once more.
    The result may still be ``None`` when neither path yields a collector.
    """
    gauge = _get_registry_collector(name)
    if gauge is None:
        try:
            gauge = Gauge(name, documentation, labelnames=labelnames, registry=REGISTRY)
        except ValueError:
            # Registration was rejected (e.g. another app instance got there
            # first); fall back to whatever is registered under this name.
            gauge = _get_registry_collector(name)
    return gauge


def setup_metrics(app):
    """
    Configure Prometheus metrics instrumentation for a FastAPI application.

    When ``settings.ENABLE_METRICS`` is truthy this registers the info gauges
    (``app_info``, ``database_info``), the HTTP pool gauges, instruments the
    app with ``Instrumentator`` and adds an authenticated
    ``/metrics/prometheus`` endpoint. Otherwise it only adds an authenticated
    ``/metrics/prometheus`` endpoint that answers with HTTP 503.

    Args:
        app: FastAPI application instance to instrument

    Configuration Used:
        settings.ENABLE_METRICS: Truthy to enable metrics collection.
        settings.database_url: Used to derive the ``engine`` and
            ``url_scheme`` label values.
        settings.METRICS_EXCLUDED_HANDLERS (str): Comma-separated regex patterns
            for endpoints to exclude from metrics collection
        METRICS_CUSTOM_LABELS (environment variable): Custom labels in
            "key1=value1,key2=value2" format for the app_info gauge metric.
            Values may contain "="; whitespace around keys/values is ignored.

    Side Effects:
        - Registers Prometheus metrics collectors with the global registry
        - Adds middleware to the FastAPI app for request instrumentation
        - Exposes /metrics/prometheus endpoint for Prometheus scraping
        - Sets ``app.state.update_http_pool_metrics`` (metrics enabled only)
        - Prints status messages to stdout

    Example:
        >>> from fastapi import FastAPI
        >>> from mcpgateway.services.metrics import setup_metrics
        >>> app = FastAPI()
        >>> # setup_metrics(app)  # Configures Prometheus metrics
        >>> # Metrics available at GET /metrics/prometheus
    """
    # Imported lazily inside the function, as the original code did.
    # First-Party
    from mcpgateway.utils.verify_credentials import require_auth  # pylint: disable=import-outside-toplevel

    if not settings.ENABLE_METRICS:
        print("⚠️ Metrics instrumentation disabled")

        @app.get("/metrics/prometheus", tags=["Metrics"])
        async def metrics_disabled(_user=Depends(require_auth)):  # pylint: disable=unused-argument
            """Returns 503 when metrics collection is disabled (requires authentication).

            Args:
                _user: Authenticated user from require_auth dependency.

            Returns:
                Response: HTTP 503 response indicating metrics are disabled.
            """
            return Response(content='{"error": "Metrics collection is disabled"}', media_type="application/json", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

        return

    # Detect database engine from DATABASE_URL
    database_url = settings.database_url.lower()
    db_engine = _detect_db_engine(database_url)

    # Custom labels for app_info; the detected engine is always included and
    # overrides any user-supplied "engine" label.
    custom_labels = _parse_custom_labels(os.getenv("METRICS_CUSTOM_LABELS", ""))
    custom_labels["engine"] = db_engine

    # Use a deterministic label order for stable registration:
    # `engine` first, then any custom labels sorted.
    extra_label_names = sorted(label for label in custom_labels if label != "engine")
    app_info_gauge = _get_or_create_gauge(
        "app_info",
        "Static labels for the application",
        ["engine", *extra_label_names],
    )
    if app_info_gauge is not None:
        # A re-used gauge may have been created with a different label set,
        # so fill exactly the labels it declares (missing ones become "").
        labelnames = getattr(app_info_gauge, "_labelnames", ())
        if labelnames:
            labels = {name: custom_labels.get(name, "") for name in labelnames}
            app_info_gauge.labels(**labels).set(1)
        else:
            app_info_gauge.set(1)

    # Database info gauge, with the URL scheme for additional context
    db_info_gauge = _get_or_create_gauge(
        "database_info",
        "Database engine information",
        ["engine", "url_scheme"],
    )
    url_scheme = database_url.split("://", maxsplit=1)[0] if "://" in database_url else "unknown"
    if db_info_gauge is not None:
        db_info_gauge.labels(engine=db_engine, url_scheme=url_scheme).set(1)

    # HTTP connection pool gauges; values are filled in lazily by
    # update_http_pool_metrics once SharedHttpClient is ready.
    http_pool_max_connections = _get_or_create_gauge(
        "http_pool_max_connections",
        "Maximum allowed HTTP connections in the pool",
    )
    http_pool_max_keepalive = _get_or_create_gauge(
        "http_pool_max_keepalive_connections",
        "Maximum idle keepalive connections to retain",
    )

    def update_http_pool_metrics():
        """Update HTTP connection pool metrics from SharedHttpClient stats."""
        try:
            # First-Party
            from mcpgateway.services.http_client_service import SharedHttpClient  # pylint: disable=import-outside-toplevel

            # Only update if client is initialized
            if SharedHttpClient._instance and SharedHttpClient._instance._initialized:  # pylint: disable=protected-access
                stats = SharedHttpClient._instance.get_pool_stats()  # pylint: disable=protected-access
                if http_pool_max_connections is not None:
                    http_pool_max_connections.set(stats.get("max_connections", 0))
                if http_pool_max_keepalive is not None:
                    http_pool_max_keepalive.set(stats.get("max_keepalive", 0))
                # Note: httpx doesn't expose current connection count, only limits
        except Exception:  # nosec B110
            pass  # Deliberately best-effort: skip if client not initialized or error occurs

    # Stored on app.state so the application lifespan can call it after
    # SharedHttpClient is initialized.
    app.state.update_http_pool_metrics = update_http_pool_metrics

    # Instrument the app, skipping handlers that match any excluded pattern
    excluded = [pattern.strip() for pattern in (settings.METRICS_EXCLUDED_HANDLERS or "").split(",") if pattern.strip()]
    instrumentator = Instrumentator(
        should_group_status_codes=False,
        should_ignore_untemplated=True,
        excluded_handlers=[re.compile(p) for p in excluded],
    )
    instrumentator.instrument(app)

    # Expose Prometheus metrics at /metrics/prometheus with auth.
    # We define the endpoint manually (instead of instrumentator.expose)
    # so we can gate it behind require_auth.
    @app.get("/metrics/prometheus", include_in_schema=True, tags=["Metrics"])
    def prometheus_metrics(request: Request, _user=Depends(require_auth)):
        """Prometheus metrics endpoint (requires authentication).

        Args:
            request: The incoming HTTP request (used to check Accept-Encoding).
            _user: Authenticated user from require_auth dependency.

        Returns:
            Response: Prometheus metrics in text exposition format.
        """
        registry = REGISTRY
        if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
            # Multiprocess mode: aggregate samples into a fresh registry
            # Third-Party
            from prometheus_client import CollectorRegistry, multiprocess  # pylint: disable=import-outside-toplevel

            registry = CollectorRegistry()
            multiprocess.MultiProcessCollector(registry)

        payload = generate_latest(registry)
        if "gzip" in request.headers.get("Accept-Encoding", ""):
            resp = Response(content=gzip.compress(payload))
            resp.headers["Content-Type"] = CONTENT_TYPE_LATEST
            resp.headers["Content-Encoding"] = "gzip"
        else:
            resp = Response(content=payload)
            resp.headers["Content-Type"] = CONTENT_TYPE_LATEST
        return resp

    print("✅ Metrics instrumentation enabled")