Coverage for mcpgateway/cache/resource_cache.py: 100%
101 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/cache/resource_cache.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

Resource Cache Implementation.

This module implements a simple in-memory cache with TTL expiration for caching
resource content in ContextForge. Features:
- TTL-based expiration
- Maximum size limit with LRU eviction
- Thread-safe operations

Examples:
    >>> from mcpgateway.cache.resource_cache import ResourceCache
    >>> from unittest.mock import patch
    >>> cache = ResourceCache(max_size=2, ttl=1)
    >>> cache.set('a', 1)
    >>> cache.get('a')
    1

    Test TTL expiration using mocked time (no actual sleep):

    >>> with patch("time.time") as mock_time:
    ...     mock_time.return_value = 1000
    ...     cache2 = ResourceCache(max_size=2, ttl=1)
    ...     cache2.set('x', 100)
    ...     cache2.get('x')  # Before expiration
    ...     mock_time.return_value = 1002  # Advance past TTL
    ...     cache2.get('x') is None  # After expiration
    100
    True

    Test LRU eviction:

    >>> cache.set('a', 1)
    >>> cache.set('b', 2)
    >>> cache.set('c', 3)  # LRU eviction
    >>> sorted(cache._cache.keys())
    ['b', 'c']
    >>> cache.delete('b')
    >>> cache.get('b') is None
    True
    >>> cache.clear()
    >>> cache.get('a') is None
    True
"""

# Standard
import asyncio
from collections import OrderedDict
from dataclasses import dataclass
import heapq
import threading
import time
from typing import Any, Optional

# First-Party
from mcpgateway.services.logging_service import LoggingService

# Initialize logging service first so every symbol below can log consistently.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
@dataclass
class CacheEntry:
    """A single cached value together with its absolute expiration time.

    Attributes:
        value: The cached object (any type).
        expires_at: Absolute expiration timestamp in seconds since the
            epoch, as produced by ``time.time()``.
    """

    value: Any
    expires_at: float
class ResourceCache:
    """Resource content cache with TTL expiration and LRU eviction.

    Attributes:
        max_size: Maximum number of entries held at once.
        ttl: Time-to-live in seconds for each entry.
        _cache: Ordered mapping of key -> CacheEntry, maintained in LRU order.
        _lock: Threading lock guarding all cache state.
        _expiry_heap: Min-heap of (expires_at, key) used by background cleanup.

    Examples:
        >>> from mcpgateway.cache.resource_cache import ResourceCache
        >>> from unittest.mock import patch
        >>> cache = ResourceCache(max_size=2, ttl=1)
        >>> cache.set('a', 1)
        >>> cache.get('a')
        1

        Test TTL expiration using mocked time (no actual sleep):

        >>> with patch("time.time") as mock_time:
        ...     mock_time.return_value = 1000
        ...     cache2 = ResourceCache(max_size=2, ttl=1)
        ...     cache2.set('x', 100)
        ...     cache2.get('x')  # Before expiration
        ...     mock_time.return_value = 1002  # Advance past TTL
        ...     cache2.get('x') is None  # After expiration
        100
        True

        Test LRU eviction:

        >>> cache.set('a', 1)
        >>> cache.set('b', 2)
        >>> cache.set('c', 3)  # LRU eviction
        >>> sorted(cache._cache.keys())
        ['b', 'c']
        >>> cache.delete('b')
        >>> cache.get('b') is None
        True
        >>> cache.clear()
        >>> cache.get('a') is None
        True
    """

    def __init__(self, max_size: int = 1000, ttl: float = 3600):
        """Initialize cache.

        Args:
            max_size: Maximum number of entries before LRU eviction kicks in.
            ttl: Time-to-live in seconds. Accepts int or float (fractional
                TTLs are exercised by the ``get`` doctest below).
        """
        self.max_size = max_size
        self.ttl = ttl
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        # A threading lock (not an asyncio lock) so the synchronous methods
        # and the background cleanup thread can share it safely.
        self._lock = threading.Lock()
        # Min-heap of (expires_at, key) for efficient expiration cleanup.
        # May contain stale entries; they are validated before removal.
        self._expiry_heap: list[tuple[float, str]] = []
        # Reference kept so the cleanup task is not garbage-collected.
        self._cleanup_task: Optional[asyncio.Task] = None

    async def initialize(self) -> None:
        """Initialize cache service and start the background cleanup task."""
        logger.info("Initializing resource cache")
        # Store the task reference to prevent it being garbage-collected.
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def shutdown(self) -> None:
        """Shutdown cache service: cancel cleanup task and drop all entries."""
        logger.info("Shutting down resource cache")
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
        self.clear()

    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        Args:
            key: Cache key

        Returns:
            Cached value, or None if the key is absent or expired.

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> from unittest.mock import patch

            >>> # Normal get
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> cache.get('a')
            1

            >>> # Test expiration deterministically using mock time
            >>> with patch("time.time") as mock_time:
            ...     mock_time.return_value = 1000
            ...     short_cache = ResourceCache(max_size=2, ttl=0.1)
            ...     short_cache.set('b', 2)
            ...     short_cache.get('b')
            ...     # Advance time past TTL
            ...     mock_time.return_value = 1000.2
            ...     short_cache.get('b') is None
            2
            True
        """
        with self._lock:
            if key not in self._cache:
                return None

            entry = self._cache[key]
            now = time.time()

            # Lazy expiration: drop expired entries on access even if the
            # background cleanup has not run yet.
            if now > entry.expires_at:
                del self._cache[key]
                return None

            # Mark as most recently used so set() evicts the right entry.
            self._cache.move_to_end(key)

            return entry.value

    def set(self, key: str, value: Any) -> None:
        """
        Set value in cache.

        Args:
            key: Cache key
            value: Value to cache

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> cache.get('a')
            1
        """
        expires_at = time.time() + self.ttl
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            elif len(self._cache) >= self.max_size:
                # Evict the least recently used entry (front of the dict).
                self._cache.popitem(last=False)

            # Add / update entry
            self._cache[key] = CacheEntry(value=value, expires_at=expires_at)
            # Push expiry into heap; stale heap entries are ignored later.
            heapq.heappush(self._expiry_heap, (expires_at, key))

    def delete(self, key: str) -> None:
        """
        Delete value from cache.

        Args:
            key: Cache key to delete

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache()
            >>> cache.set('a', 1)
            >>> cache.delete('a')
            >>> cache.get('a') is None
            True
        """
        with self._lock:
            self._cache.pop(key, None)
            # We don't remove entries from the heap here; they'll be ignored
            # by the cleanup when popped if missing or timestamp differs.

    def clear(self) -> None:
        """
        Clear all cached entries.

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache()
            >>> cache.set('a', 1)
            >>> cache.clear()
            >>> cache.get('a') is None
            True
        """
        with self._lock:
            self._cache.clear()
            self._expiry_heap.clear()

    async def _cleanup_loop(self) -> None:
        """Background task to clean expired entries efficiently.

        Uses a min-heap of expiration timestamps to avoid scanning the
        entire cache on each run. The actual cleanup work runs under the
        same threading lock as sync methods by delegating to a thread via
        `asyncio.to_thread` so we don't block the event loop.
        """

        async def _run_once() -> None:
            """Execute a single cleanup pass, catching and logging any errors."""
            try:
                await asyncio.to_thread(self._cleanup_once)
            except Exception as e:
                logger.error(f"Cache cleanup error: {e}")

        while True:
            await _run_once()
            await asyncio.sleep(60)  # Run every minute

    def _cleanup_once(self) -> None:
        """Synchronous cleanup routine executed in a thread.

        Pops entries from the expiry heap until the next non-expired
        timestamp is reached. Each popped entry is validated against
        the current cache entry to avoid removing updated entries.
        Also compacts the heap if it grows too large relative to cache size.
        """
        now = time.time()
        removed = 0
        needs_compaction = False

        with self._lock:
            while self._expiry_heap and self._expiry_heap[0][0] <= now:
                expires_at, key = heapq.heappop(self._expiry_heap)
                entry = self._cache.get(key)
                # If entry is present and timestamps match, remove it.
                # A timestamp mismatch means the key was updated via set()
                # after this heap entry was pushed — leave it alone.
                if entry is not None and entry.expires_at == expires_at:
                    del self._cache[key]
                    removed += 1

            # Check if heap needs compaction (compaction itself is done
            # outside the lock to minimize contention).
            needs_compaction = len(self._expiry_heap) > 2 * self.max_size

        if removed:
            logger.debug(f"Cleaned {removed} expired cache entries")

        # Compact heap outside the main lock to minimize contention
        if needs_compaction:
            self._compact_heap()

    def _compact_heap(self) -> None:
        """Rebuild the expiry heap with only valid (current) entries.

        Called when the heap grows too large due to stale entries from
        key updates or deletions. Minimizes lock contention by doing
        the O(n) heapify outside the lock.
        """
        # Snapshot current entries under lock (fast dict iteration)
        with self._lock:
            entries = [(entry.expires_at, key) for key, entry in self._cache.items()]
            old_size = len(self._expiry_heap)
            # Track max expiry in snapshot to identify entries added during compaction
            max_snapshot_expiry = max((e[0] for e in entries), default=0.0)

        # Build heap outside lock - O(n) work doesn't block get/set
        heapq.heapify(entries)

        # Swap back under lock, preserving entries added during compaction
        with self._lock:
            # Keep heap entries with expiry > max_snapshot_expiry (added via set() during compaction)
            new_entries = [(exp, k) for exp, k in self._expiry_heap if exp > max_snapshot_expiry]
            self._expiry_heap = entries
            for entry in new_entries:
                heapq.heappush(self._expiry_heap, entry)

        logger.debug(f"Compacted expiry heap: {old_size} -> {len(self._expiry_heap)} entries")

    def __len__(self) -> int:
        """
        Get the number of entries in cache.

        Note: expired-but-not-yet-cleaned entries are included in the count;
        they are only removed lazily on access or by the cleanup task.

        Returns:
            int: Number of entries in cache

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> len(cache)
            1
        """
        with self._lock:
            return len(self._cache)