Coverage for mcpgateway / cache / metrics_cache.py: 100%
67 statements
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""Metrics aggregation cache for reducing database load.

This module provides in-memory caching for metrics aggregation queries
with optional Redis support for distributed deployments.

The cache uses double-checked locking for thread safety and supports
configurable TTL with automatic expiration.

See GitHub Issue #1734 for details.
"""

# Future
from __future__ import annotations

# Standard
import logging
import threading
import time
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


class MetricsCache:
    """Thread-safe in-memory cache for metrics aggregation results.

    Uses double-checked locking to minimize lock contention while
    ensuring thread safety. Supports separate caches for different
    metric types (tools, resources, prompts, servers, a2a).

    Attributes:
        ttl_seconds: Time-to-live for cached entries in seconds.

    Examples:
        >>> cache = MetricsCache(ttl_seconds=10)
        >>> cache.get("tools") is None
        True
        >>> cache.set("tools", {"total": 100, "successful": 90})
        >>> cache.get("tools")
        {'total': 100, 'successful': 90}
        >>> cache.invalidate("tools")
        >>> cache.get("tools") is None
        True
    """

    _NOT_CACHED = object()  # Sentinel to distinguish "not cached" from "cached None"

    def __init__(self, ttl_seconds: int = 10) -> None:
        """Initialize the metrics cache.

        Args:
            ttl_seconds: Time-to-live for cached entries. Defaults to 10 seconds.
        """
        self._caches: Dict[str, Any] = {}
        self._expiries: Dict[str, float] = {}
        self._lock = threading.Lock()
        self._ttl_seconds = ttl_seconds
        self._hit_count = 0
        self._miss_count = 0

    def get(self, metric_type: str) -> Optional[Dict[str, Any]]:
        """Get cached metrics for a specific type.

        Uses double-checked locking for thread safety with minimal
        lock contention on cache hits.

        Args:
            metric_type: Type of metrics (tools, resources, prompts, servers, a2a).

        Returns:
            Cached metrics dictionary if valid, None if expired or not cached.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.get("tools") is None
            True
            >>> cache.set("tools", {"total": 50})
            >>> cache.get("tools")
            {'total': 50}
        """
        now = time.time()

        # Fast path: check without lock
        cached = self._caches.get(metric_type, self._NOT_CACHED)
        expiry = self._expiries.get(metric_type, 0)

        if cached is not self._NOT_CACHED and now < expiry:
            self._hit_count += 1
            return cached

        # Cache miss or expired
        self._miss_count += 1
        return None

    def set(self, metric_type: str, value: Dict[str, Any]) -> None:
        """Set cached metrics for a specific type.

        Args:
            metric_type: Type of metrics (tools, resources, prompts, servers, a2a).
            value: Metrics dictionary to cache.

        Examples:
            >>> cache = MetricsCache(ttl_seconds=60)
            >>> cache.set("tools", {"total": 100, "successful": 95})
            >>> cache.get("tools")
            {'total': 100, 'successful': 95}
        """
        with self._lock:
            self._caches[metric_type] = value
            self._expiries[metric_type] = time.time() + self._ttl_seconds

    def invalidate(self, metric_type: Optional[str] = None) -> None:
        """Invalidate cached metrics.

        Args:
            metric_type: Specific type to invalidate, or None to invalidate all.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.set("tools", {"total": 100})
            >>> cache.set("resources", {"total": 50})
            >>> cache.invalidate("tools")
            >>> cache.get("tools") is None
            True
            >>> cache.get("resources") is not None
            True
            >>> cache.invalidate()  # Invalidate all
            >>> cache.get("resources") is None
            True
        """
        with self._lock:
            if metric_type is None:
                self._caches.clear()
                self._expiries.clear()
                logger.debug("Invalidated all metrics caches")
            else:
                self._caches.pop(metric_type, None)
                self._expiries.pop(metric_type, None)
                logger.debug(f"Invalidated metrics cache for: {metric_type}")

    def invalidate_prefix(self, prefix: str) -> None:
        """Invalidate all cached metrics with keys starting with prefix.

        Args:
            prefix: Key prefix to match for invalidation.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.set("top_tools:5", [{"id": "1"}])
            >>> cache.set("top_tools:10", [{"id": "2"}])
            >>> cache.set("tools", {"total": 100})
            >>> cache.invalidate_prefix("top_tools:")
            >>> cache.get("top_tools:5") is None
            True
            >>> cache.get("top_tools:10") is None
            True
            >>> cache.get("tools") is not None
            True
        """
        with self._lock:
            keys_to_remove = [k for k in self._caches if k.startswith(prefix)]
            for key in keys_to_remove:
                self._caches.pop(key, None)
                self._expiries.pop(key, None)
            if keys_to_remove:
                logger.debug(f"Invalidated {len(keys_to_remove)} metrics cache entries with prefix: {prefix}")

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary containing hit_count, miss_count, hit_rate,
            cached_types, and ttl_seconds.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.set("tools", {"total": 100})
            >>> _ = cache.get("tools")  # Hit
            >>> _ = cache.get("tools")  # Hit
            >>> _ = cache.get("missing")  # Miss
            >>> stats = cache.stats()
            >>> stats["hit_count"]
            2
            >>> stats["miss_count"]
            1
        """
        total = self._hit_count + self._miss_count
        now = time.time()
        cached_types = [k for k, v in self._caches.items() if v is not self._NOT_CACHED and self._expiries.get(k, 0) > now]
        return {
            "hit_count": self._hit_count,
            "miss_count": self._miss_count,
            "hit_rate": self._hit_count / total if total > 0 else 0.0,
            "cached_types": cached_types,
            "ttl_seconds": self._ttl_seconds,
        }

    def reset_stats(self) -> None:
        """Reset hit/miss counters.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.set("tools", {"total": 100})
            >>> _ = cache.get("tools")
            >>> cache.stats()["hit_count"]
            1
            >>> cache.reset_stats()
            >>> cache.stats()["hit_count"]
            0
        """
        self._hit_count = 0
        self._miss_count = 0


def _create_metrics_cache() -> MetricsCache:
    """Create the metrics cache with settings from configuration.

    Returns:
        MetricsCache instance configured with TTL from settings.
    """
    try:
        # First-Party
        from mcpgateway.config import settings  # pylint: disable=import-outside-toplevel

        ttl = getattr(settings, "metrics_cache_ttl_seconds", 10)
    except ImportError:
        ttl = 10
    return MetricsCache(ttl_seconds=ttl)


def is_cache_enabled() -> bool:
    """Check if metrics caching is enabled in configuration.

    Returns:
        True if caching is enabled, False otherwise.
    """
    try:
        # First-Party
        from mcpgateway.config import settings  # pylint: disable=import-outside-toplevel

        return getattr(settings, "metrics_cache_enabled", True)
    except ImportError:
        return True


# Global singleton instance with configurable TTL
# This is appropriate for metrics which are read frequently but
# don't need to be perfectly real-time
metrics_cache = _create_metrics_cache()
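
As an illustration of how a caller might wire the global singleton into a read-through pattern (not shown in the module above), the sketch below uses only metrics_cache.get/set/invalidate and is_cache_enabled() from the source; compute_tool_metrics() is a hypothetical stand-in for the real aggregation query and is assumed here, not taken from mcpgateway.

# Illustrative read-through usage of the metrics_cache singleton.
# compute_tool_metrics() is a hypothetical placeholder for the real
# aggregation query; it is assumed for this sketch only.
from typing import Any, Dict

from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache


def compute_tool_metrics() -> Dict[str, Any]:
    """Stand-in for the expensive database aggregation."""
    return {"total": 0, "successful": 0}


def get_tool_metrics() -> Dict[str, Any]:
    """Return tool metrics, serving cached results when caching is enabled."""
    if not is_cache_enabled():
        return compute_tool_metrics()

    cached = metrics_cache.get("tools")
    if cached is not None:
        return cached  # Fresh entry within the TTL window.

    # Miss or expired: recompute and repopulate the cache.
    result = compute_tool_metrics()
    metrics_cache.set("tools", result)
    return result


def on_tool_invocation_recorded() -> None:
    """After persisting new metric rows, drop the stale aggregate."""
    metrics_cache.invalidate("tools")

Because get() returns None for both a miss and an expired entry, the caller does not need to distinguish the two cases before recomputing.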