Coverage for mcpgateway/services/performance_tracker.py: 98% (124 statements)
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/performance_tracker.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0

Performance Tracking Service.

This module provides performance tracking and analytics for all operations
across the MCP Gateway, enabling identification of bottlenecks and
optimization opportunities.
"""

# Standard
from collections import defaultdict, deque, OrderedDict
from contextlib import contextmanager
import logging
import statistics
import time
from typing import Any, Dict, Generator, Optional, Tuple

# First-Party
from mcpgateway.config import settings
from mcpgateway.utils.correlation_id import get_correlation_id

logger = logging.getLogger(__name__)


class PerformanceTracker:
    """Tracks and analyzes performance metrics across requests.

    Provides context managers for tracking operation timing,
    aggregation of metrics, and threshold-based alerting.

    Uses version-based caching for performance summaries:
    - Per-operation versions track changes to specific operations
    - Global version tracks any change (for "all operations" summaries)
    - Cache entries store the version at computation time
    - Entries are valid only if versions match (no TTL-based expiry)

    Note: Internal state (_operation_timings, _op_version, etc.) should not be
    accessed directly. Use record_timing() or track_operation() to add data.
    """

    # Sentinel for "all operations" cache key
    _ALL_OPERATIONS_KEY = "__all__"

    # Maximum cache entries to prevent unbounded growth with varying min_samples
    _MAX_CACHE_ENTRIES = 64

    def __init__(self):
        """Initialize performance tracker."""
        # Max buffer size per operation type - must be set before creating deque factory
        self.max_samples = getattr(settings, "perf_max_samples_per_operation", 1000)

        # Use deque with maxlen for O(1) automatic eviction instead of O(n) pop(0)
        # Private to ensure all mutations go through record_timing/track_operation (version tracking)
        self._operation_timings: Dict[str, deque[float]] = defaultdict(lambda: deque(maxlen=self.max_samples))

        # Performance thresholds (seconds) from settings or defaults
        self.performance_thresholds = {
            "database_query": getattr(settings, "perf_threshold_database_query", 0.1),
            "tool_invocation": getattr(settings, "perf_threshold_tool_invocation", 2.0),
            "authentication": getattr(settings, "perf_threshold_authentication", 0.5),
            "cache_operation": getattr(settings, "perf_threshold_cache_operation", 0.01),
            "a2a_task": getattr(settings, "perf_threshold_a2a_task", 5.0),
            "request_total": getattr(settings, "perf_threshold_request_total", 10.0),
            "resource_fetch": getattr(settings, "perf_threshold_resource_fetch", 1.0),
            "prompt_processing": getattr(settings, "perf_threshold_prompt_processing", 0.5),
        }

        # Version counters for cache invalidation
        self._op_version: Dict[str, int] = defaultdict(int)  # Per-operation version
        self._global_version: int = 0  # Incremented on any mutation

        # Summary cache: key=(operation_name, min_samples), value=(version, summary_dict)
        # For specific ops: version is op_version; for all ops: version is global_version
        self._summary_cache: OrderedDict[Tuple[str, int], Tuple[int, Dict[str, Any]]] = OrderedDict()

    def _increment_version(self, operation_name: Optional[str] = None) -> None:
        """Increment version counters to invalidate cached summaries.

        Args:
            operation_name: Specific operation that changed. If None, only increments global version.
        """
        self._global_version += 1
        if operation_name:
            self._op_version[operation_name] += 1

    @contextmanager
    def track_operation(self, operation_name: str, component: Optional[str] = None, log_slow: bool = True, extra_context: Optional[Dict[str, Any]] = None) -> Generator[None, None, None]:
        """Context manager to track operation performance.

        Args:
            operation_name: Name of the operation being tracked
            component: Component/module name for context
            log_slow: Whether to log operations exceeding thresholds
            extra_context: Additional context to include in logs

        Yields:
            None

        Raises:
            Exception: Any exception from the tracked operation is re-raised

        Example:
            >>> tracker = PerformanceTracker()
            >>> with tracker.track_operation("database_query", component="tool_service"):
            ...     # Perform database operation
            ...     pass
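            >>> # The timing is recorded once the block exits:
            >>> tracker.get_operation_stats("database_query")["sample_count"]
            1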
110 """
111 start_time = time.time()
112 correlation_id = get_correlation_id()
113 error_occurred = False
115 try:
116 yield
117 except Exception:
118 error_occurred = True
119 raise
120 finally:
121 duration = time.time() - start_time
123 # Record timing (deque automatically evicts oldest when at maxlen)
124 self._operation_timings[operation_name].append(duration)
126 # Increment version to invalidate cached summaries
127 self._increment_version(operation_name)
129 # Check threshold and log if needed
130 threshold = self.performance_thresholds.get(operation_name, float("inf"))
131 threshold_exceeded = duration > threshold
133 if log_slow and threshold_exceeded:
134 context = {
135 "operation": operation_name,
136 "duration_ms": duration * 1000,
137 "threshold_ms": threshold * 1000,
138 "exceeded_by_ms": (duration - threshold) * 1000,
139 "component": component,
140 "correlation_id": correlation_id,
141 "error_occurred": error_occurred,
142 }
143 if extra_context: 143 ↛ 146line 143 didn't jump to line 146 because the condition on line 143 was always true
144 context.update(extra_context)
146 logger.warning(f"Slow operation detected: {operation_name} took {duration*1000:.2f}ms " f"(threshold: {threshold*1000:.2f}ms)", extra=context)

    def record_timing(self, operation_name: str, duration: float, component: Optional[str] = None, extra_context: Optional[Dict[str, Any]] = None) -> None:
        """Manually record a timing measurement.

        Args:
            operation_name: Name of the operation
            duration: Duration in seconds
            component: Component/module name
            extra_context: Additional context
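
        Example:
            >>> tracker = PerformanceTracker()
            >>> tracker.record_timing("database_query", 0.05)  # 50 ms measured elsewhere
            >>> tracker.get_operation_stats("database_query")["sample_count"]
            1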
156 """
157 # Record timing (deque automatically evicts oldest when at maxlen)
158 self._operation_timings[operation_name].append(duration)
160 # Increment version to invalidate cached summaries
161 self._increment_version(operation_name)
163 # Check threshold
164 threshold = self.performance_thresholds.get(operation_name, float("inf"))
165 if duration > threshold:
166 context = {
167 "operation": operation_name,
168 "duration_ms": duration * 1000,
169 "threshold_ms": threshold * 1000,
170 "component": component,
171 "correlation_id": get_correlation_id(),
172 }
173 if extra_context:
174 context.update(extra_context)
176 logger.warning(f"Slow operation: {operation_name} took {duration*1000:.2f}ms", extra=context)

    def get_performance_summary(self, operation_name: Optional[str] = None, min_samples: int = 1) -> Dict[str, Any]:
        """Get performance summary for analytics.

        Args:
            operation_name: Specific operation to summarize (None for all)
            min_samples: Minimum samples required to include in summary

        Returns:
            Dictionary containing performance statistics

        Example:
            >>> tracker = PerformanceTracker()
            >>> summary = tracker.get_performance_summary()
            >>> isinstance(summary, dict)
            True
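            >>> # Operations appear in the summary once they have >= min_samples timings:
            >>> tracker.record_timing("authentication", 0.1)
            >>> "authentication" in tracker.get_performance_summary()
            True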
193 """
194 # Determine if we're summarizing a specific operation or all operations
195 # Normalize cache key: use _ALL_OPERATIONS_KEY if operation doesn't exist or None was passed
196 is_specific_op = operation_name and operation_name in self._operation_timings
197 cache_key = (operation_name if is_specific_op else self._ALL_OPERATIONS_KEY, min_samples)
199 # Get current version for cache validation
200 current_version = self._op_version[operation_name] if is_specific_op else self._global_version
202 # Check cache - valid if version matches
203 if cache_key in self._summary_cache:
204 cached_version, cached_summary = self._summary_cache[cache_key]
205 if cached_version == current_version:
206 # Mark as recently used and return a copy to prevent external mutation
207 self._summary_cache.move_to_end(cache_key)
208 return {k: dict(v) for k, v in cached_summary.items()}
210 # Compute summary
211 summary = {}
213 operations = {operation_name: self._operation_timings[operation_name]} if is_specific_op else self._operation_timings
215 for op_name, timings in operations.items():
216 if len(timings) < min_samples:
217 continue
219 # Calculate percentiles
220 sorted_timings = sorted(timings)
221 count = len(sorted_timings)
223 def percentile(p: float, *, sorted_vals=sorted_timings, n=count) -> float:
224 """Calculate percentile value.
226 Args:
227 p: Percentile to calculate (0.0 to 1.0)
228 sorted_vals: Sorted list of values
229 n: Number of values
231 Returns:
232 float: Calculated percentile value
233 """
234 k = (n - 1) * p
235 f = int(k)
236 c = k - f
237 if f + 1 < n:
238 return sorted_vals[f] * (1 - c) + sorted_vals[f + 1] * c
239 return sorted_vals[f]
241 summary[op_name] = {
242 "count": count,
243 "avg_duration_ms": statistics.mean(timings) * 1000,
244 "min_duration_ms": min(timings) * 1000,
245 "max_duration_ms": max(timings) * 1000,
246 "p50_duration_ms": percentile(0.5) * 1000,
247 "p95_duration_ms": percentile(0.95) * 1000,
248 "p99_duration_ms": percentile(0.99) * 1000,
249 "threshold_ms": self.performance_thresholds.get(op_name, float("inf")) * 1000,
250 "threshold_violations": sum(1 for t in timings if t > self.performance_thresholds.get(op_name, float("inf"))),
251 "violation_rate": sum(1 for t in timings if t > self.performance_thresholds.get(op_name, float("inf"))) / count,
252 }
254 # Store a copy in cache with current version
255 # Only evict if adding a new key (not updating existing) and at capacity (LRU)
256 if cache_key not in self._summary_cache and len(self._summary_cache) >= self._MAX_CACHE_ENTRIES:
257 # Remove least-recently-used entry
258 try:
259 self._summary_cache.popitem(last=False)
260 except (StopIteration, KeyError):
261 pass
263 self._summary_cache[cache_key] = (current_version, {k: dict(v) for k, v in summary.items()})
264 self._summary_cache.move_to_end(cache_key)
266 return summary

    def get_operation_stats(self, operation_name: str) -> Optional[Dict[str, Any]]:
        """Get statistics for a specific operation.

        Args:
            operation_name: Name of the operation

        Returns:
            Statistics dictionary or None if no data
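
        Example:
            >>> tracker = PerformanceTracker()
            >>> tracker.get_operation_stats("unknown_operation") is None
            True
            >>> tracker.record_timing("tool_invocation", 0.2)
            >>> tracker.get_operation_stats("tool_invocation")["sample_count"]
            1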
276 """
277 if operation_name not in self._operation_timings:
278 return None
280 timings = self._operation_timings[operation_name]
281 if not timings:
282 return None
284 return {
285 "operation": operation_name,
286 "sample_count": len(timings),
287 "avg_duration_ms": statistics.mean(timings) * 1000,
288 "min_duration_ms": min(timings) * 1000,
289 "max_duration_ms": max(timings) * 1000,
290 "total_time_ms": sum(timings) * 1000,
291 "threshold_ms": self.performance_thresholds.get(operation_name, float("inf")) * 1000,
292 }

    def clear_stats(self, operation_name: Optional[str] = None) -> None:
        """Clear performance statistics.

        Args:
            operation_name: Specific operation to clear (None for all)
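
        Example:
            >>> tracker = PerformanceTracker()
            >>> tracker.record_timing("cache_operation", 0.001)
            >>> tracker.clear_stats("cache_operation")
            >>> tracker.get_operation_stats("cache_operation") is None
            True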
299 """
300 if operation_name:
301 if operation_name in self._operation_timings: 301 ↛ 304line 301 didn't jump to line 304 because the condition on line 301 was always true
302 self._operation_timings[operation_name].clear()
303 # Increment version to invalidate cached summaries
304 self._increment_version(operation_name)
305 else:
306 self._operation_timings.clear()
307 # Clear all version tracking and cache on full reset
308 self._global_version += 1
309 self._op_version.clear()
310 self._summary_cache.clear()

    def set_threshold(self, operation_name: str, threshold_seconds: float) -> None:
        """Set or update performance threshold for an operation.

        Args:
            operation_name: Name of the operation
            threshold_seconds: Threshold in seconds
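
        Example:
            >>> tracker = PerformanceTracker()
            >>> tracker.set_threshold("tool_invocation", 1.5)
            >>> tracker.performance_thresholds["tool_invocation"]
            1.5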
318 """
319 self.performance_thresholds[operation_name] = threshold_seconds
321 # Increment version (threshold affects violation stats in summaries)
322 self._increment_version(operation_name)

    def check_performance_degradation(self, operation_name: str, baseline_multiplier: float = 2.0) -> Dict[str, Any]:
        """Check if performance has degraded compared to baseline.

        Args:
            operation_name: Name of the operation to check
            baseline_multiplier: Multiplier for degradation detection

        Returns:
            Dictionary with degradation analysis
        """
        if operation_name not in self._operation_timings:
            return {"degraded": False, "reason": "no_data"}

        timings = self._operation_timings[operation_name]
        if len(timings) < 10:
            return {"degraded": False, "reason": "insufficient_samples"}

        # Compare recent timings to overall average
        # Convert deque to list for slicing operations
        timings_list = list(timings)
        recent_count = min(10, len(timings_list))
        recent_timings = timings_list[-recent_count:]
        # If we don't have more than the "recent" window worth of samples, we don't have a historical baseline.
        historical_timings = timings_list[:-recent_count] if len(timings_list) > recent_count else []

        if not historical_timings:
            return {"degraded": False, "reason": "insufficient_historical_data"}

        recent_avg = statistics.mean(recent_timings)
        historical_avg = statistics.mean(historical_timings)

        degraded = recent_avg > (historical_avg * baseline_multiplier)

        return {
            "degraded": degraded,
            "recent_avg_ms": recent_avg * 1000,
            "historical_avg_ms": historical_avg * 1000,
            "multiplier": recent_avg / historical_avg if historical_avg > 0 else 0,
            "threshold_multiplier": baseline_multiplier,
        }


# Global performance tracker instance
_performance_tracker: Optional[PerformanceTracker] = None


def get_performance_tracker() -> PerformanceTracker:
    """Get or create the global performance tracker instance.

    Returns:
        Global PerformanceTracker instance
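
    Example:
        >>> tracker_a = get_performance_tracker()
        >>> tracker_b = get_performance_tracker()
        >>> tracker_a is tracker_b
        True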
375 """
376 global _performance_tracker # pylint: disable=global-statement
377 if _performance_tracker is None:
378 _performance_tracker = PerformanceTracker()
379 return _performance_tracker
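

# Illustrative usage sketch: a minimal demo wiring the pieces above together,
# assuming default settings. The operation name "demo_sleep" and its 1 ms
# threshold are arbitrary values chosen only for this demo.
if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    demo_tracker = get_performance_tracker()
    demo_tracker.set_threshold("demo_sleep", 0.001)  # hypothetical 1 ms threshold
    with demo_tracker.track_operation("demo_sleep", component="demo"):
        time.sleep(0.01)  # exceeds the threshold, so a slow-operation warning is logged
    print(demo_tracker.get_performance_summary("demo_sleep"))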