Coverage for mcpgateway / cache / resource_cache.py: 100%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/cache/resource_cache.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Resource Cache Implementation. 

8This module implements a simple in-memory cache with TTL expiration for caching 

9resource content in the MCP Gateway. Features: 

10- TTL-based expiration 

11- Maximum size limit with LRU eviction 

12- Thread-safe operations 

13 

14Examples: 

15 >>> from mcpgateway.cache.resource_cache import ResourceCache 

16 >>> from unittest.mock import patch 

17 >>> cache = ResourceCache(max_size=2, ttl=1) 

18 >>> cache.set('a', 1) 

19 >>> cache.get('a') 

20 1 

21 

22 Test TTL expiration using mocked time (no actual sleep): 

23 

24 >>> with patch("time.time") as mock_time: 

25 ... mock_time.return_value = 1000 

26 ... cache2 = ResourceCache(max_size=2, ttl=1) 

27 ... cache2.set('x', 100) 

28 ... cache2.get('x') # Before expiration 

29 ... mock_time.return_value = 1002 # Advance past TTL 

30 ... cache2.get('x') is None # After expiration 

31 100 

32 True 

33 

34 Test LRU eviction: 

35 

36 >>> cache.set('a', 1) 

37 >>> cache.set('b', 2) 

38 >>> cache.set('c', 3) # LRU eviction 

39 >>> sorted(cache._cache.keys()) 

40 ['b', 'c'] 

41 >>> cache.delete('b') 

42 >>> cache.get('b') is None 

43 True 

44 >>> cache.clear() 

45 >>> cache.get('a') is None 

46 True 

47""" 

48 

49# Standard 

50import asyncio 

51from collections import OrderedDict 

52from dataclasses import dataclass 

53import heapq 

54import threading 

55import time 

56from typing import Any, Optional 

57 

58# First-Party 

59from mcpgateway.services.logging_service import LoggingService 

60 

# Module-level logger obtained from the gateway's LoggingService.
# ResourceCache uses it for its lifecycle (initialize/shutdown) messages and
# for the background cleanup / heap-compaction diagnostics.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

64 

65 

@dataclass
class CacheEntry:
    """A cached value together with the moment it stops being valid."""

    # The cached payload, stored and returned as-is.
    value: Any
    # Expiry timestamp on the time.time() scale (ResourceCache.set() stores
    # time.time() + ttl here).
    expires_at: float

72 

73 

class ResourceCache:
    """
    Resource content cache with TTL expiration and LRU eviction.

    Entries live in an ``OrderedDict`` (oldest-used first) and every write
    also records ``(expires_at, key)`` in a min-heap so that the background
    cleanup can drop expired entries without scanning the whole cache.
    Heap records are never removed eagerly; a record is "stale" when the key
    is gone or its entry now carries a different expiry, and stale records
    are simply skipped when popped.

    Attributes:
        max_size: Maximum number of entries (``<= 0`` disables caching)
        ttl: Time-to-live in seconds
        _cache: Cache storage
        _lock: Threading lock for thread safety
        _expiry_heap: Min-heap of ``(expires_at, key)`` records
        _cleanup_task: Background cleanup task started by ``initialize()``

    Examples:
        >>> from mcpgateway.cache.resource_cache import ResourceCache
        >>> from unittest.mock import patch
        >>> cache = ResourceCache(max_size=2, ttl=1)
        >>> cache.set('a', 1)
        >>> cache.get('a')
        1

        Test TTL expiration using mocked time (no actual sleep):

        >>> with patch("time.time") as mock_time:
        ...     mock_time.return_value = 1000
        ...     cache2 = ResourceCache(max_size=2, ttl=1)
        ...     cache2.set('x', 100)
        ...     cache2.get('x')  # Before expiration
        ...     mock_time.return_value = 1002  # Advance past TTL
        ...     cache2.get('x') is None  # After expiration
        100
        True

        Test LRU eviction:

        >>> cache.set('a', 1)
        >>> cache.set('b', 2)
        >>> cache.set('c', 3)  # LRU eviction
        >>> sorted(cache._cache.keys())
        ['b', 'c']
        >>> cache.delete('b')
        >>> cache.get('b') is None
        True
        >>> cache.clear()
        >>> cache.get('a') is None
        True
    """

    def __init__(self, max_size: int = 1000, ttl: float = 3600):
        """Initialize cache.

        Args:
            max_size: Maximum number of entries
            ttl: Time-to-live in seconds (fractions of a second are allowed)
        """
        self.max_size = max_size
        self.ttl = ttl
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        # Use a threading lock for thread-safe operations across sync methods
        # and the background cleanup thread.
        self._lock = threading.Lock()
        # Min-heap of (expires_at, key) for efficient expiration cleanup
        self._expiry_heap: list[tuple[float, str]] = []
        # Strong reference to the cleanup task: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected.
        self._cleanup_task: Optional[asyncio.Task] = None

    async def initialize(self) -> None:
        """Initialize cache service and start the periodic cleanup task.

        Calling this again while the cleanup task is still running is a
        no-op apart from the log line, so only one cleanup loop ever runs.
        """
        logger.info("Initializing resource cache")
        # Start cleanup task (once) and keep a reference to it
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def shutdown(self) -> None:
        """Shutdown cache service: stop the cleanup task and drop all entries."""
        logger.info("Shutting down resource cache")
        task, self._cleanup_task = self._cleanup_task, None
        if task is not None and not task.done():
            task.cancel()
            # return_exceptions=True absorbs the task's own CancelledError
            # while still letting a cancellation of shutdown() propagate.
            await asyncio.gather(task, return_exceptions=True)
        self.clear()

    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        A hit marks the entry as most recently used. An expired entry is
        removed on access and reported as a miss.

        Args:
            key: Cache key

        Returns:
            Cached value or None if not found/expired

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> from unittest.mock import patch

            >>> # Normal get
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> cache.get('a')
            1

            >>> # Test expiration deterministically using mock time
            >>> with patch("time.time") as mock_time:
            ...     mock_time.return_value = 1000
            ...     short_cache = ResourceCache(max_size=2, ttl=0.1)
            ...     short_cache.set('b', 2)
            ...     short_cache.get('b')
            ...     # Advance time past TTL
            ...     mock_time.return_value = 1000.2
            ...     short_cache.get('b') is None
            2
            True
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return None

            # Check expiration; the matching heap record becomes stale and is
            # skipped by the cleanup pass later.
            if time.time() > entry.expires_at:
                del self._cache[key]
                return None

            self._cache.move_to_end(key)
            return entry.value

    def set(self, key: str, value: Any) -> None:
        """
        Set value in cache.

        Overwriting a key refreshes its TTL and LRU position. Inserting a new
        key into a full cache evicts the least recently used entry first.
        With ``max_size <= 0`` nothing is stored.

        Args:
            key: Cache key
            value: Value to cache

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> cache.get('a')
            1
        """
        if self.max_size <= 0:
            # A zero-capacity cache holds nothing; without this guard the
            # eviction below would pop from an empty dict and raise KeyError.
            return

        expires_at = time.time() + self.ttl
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            elif len(self._cache) >= self.max_size:
                # Evict LRU
                self._cache.popitem(last=False)

            # Add / update entry
            self._cache[key] = CacheEntry(value=value, expires_at=expires_at)
            # Push expiry into heap; stale heap entries are ignored later
            heapq.heappush(self._expiry_heap, (expires_at, key))

    def delete(self, key: str) -> None:
        """
        Delete value from cache.

        Deleting a missing key is a no-op.

        Args:
            key: Cache key to delete

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache()
            >>> cache.set('a', 1)
            >>> cache.delete('a')
            >>> cache.get('a') is None
            True
        """
        with self._lock:
            self._cache.pop(key, None)
            # We don't remove entries from the heap here; they'll be ignored
            # by the cleanup when popped if missing or timestamp differs.

    def clear(self) -> None:
        """
        Clear all cached entries.

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache()
            >>> cache.set('a', 1)
            >>> cache.clear()
            >>> cache.get('a') is None
            True
        """
        with self._lock:
            self._cache.clear()
            self._expiry_heap.clear()

    async def _cleanup_loop(self) -> None:
        """Background task to clean expired entries efficiently.

        Uses a min-heap of expiration timestamps to avoid scanning the
        entire cache on each run. The actual cleanup work runs under the
        same threading lock as sync methods by delegating to a thread via
        `asyncio.to_thread` so we don't block the event loop.

        Runs until cancelled; a failing pass is logged and the loop continues.
        """
        while True:
            try:
                await asyncio.to_thread(self._cleanup_once)
            except Exception as e:
                # Cancellation is a BaseException and is deliberately not
                # caught here, so shutdown() can stop the loop.
                logger.error(f"Cache cleanup error: {e}")
            await asyncio.sleep(60)  # Run every minute

    def _cleanup_once(self) -> None:
        """Synchronous cleanup routine executed in a thread.

        Pops entries from the expiry heap until the next non-expired
        timestamp is reached. Each popped entry is validated against
        the current cache entry to avoid removing updated entries.
        Also compacts the heap if it grows too large relative to cache size.
        """
        now = time.time()
        removed = 0

        with self._lock:
            while self._expiry_heap and self._expiry_heap[0][0] <= now:
                expires_at, key = heapq.heappop(self._expiry_heap)
                entry = self._cache.get(key)
                # If entry is present and timestamps match, remove it
                if entry is not None and entry.expires_at == expires_at:
                    del self._cache[key]
                    removed += 1

            # Decide under the lock; the compaction itself happens after release
            needs_compaction = len(self._expiry_heap) > 2 * self.max_size

        if removed:
            logger.debug(f"Cleaned {removed} expired cache entries")

        # Compact heap outside the main lock to minimize contention
        if needs_compaction:
            self._compact_heap()

    def _compact_heap(self) -> None:
        """Rebuild the expiry heap with only valid (current) entries.

        Called when heap grows too large due to stale entries from
        key updates or deletions. Minimizes lock contention by doing
        the O(n) heapify outside the lock.

        Records pushed by ``set()`` while the lock was released are carried
        over by checking them against the live cache, so no current entry
        loses its heap record regardless of its timestamp.
        """
        # Snapshot current entries under lock (fast dict iteration)
        with self._lock:
            entries = [(entry.expires_at, key) for key, entry in self._cache.items()]
            old_size = len(self._expiry_heap)

        # Build lookup set and heap outside lock - O(n) work doesn't block get/set
        snapshot = set(entries)
        heapq.heapify(entries)

        # Swap back under lock, preserving entries added during compaction
        with self._lock:
            added: list[tuple[float, str]] = []
            for expires_at, key in self._expiry_heap:
                if (expires_at, key) in snapshot:
                    continue  # already present in the rebuilt heap
                current = self._cache.get(key)
                # Keep only records that still describe a live cache entry
                if current is not None and current.expires_at == expires_at:
                    added.append((expires_at, key))
            self._expiry_heap = entries
            for record in added:
                heapq.heappush(self._expiry_heap, record)

        logger.debug(f"Compacted expiry heap: {old_size} -> {len(self._expiry_heap)} entries")

    def __len__(self) -> int:
        """
        Get the number of entries in cache.

        Entries that have expired but were not yet removed are included.

        Args:
            None

        Returns:
            int: Number of entries in cache

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> len(cache)
            1
        """
        with self._lock:
            return len(self._cache)