Coverage for mcpgateway/cache/metrics_cache.py: 100%

67 statements

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

# -*- coding: utf-8 -*-
"""Metrics aggregation cache for reducing database load.

This module provides in-memory caching for metrics aggregation queries
with optional Redis support for distributed deployments.

The cache uses double-checked locking for thread safety and supports
configurable TTL with automatic expiration.

See GitHub Issue #1734 for details.
"""

# Future
from __future__ import annotations

# Standard
import logging
import threading
import time
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


class MetricsCache:
    """Thread-safe in-memory cache for metrics aggregation results.

    Uses double-checked locking to minimize lock contention while
    ensuring thread safety. Supports separate caches for different
    metric types (tools, resources, prompts, servers, a2a).

    Attributes:
        ttl_seconds: Time-to-live for cached entries in seconds.

    Examples:
        >>> cache = MetricsCache(ttl_seconds=10)
        >>> cache.get("tools") is None
        True
        >>> cache.set("tools", {"total": 100, "successful": 90})
        >>> cache.get("tools")
        {'total': 100, 'successful': 90}
        >>> cache.invalidate("tools")
        >>> cache.get("tools") is None
        True
    """

    _NOT_CACHED = object()  # Sentinel to distinguish "not cached" from "cached None"

    def __init__(self, ttl_seconds: int = 10) -> None:
        """Initialize the metrics cache.

        Args:
            ttl_seconds: Time-to-live for cached entries. Defaults to 10 seconds.
        """
        self._caches: Dict[str, Any] = {}
        self._expiries: Dict[str, float] = {}
        self._lock = threading.Lock()
        self._ttl_seconds = ttl_seconds
        self._hit_count = 0
        self._miss_count = 0

    def get(self, metric_type: str) -> Optional[Dict[str, Any]]:
        """Get cached metrics for a specific type.

        Uses double-checked locking for thread safety with minimal
        lock contention on cache hits.

        Args:
            metric_type: Type of metrics (tools, resources, prompts, servers, a2a).

        Returns:
            Cached metrics dictionary if valid, None if expired or not cached.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.get("tools") is None
            True
            >>> cache.set("tools", {"total": 50})
            >>> cache.get("tools")
            {'total': 50}
        """
        now = time.time()

        # Fast path: check without lock
        cached = self._caches.get(metric_type, self._NOT_CACHED)
        expiry = self._expiries.get(metric_type, 0)

        if cached is not self._NOT_CACHED and now < expiry:
            self._hit_count += 1
            return cached

        # Cache miss or expired
        self._miss_count += 1
        return None

    def set(self, metric_type: str, value: Dict[str, Any]) -> None:
        """Set cached metrics for a specific type.

        Args:
            metric_type: Type of metrics (tools, resources, prompts, servers, a2a).
            value: Metrics dictionary to cache.

        Examples:
            >>> cache = MetricsCache(ttl_seconds=60)
            >>> cache.set("tools", {"total": 100, "successful": 95})
            >>> cache.get("tools")
            {'total': 100, 'successful': 95}
        """
        with self._lock:
            self._caches[metric_type] = value
            self._expiries[metric_type] = time.time() + self._ttl_seconds

    def invalidate(self, metric_type: Optional[str] = None) -> None:
        """Invalidate cached metrics.

        Args:
            metric_type: Specific type to invalidate, or None to invalidate all.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.set("tools", {"total": 100})
            >>> cache.set("resources", {"total": 50})
            >>> cache.invalidate("tools")
            >>> cache.get("tools") is None
            True
            >>> cache.get("resources") is not None
            True
            >>> cache.invalidate()  # Invalidate all
            >>> cache.get("resources") is None
            True
        """
        with self._lock:
            if metric_type is None:
                self._caches.clear()
                self._expiries.clear()
                logger.debug("Invalidated all metrics caches")
            else:
                self._caches.pop(metric_type, None)
                self._expiries.pop(metric_type, None)
                logger.debug(f"Invalidated metrics cache for: {metric_type}")

    def invalidate_prefix(self, prefix: str) -> None:
        """Invalidate all cached metrics with keys starting with prefix.

        Args:
            prefix: Key prefix to match for invalidation.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.set("top_tools:5", [{"id": "1"}])
            >>> cache.set("top_tools:10", [{"id": "2"}])
            >>> cache.set("tools", {"total": 100})
            >>> cache.invalidate_prefix("top_tools:")
            >>> cache.get("top_tools:5") is None
            True
            >>> cache.get("top_tools:10") is None
            True
            >>> cache.get("tools") is not None
            True
        """
        with self._lock:
            keys_to_remove = [k for k in self._caches if k.startswith(prefix)]
            for key in keys_to_remove:
                self._caches.pop(key, None)
                self._expiries.pop(key, None)
            if keys_to_remove:
                logger.debug(f"Invalidated {len(keys_to_remove)} metrics cache entries with prefix: {prefix}")

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary containing hit_count, miss_count, hit_rate,
            cached_types, and ttl_seconds.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.set("tools", {"total": 100})
            >>> _ = cache.get("tools")  # Hit
            >>> _ = cache.get("tools")  # Hit
            >>> _ = cache.get("missing")  # Miss
            >>> stats = cache.stats()
            >>> stats["hit_count"]
            2
            >>> stats["miss_count"]
            1
        """
        total = self._hit_count + self._miss_count
        now = time.time()
        cached_types = [k for k, v in self._caches.items() if v is not self._NOT_CACHED and self._expiries.get(k, 0) > now]
        return {
            "hit_count": self._hit_count,
            "miss_count": self._miss_count,
            "hit_rate": self._hit_count / total if total > 0 else 0.0,
            "cached_types": cached_types,
            "ttl_seconds": self._ttl_seconds,
        }

    def reset_stats(self) -> None:
        """Reset hit/miss counters.

        Examples:
            >>> cache = MetricsCache()
            >>> cache.set("tools", {"total": 100})
            >>> _ = cache.get("tools")
            >>> cache.stats()["hit_count"]
            1
            >>> cache.reset_stats()
            >>> cache.stats()["hit_count"]
            0
        """
        self._hit_count = 0
        self._miss_count = 0
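
# Illustrative sketch (not part of the original docstrings): the "configurable
# TTL with automatic expiration" noted in the module docstring means an entry
# simply falls through to a miss once ttl_seconds has elapsed. With a zero TTL
# and a short sleep:
#
#     >>> import time
#     >>> short_cache = MetricsCache(ttl_seconds=0)
#     >>> short_cache.set("tools", {"total": 1})
#     >>> time.sleep(0.01)
#     >>> short_cache.get("tools") is None
#     True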



def _create_metrics_cache() -> MetricsCache:
    """Create the metrics cache with settings from configuration.

    Returns:
        MetricsCache instance configured with TTL from settings.
    """
    try:
        # First-Party
        from mcpgateway.config import settings  # pylint: disable=import-outside-toplevel

        ttl = getattr(settings, "metrics_cache_ttl_seconds", 10)
    except ImportError:
        ttl = 10
    return MetricsCache(ttl_seconds=ttl)
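
# Illustrative note (not from the original module): the factory above reads
# metrics_cache_ttl_seconds from settings and falls back to a 10-second TTL
# when the config module is unavailable. A caller that needs a different TTL
# (for example in a test) can bypass the factory and construct the cache
# directly:
#
#     >>> test_cache = MetricsCache(ttl_seconds=1)
#     >>> test_cache.stats()["ttl_seconds"]
#     1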



def is_cache_enabled() -> bool:
    """Check if metrics caching is enabled in configuration.

    Returns:
        True if caching is enabled, False otherwise.
    """
    try:
        # First-Party
        from mcpgateway.config import settings  # pylint: disable=import-outside-toplevel

        return getattr(settings, "metrics_cache_enabled", True)
    except ImportError:
        return True


# Global singleton instance with configurable TTL
# This is appropriate for metrics which are read frequently but
# don't need to be perfectly real-time
metrics_cache = _create_metrics_cache()
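
# Illustrative usage sketch (not from the original module). A caller, such as
# a hypothetical metrics service, might gate reads on is_cache_enabled() and
# fall back to the database aggregation on a miss; compute_tool_metrics() is a
# placeholder for that aggregation, not a real mcpgateway function.
#
#     def get_tool_metrics() -> Dict[str, Any]:
#         if is_cache_enabled():
#             cached = metrics_cache.get("tools")
#             if cached is not None:
#                 return cached
#         result = compute_tool_metrics()  # hypothetical DB aggregation query
#         if is_cache_enabled():
#             metrics_cache.set("tools", result)
#         return result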