Coverage for mcpgateway / cache / resource_cache.py: 100%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/cache/resource_cache.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Resource Cache Implementation. 

8This module implements a simple in-memory cache with TTL expiration for caching 

9resource content in ContextForge. Features: 

10- TTL-based expiration 

11- Maximum size limit with LRU eviction 

12- Thread-safe operations 

13 

14Examples: 

15 >>> from mcpgateway.cache.resource_cache import ResourceCache 

16 >>> from unittest.mock import patch 

17 >>> cache = ResourceCache(max_size=2, ttl=1) 

18 >>> cache.set('a', 1) 

19 >>> cache.get('a') 

20 1 

21 

22 Test TTL expiration using mocked time (no actual sleep): 

23 

24 >>> with patch("time.time") as mock_time: 

25 ... mock_time.return_value = 1000 

26 ... cache2 = ResourceCache(max_size=2, ttl=1) 

27 ... cache2.set('x', 100) 

28 ... cache2.get('x') # Before expiration 

29 ... mock_time.return_value = 1002 # Advance past TTL 

30 ... cache2.get('x') is None # After expiration 

31 100 

32 True 

33 

34 Test LRU eviction: 

35 

36 >>> cache.set('a', 1) 

37 >>> cache.set('b', 2) 

38 >>> cache.set('c', 3) # LRU eviction 

39 >>> sorted(cache._cache.keys()) 

40 ['b', 'c'] 

41 >>> cache.delete('b') 

42 >>> cache.get('b') is None 

43 True 

44 >>> cache.clear() 

45 >>> cache.get('a') is None 

46 True 

47""" 

48 

49# Standard 

50import asyncio 

51from collections import OrderedDict 

52from dataclasses import dataclass 

53import heapq 

54import threading 

55import time 

56from typing import Any, Optional 

57 

58# First-Party 

59from mcpgateway.services.logging_service import LoggingService 

60 

# Module-level logger for this cache module, obtained from the gateway's
# logging service. Both names are kept at module scope.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

64 

65 

@dataclass
class CacheEntry:
    """A cached value paired with the timestamp at which it expires."""

    # The cached payload; any Python object is accepted.
    value: Any
    # Expiry instant, on the same clock as ``time.time()`` (seconds).
    expires_at: float

72 

73 

class ResourceCache:
    """
    Resource content cache with TTL expiration and LRU eviction.

    Entries live in an ``OrderedDict`` (least recently used first). Every
    ``set()`` also pushes ``(expires_at, key)`` onto a min-heap so that the
    background cleanup can find expired entries without scanning the cache.
    Heap entries are never removed eagerly; stale ones are detected and
    ignored when popped, and the heap is rebuilt when it grows too large.

    Attributes:
        max_size: Maximum number of entries. A value <= 0 disables storage.
        ttl: Time-to-live in seconds
        _cache: Cache storage
        _lock: Threading lock for thread safety
        _expiry_heap: Min-heap of (expires_at, key) used by the cleanup
        _cleanup_task: Background cleanup task created by ``initialize()``

    Examples:
        >>> from mcpgateway.cache.resource_cache import ResourceCache
        >>> from unittest.mock import patch
        >>> cache = ResourceCache(max_size=2, ttl=1)
        >>> cache.set('a', 1)
        >>> cache.get('a')
        1

        Test TTL expiration using mocked time (no actual sleep):

        >>> with patch("time.time") as mock_time:
        ...     mock_time.return_value = 1000
        ...     cache2 = ResourceCache(max_size=2, ttl=1)
        ...     cache2.set('x', 100)
        ...     cache2.get('x')  # Before expiration
        ...     mock_time.return_value = 1002  # Advance past TTL
        ...     cache2.get('x') is None  # After expiration
        100
        True

        Test LRU eviction:

        >>> cache.set('a', 1)
        >>> cache.set('b', 2)
        >>> cache.set('c', 3)  # LRU eviction
        >>> sorted(cache._cache.keys())
        ['b', 'c']
        >>> cache.delete('b')
        >>> cache.get('b') is None
        True
        >>> cache.clear()
        >>> cache.get('a') is None
        True
    """

    def __init__(self, max_size: int = 1000, ttl: float = 3600):
        """Initialize cache.

        Args:
            max_size: Maximum number of entries
            ttl: Time-to-live in seconds (fractions of a second are allowed)
        """
        self.max_size = max_size
        self.ttl = ttl
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        # Use a threading lock for thread-safe operations across sync methods
        # and the background cleanup thread.
        self._lock = threading.Lock()
        # Min-heap of (expires_at, key) for efficient expiration cleanup
        self._expiry_heap: list[tuple[float, str]] = []
        self._cleanup_task: Optional[asyncio.Task] = None

    async def initialize(self) -> None:
        """Initialize cache service.

        Starts the background cleanup loop. Calling this again while the
        loop is still running keeps the existing task instead of replacing
        it, so the earlier task is not left running without a reference.
        """
        logger.info("Initializing resource cache")
        running = self._cleanup_task
        if running is not None and not running.done():
            return
        # Start cleanup task and store reference to prevent GC
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def shutdown(self) -> None:
        """Shutdown cache service.

        Cancels the background cleanup task (if it is still running), waits
        for the cancellation to complete, then clears all cached entries.
        """
        logger.info("Shutting down resource cache")
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                # Expected: this is the cancellation requested just above.
                pass
        self.clear()

    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        A hit marks the entry as most recently used. An entry whose expiry
        time has passed is removed and reported as a miss.

        Args:
            key: Cache key

        Returns:
            Cached value or None if not found/expired

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> from unittest.mock import patch

            >>> # Normal get
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> cache.get('a')
            1

            >>> # Test expiration deterministically using mock time
            >>> with patch("time.time") as mock_time:
            ...     mock_time.return_value = 1000
            ...     short_cache = ResourceCache(max_size=2, ttl=0.1)
            ...     short_cache.set('b', 2)
            ...     short_cache.get('b')
            ...     # Advance time past TTL
            ...     mock_time.return_value = 1000.2
            ...     short_cache.get('b') is None
            2
            True
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return None

            # Check expiration; the matching heap entry is left in place
            # and is ignored by the cleanup once it no longer matches.
            if time.time() > entry.expires_at:
                del self._cache[key]
                return None

            self._cache.move_to_end(key)
            return entry.value

    def set(self, key: str, value: Any) -> None:
        """
        Set value in cache.

        Updating an existing key refreshes its expiry and marks it as most
        recently used. Inserting a new key into a full cache evicts the
        least recently used entry first. When ``max_size`` is zero or
        negative nothing is stored.

        Args:
            key: Cache key
            value: Value to cache

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> cache.get('a')
            1

            >>> # A zero-capacity cache stores nothing and does not raise
            >>> empty = ResourceCache(max_size=0)
            >>> empty.set('a', 1)
            >>> len(empty)
            0
        """
        if self.max_size <= 0:
            # No capacity: evicting from the empty OrderedDict would raise
            # KeyError, so treat the call as a no-op instead.
            return

        expires_at = time.time() + self.ttl
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            elif len(self._cache) >= self.max_size:
                # Evict LRU
                self._cache.popitem(last=False)

            # Add / update entry
            self._cache[key] = CacheEntry(value=value, expires_at=expires_at)
            # Push expiry into heap; stale heap entries are ignored later
            heapq.heappush(self._expiry_heap, (expires_at, key))

    def delete(self, key: str) -> None:
        """
        Delete value from cache.

        Deleting a key that is not cached is a no-op.

        Args:
            key: Cache key to delete

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache()
            >>> cache.set('a', 1)
            >>> cache.delete('a')
            >>> cache.get('a') is None
            True
        """
        with self._lock:
            self._cache.pop(key, None)
            # We don't remove entries from the heap here; they'll be ignored
            # by the cleanup when popped if missing or timestamp differs.

    def clear(self) -> None:
        """
        Clear all cached entries.

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache()
            >>> cache.set('a', 1)
            >>> cache.clear()
            >>> cache.get('a') is None
            True
        """
        with self._lock:
            self._cache.clear()
            self._expiry_heap.clear()

    async def _cleanup_loop(self) -> None:
        """Background task to clean expired entries efficiently.

        Uses a min-heap of expiration timestamps to avoid scanning the
        entire cache on each run. The actual cleanup work runs under the
        same threading lock as sync methods by delegating to a thread via
        `asyncio.to_thread` so we don't block the event loop.
        """

        async def _run_once() -> None:
            """Execute a single cleanup pass, catching and logging any errors."""
            try:
                await asyncio.to_thread(self._cleanup_once)
            except Exception as e:
                # Keep the loop alive: one failed pass must not stop cleanup.
                logger.error(f"Cache cleanup error: {e}")

        while True:
            await _run_once()
            await asyncio.sleep(60)  # Run every minute

    def _cleanup_once(self) -> None:
        """Synchronous cleanup routine executed in a thread.

        Pops entries from the expiry heap until the next non-expired
        timestamp is reached. Each popped entry is validated against
        the current cache entry to avoid removing updated entries.
        Also compacts the heap if it grows too large relative to cache size.

        Note: this pass removes entries with ``expires_at <= now`` while
        ``get()`` treats an entry as expired only when ``now > expires_at``,
        so the two differ at the exact expiry instant.
        """
        now = time.time()
        removed = 0
        needs_compaction = False

        with self._lock:
            while self._expiry_heap and self._expiry_heap[0][0] <= now:
                expires_at, key = heapq.heappop(self._expiry_heap)
                entry = self._cache.get(key)
                # If entry is present and timestamps match, remove it
                if entry is not None and entry.expires_at == expires_at:
                    del self._cache[key]
                    removed += 1

            # Decide here, while the heap size is stable; the compaction
            # itself is performed after the lock is released.
            needs_compaction = len(self._expiry_heap) > 2 * self.max_size

        if removed:
            logger.debug(f"Cleaned {removed} expired cache entries")

        # Compact heap outside the main lock to minimize contention
        if needs_compaction:
            self._compact_heap()

    def _compact_heap(self) -> None:
        """Rebuild the expiry heap with only valid (current) entries.

        Called when heap grows too large due to stale entries from
        key updates or deletions. The heap is rebuilt from a snapshot of
        the live cache, with the heapify done outside the lock. Before the
        new heap is installed, the live cache is compared against the
        snapshot so that entries added or refreshed by ``set()`` in the
        meantime are pushed as well. Stale tuples from the old heap are
        never carried over.
        """
        # Snapshot current entries under lock (fast dict iteration)
        with self._lock:
            snapshot = {key: entry.expires_at for key, entry in self._cache.items()}
            old_size = len(self._expiry_heap)

        # Build heap outside lock so get/set are not blocked meanwhile
        rebuilt = [(expires_at, key) for key, expires_at in snapshot.items()]
        heapq.heapify(rebuilt)

        # Swap back under lock, adding whatever changed since the snapshot.
        # This walks the cache (bounded by max_size), not the oversized heap.
        with self._lock:
            for key, entry in self._cache.items():
                if snapshot.get(key) != entry.expires_at:
                    heapq.heappush(rebuilt, (entry.expires_at, key))
            self._expiry_heap = rebuilt
            new_size = len(rebuilt)

        logger.debug(f"Compacted expiry heap: {old_size} -> {new_size} entries")

    def __len__(self) -> int:
        """
        Get the number of entries in cache.

        The count includes entries that have expired but have not yet been
        removed by ``get()`` or the background cleanup.

        Args:
            None

        Returns:
            int: Number of entries in cache

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> len(cache)
            1
        """
        with self._lock:
            return len(self._cache)