Coverage for mcpgateway/services/performance_tracker.py: 98%

124 statements  

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/performance_tracker.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Performance Tracking Service. 

7 

8This module provides performance tracking and analytics for all operations 

9across the MCP Gateway, enabling identification of bottlenecks and 

10optimization opportunities. 

11""" 

12 

13# Standard 

14from collections import defaultdict, deque, OrderedDict 

15from contextlib import contextmanager 

16import logging 

17import statistics 

18import time 

19from typing import Any, Dict, Generator, Optional, Tuple 

20 

21# First-Party 

22from mcpgateway.config import settings 

23from mcpgateway.utils.correlation_id import get_correlation_id 

24 

25logger = logging.getLogger(__name__) 

26 

27 

28class PerformanceTracker: 

29 """Tracks and analyzes performance metrics across requests. 

30 

31 Provides context managers for tracking operation timing, 

32 aggregation of metrics, and threshold-based alerting. 

33 

34 Uses version-based caching for performance summaries: 

35 - Per-operation versions track changes to specific operations 

36 - Global version tracks any change (for "all operations" summaries) 

37 - Cache entries store the version at computation time 

38 - Entries are valid only if versions match (no TTL-based expiry) 

39 

40 Note: Internal state (_operation_timings, _op_version, etc.) should not be 

41 accessed directly. Use record_timing() or track_operation() to add data. 

42 """ 

43 

44 # Sentinel for "all operations" cache key 

45 _ALL_OPERATIONS_KEY = "__all__" 

46 

47 # Maximum cache entries to prevent unbounded growth with varying min_samples 

48 _MAX_CACHE_ENTRIES = 64 

49 

50 def __init__(self): 

51 """Initialize performance tracker.""" 

52 # Max buffer size per operation type - must be set before creating deque factory 

53 self.max_samples = getattr(settings, "perf_max_samples_per_operation", 1000) 

54 

55 # Use deque with maxlen for O(1) automatic eviction instead of O(n) pop(0) 

56 # Private to ensure all mutations go through record_timing/track_operation (version tracking) 

57 self._operation_timings: Dict[str, deque[float]] = defaultdict(lambda: deque(maxlen=self.max_samples)) 

58 

59 # Performance thresholds (seconds) from settings or defaults 

60 self.performance_thresholds = { 

61 "database_query": getattr(settings, "perf_threshold_database_query", 0.1), 

62 "tool_invocation": getattr(settings, "perf_threshold_tool_invocation", 2.0), 

63 "authentication": getattr(settings, "perf_threshold_authentication", 0.5), 

64 "cache_operation": getattr(settings, "perf_threshold_cache_operation", 0.01), 

65 "a2a_task": getattr(settings, "perf_threshold_a2a_task", 5.0), 

66 "request_total": getattr(settings, "perf_threshold_request_total", 10.0), 

67 "resource_fetch": getattr(settings, "perf_threshold_resource_fetch", 1.0), 

68 "prompt_processing": getattr(settings, "perf_threshold_prompt_processing", 0.5), 

69 } 

70 

71 # Version counters for cache invalidation 

72 self._op_version: Dict[str, int] = defaultdict(int) # Per-operation version 

73 self._global_version: int = 0 # Incremented on any mutation 

74 

75 # Summary cache: key=(operation_name, min_samples), value=(version, summary_dict) 

76 # For specific ops: version is op_version; for all ops: version is global_version 

77 self._summary_cache: OrderedDict[Tuple[str, int], Tuple[int, Dict[str, Any]]] = OrderedDict() 

78 

79 def _increment_version(self, operation_name: Optional[str] = None) -> None: 

80 """Increment version counters to invalidate cached summaries. 

81 

82 Args: 

83 operation_name: Specific operation that changed. If None, only increments global version. 

84 """ 

85 self._global_version += 1 

86 if operation_name:  [partial branch 86 ↛ exit: didn't return from function '_increment_version' because the condition on line 86 was always true]

87 self._op_version[operation_name] += 1 

88 

89 @contextmanager 

90 def track_operation(self, operation_name: str, component: Optional[str] = None, log_slow: bool = True, extra_context: Optional[Dict[str, Any]] = None) -> Generator[None, None, None]: 

91 """Context manager to track operation performance. 

92 

93 Args: 

94 operation_name: Name of the operation being tracked 

95 component: Component/module name for context 

96 log_slow: Whether to log operations exceeding thresholds 

97 extra_context: Additional context to include in logs 

98 

99 Yields: 

100 None 

101 

102 Raises: 

103 Exception: Any exception from the tracked operation is re-raised 

104 

105 Example: 

106 >>> tracker = PerformanceTracker() 

107 >>> with tracker.track_operation("database_query", component="tool_service"): 

108 ... # Perform database operation 

109 ... pass 

110 """ 

111 start_time = time.time() 

112 correlation_id = get_correlation_id() 

113 error_occurred = False 

114 

115 try: 

116 yield 

117 except Exception: 

118 error_occurred = True 

119 raise 

120 finally: 

121 duration = time.time() - start_time 

122 

123 # Record timing (deque automatically evicts oldest when at maxlen) 

124 self._operation_timings[operation_name].append(duration) 

125 

126 # Increment version to invalidate cached summaries 

127 self._increment_version(operation_name) 

128 

129 # Check threshold and log if needed 

130 threshold = self.performance_thresholds.get(operation_name, float("inf")) 

131 threshold_exceeded = duration > threshold 

132 

133 if log_slow and threshold_exceeded: 

134 context = { 

135 "operation": operation_name, 

136 "duration_ms": duration * 1000, 

137 "threshold_ms": threshold * 1000, 

138 "exceeded_by_ms": (duration - threshold) * 1000, 

139 "component": component, 

140 "correlation_id": correlation_id, 

141 "error_occurred": error_occurred, 

142 } 

143 if extra_context:  [partial branch 143 ↛ 146: didn't jump to line 146 because the condition on line 143 was always true]

144 context.update(extra_context) 

145 

146 logger.warning(f"Slow operation detected: {operation_name} took {duration*1000:.2f}ms " f"(threshold: {threshold*1000:.2f}ms)", extra=context) 

147 

148 def record_timing(self, operation_name: str, duration: float, component: Optional[str] = None, extra_context: Optional[Dict[str, Any]] = None) -> None: 

149 """Manually record a timing measurement. 

150 

151 Args: 

152 operation_name: Name of the operation 

153 duration: Duration in seconds 

154 component: Component/module name 

155 extra_context: Additional context 

156 """ 

157 # Record timing (deque automatically evicts oldest when at maxlen) 

158 self._operation_timings[operation_name].append(duration) 

159 

160 # Increment version to invalidate cached summaries 

161 self._increment_version(operation_name) 

162 

163 # Check threshold 

164 threshold = self.performance_thresholds.get(operation_name, float("inf")) 

165 if duration > threshold: 

166 context = { 

167 "operation": operation_name, 

168 "duration_ms": duration * 1000, 

169 "threshold_ms": threshold * 1000, 

170 "component": component, 

171 "correlation_id": get_correlation_id(), 

172 } 

173 if extra_context: 

174 context.update(extra_context) 

175 

176 logger.warning(f"Slow operation: {operation_name} took {duration*1000:.2f}ms", extra=context) 

177 
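A minimal usage sketch for record_timing, for the case where the duration was measured outside the tracker (the surrounding work and names are illustrative):

    tracker = PerformanceTracker()
    start = time.time()
    # ... do the work being timed, outside any track_operation block ...
    tracker.record_timing("resource_fetch", time.time() - start, component="resource_service")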

178 def get_performance_summary(self, operation_name: Optional[str] = None, min_samples: int = 1) -> Dict[str, Any]: 

179 """Get performance summary for analytics. 

180 

181 Args: 

182 operation_name: Specific operation to summarize (None for all) 

183 min_samples: Minimum samples required to include in summary 

184 

185 Returns: 

186 Dictionary containing performance statistics 

187 

188 Example: 

189 >>> tracker = PerformanceTracker() 

190 >>> summary = tracker.get_performance_summary() 

191 >>> isinstance(summary, dict) 

192 True 

193 """ 

194 # Determine if we're summarizing a specific operation or all operations 

195 # Normalize cache key: use _ALL_OPERATIONS_KEY if operation doesn't exist or None was passed 

196 is_specific_op = operation_name and operation_name in self._operation_timings 

197 cache_key = (operation_name if is_specific_op else self._ALL_OPERATIONS_KEY, min_samples) 

198 

199 # Get current version for cache validation 

200 current_version = self._op_version[operation_name] if is_specific_op else self._global_version 

201 

202 # Check cache - valid if version matches 

203 if cache_key in self._summary_cache: 

204 cached_version, cached_summary = self._summary_cache[cache_key] 

205 if cached_version == current_version: 

206 # Mark as recently used and return a copy to prevent external mutation 

207 self._summary_cache.move_to_end(cache_key) 

208 return {k: dict(v) for k, v in cached_summary.items()} 

209 

210 # Compute summary 

211 summary = {} 

212 

213 operations = {operation_name: self._operation_timings[operation_name]} if is_specific_op else self._operation_timings 

214 

215 for op_name, timings in operations.items(): 

216 if len(timings) < min_samples: 

217 continue 

218 

219 # Calculate percentiles 

220 sorted_timings = sorted(timings) 

221 count = len(sorted_timings) 

222 

223 def percentile(p: float, *, sorted_vals=sorted_timings, n=count) -> float: 

224 """Calculate percentile value. 

225 

226 Args: 

227 p: Percentile to calculate (0.0 to 1.0) 

228 sorted_vals: Sorted list of values 

229 n: Number of values 

230 

231 Returns: 

232 float: Calculated percentile value 

233 """ 

234 k = (n - 1) * p 

235 f = int(k) 

236 c = k - f 

237 if f + 1 < n: 

238 return sorted_vals[f] * (1 - c) + sorted_vals[f + 1] * c 

239 return sorted_vals[f] 

240 

241 summary[op_name] = { 

242 "count": count, 

243 "avg_duration_ms": statistics.mean(timings) * 1000, 

244 "min_duration_ms": min(timings) * 1000, 

245 "max_duration_ms": max(timings) * 1000, 

246 "p50_duration_ms": percentile(0.5) * 1000, 

247 "p95_duration_ms": percentile(0.95) * 1000, 

248 "p99_duration_ms": percentile(0.99) * 1000, 

249 "threshold_ms": self.performance_thresholds.get(op_name, float("inf")) * 1000, 

250 "threshold_violations": sum(1 for t in timings if t > self.performance_thresholds.get(op_name, float("inf"))), 

251 "violation_rate": sum(1 for t in timings if t > self.performance_thresholds.get(op_name, float("inf"))) / count, 

252 } 

253 

254 # Store a copy in cache with current version 

255 # Only evict if adding a new key (not updating existing) and at capacity (LRU) 

256 if cache_key not in self._summary_cache and len(self._summary_cache) >= self._MAX_CACHE_ENTRIES: 

257 # Remove least-recently-used entry 

258 try: 

259 self._summary_cache.popitem(last=False) 

260 except (StopIteration, KeyError): 

261 pass 

262 

263 self._summary_cache[cache_key] = (current_version, {k: dict(v) for k, v in summary.items()}) 

264 self._summary_cache.move_to_end(cache_key) 

265 

266 return summary 

267 
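As a worked example of the interpolation in the percentile helper above: for sorted timings [0.010, 0.020, 0.030, 0.040] seconds (n = 4) and p = 0.95, k = (4 - 1) * 0.95 = 2.85, so f = 2 and c = 0.85, and the result is 0.030 * 0.15 + 0.040 * 0.85 = 0.0385 s, reported as 38.5 ms in the summary.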

268 def get_operation_stats(self, operation_name: str) -> Optional[Dict[str, Any]]: 

269 """Get statistics for a specific operation. 

270 

271 Args: 

272 operation_name: Name of the operation 

273 

274 Returns: 

275 Statistics dictionary or None if no data 

276 """ 

277 if operation_name not in self._operation_timings: 

278 return None 

279 

280 timings = self._operation_timings[operation_name] 

281 if not timings: 

282 return None 

283 

284 return { 

285 "operation": operation_name, 

286 "sample_count": len(timings), 

287 "avg_duration_ms": statistics.mean(timings) * 1000, 

288 "min_duration_ms": min(timings) * 1000, 

289 "max_duration_ms": max(timings) * 1000, 

290 "total_time_ms": sum(timings) * 1000, 

291 "threshold_ms": self.performance_thresholds.get(operation_name, float("inf")) * 1000, 

292 } 

293 

294 def clear_stats(self, operation_name: Optional[str] = None) -> None: 

295 """Clear performance statistics. 

296 

297 Args: 

298 operation_name: Specific operation to clear (None for all) 

299 """ 

300 if operation_name: 

301 if operation_name in self._operation_timings:  [partial branch 301 ↛ 304: didn't jump to line 304 because the condition on line 301 was always true]

302 self._operation_timings[operation_name].clear() 

303 # Increment version to invalidate cached summaries 

304 self._increment_version(operation_name) 

305 else: 

306 self._operation_timings.clear() 

307 # Clear all version tracking and cache on full reset 

308 self._global_version += 1 

309 self._op_version.clear() 

310 self._summary_cache.clear() 

311 

312 def set_threshold(self, operation_name: str, threshold_seconds: float) -> None: 

313 """Set or update performance threshold for an operation. 

314 

315 Args: 

316 operation_name: Name of the operation 

317 threshold_seconds: Threshold in seconds 

318 """ 

319 self.performance_thresholds[operation_name] = threshold_seconds 

320 

321 # Increment version (threshold affects violation stats in summaries) 

322 self._increment_version(operation_name) 

323 

324 def check_performance_degradation(self, operation_name: str, baseline_multiplier: float = 2.0) -> Dict[str, Any]: 

325 """Check if performance has degraded compared to baseline. 

326 

327 Args: 

328 operation_name: Name of the operation to check 

329 baseline_multiplier: Multiplier for degradation detection 

330 

331 Returns: 

332 Dictionary with degradation analysis 

333 """ 

334 if operation_name not in self._operation_timings: 

335 return {"degraded": False, "reason": "no_data"} 

336 

337 timings = self._operation_timings[operation_name] 

338 if len(timings) < 10: 

339 return {"degraded": False, "reason": "insufficient_samples"} 

340 

341 # Compare recent timings to overall average 

342 # Convert deque to list for slicing operations 

343 timings_list = list(timings) 

344 recent_count = min(10, len(timings_list)) 

345 recent_timings = timings_list[-recent_count:] 

346 # If we don't have more than the "recent" window worth of samples, we don't have a historical baseline. 

347 historical_timings = timings_list[:-recent_count] if len(timings_list) > recent_count else [] 

348 

349 if not historical_timings: 

350 return {"degraded": False, "reason": "insufficient_historical_data"} 

351 

352 recent_avg = statistics.mean(recent_timings) 

353 historical_avg = statistics.mean(historical_timings) 

354 

355 degraded = recent_avg > (historical_avg * baseline_multiplier) 

356 

357 return { 

358 "degraded": degraded, 

359 "recent_avg_ms": recent_avg * 1000, 

360 "historical_avg_ms": historical_avg * 1000, 

361 "multiplier": recent_avg / historical_avg if historical_avg > 0 else 0, 

362 "threshold_multiplier": baseline_multiplier, 

363 } 

364 

365 
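For illustration, suppose an operation has 40 historical samples averaging 0.10 s and the 10 most recent samples averaging 0.25 s: with the default baseline_multiplier of 2.0, 0.25 > 0.10 * 2.0, so the check reports degraded=True with a multiplier of 2.5. A minimal call sketch, assuming tracker is a PerformanceTracker that already holds samples (the operation name is illustrative):

    result = tracker.check_performance_degradation("tool_invocation")
    if result["degraded"]:
        logger.warning("tool_invocation slowed to %.1fx its historical average", result["multiplier"])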

366# Global performance tracker instance 

367_performance_tracker: Optional[PerformanceTracker] = None 

368 

369 

370def get_performance_tracker() -> PerformanceTracker: 

371 """Get or create the global performance tracker instance. 

372 

373 Returns: 

374 Global PerformanceTracker instance 

375 """ 

376 global _performance_tracker # pylint: disable=global-statement 

377 if _performance_tracker is None: 

378 _performance_tracker = PerformanceTracker() 

379 return _performance_tracker
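
A minimal end-to-end sketch of how a caller might use the module-level singleton (run_query is a hypothetical callable; the operation and component names are illustrative):

    tracker = get_performance_tracker()
    with tracker.track_operation("database_query", component="tool_service"):
        run_query()  # hypothetical work being timed
    summary = tracker.get_performance_summary(min_samples=5)
    for op, stats in summary.items():
        logger.info("%s: p95=%.1fms violations=%d", op, stats["p95_duration_ms"], stats["threshold_violations"])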