Coverage for mcpgateway / cache / resource_cache.py: 100%
94 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/cache/resource_cache.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Resource Cache Implementation.
8This module implements a simple in-memory cache with TTL expiration for caching
9resource content in the MCP Gateway. Features:
10- TTL-based expiration
11- Maximum size limit with LRU eviction
12- Thread-safe operations
14Examples:
15 >>> from mcpgateway.cache.resource_cache import ResourceCache
16 >>> from unittest.mock import patch
17 >>> cache = ResourceCache(max_size=2, ttl=1)
18 >>> cache.set('a', 1)
19 >>> cache.get('a')
20 1
22 Test TTL expiration using mocked time (no actual sleep):
24 >>> with patch("time.time") as mock_time:
25 ... mock_time.return_value = 1000
26 ... cache2 = ResourceCache(max_size=2, ttl=1)
27 ... cache2.set('x', 100)
28 ... cache2.get('x') # Before expiration
29 ... mock_time.return_value = 1002 # Advance past TTL
30 ... cache2.get('x') is None # After expiration
31 100
32 True
34 Test LRU eviction:
36 >>> cache.set('a', 1)
37 >>> cache.set('b', 2)
38 >>> cache.set('c', 3) # LRU eviction
39 >>> sorted(cache._cache.keys())
40 ['b', 'c']
41 >>> cache.delete('b')
42 >>> cache.get('b') is None
43 True
44 >>> cache.clear()
45 >>> cache.get('a') is None
46 True
47"""
49# Standard
50import asyncio
51from collections import OrderedDict
52from dataclasses import dataclass
53import heapq
54import threading
55import time
56from typing import Any, Optional
58# First-Party
59from mcpgateway.services.logging_service import LoggingService
# Initialize logging service first: the module-level ``logger`` created here
# is the one ResourceCache uses for its lifecycle and cleanup messages.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
@dataclass
class CacheEntry:
    """Cache entry with expiration.

    Attributes:
        value: The cached payload, stored as-is.
        expires_at: Timestamp (same clock as ``time.time()``) after which the
            entry is treated as expired. ``ResourceCache.set`` computes it as
            ``time.time() + ttl``.
    """

    # Cached payload; any Python object.
    value: Any
    # Absolute expiry timestamp in seconds, compared against time.time().
    expires_at: float
class ResourceCache:
    """
    Resource content cache with TTL expiration.

    Entries live in an ``OrderedDict`` ordered from least to most recently
    used, so the first item is always the LRU eviction candidate. Expiration
    is enforced lazily on ``get`` and proactively by a background cleanup
    task that walks a min-heap of expiry timestamps.

    Attributes:
        max_size: Maximum number of entries; 0 or less disables caching
        ttl: Time-to-live in seconds
        _cache: Cache storage
        _lock: Threading lock for thread safety
        _expiry_heap: Min-heap of ``(expires_at, key)`` used by the cleanup
        _cleanup_task: Background cleanup task, or None when not running

    Examples:
        >>> from mcpgateway.cache.resource_cache import ResourceCache
        >>> from unittest.mock import patch
        >>> cache = ResourceCache(max_size=2, ttl=1)
        >>> cache.set('a', 1)
        >>> cache.get('a')
        1

        Test TTL expiration using mocked time (no actual sleep):

        >>> with patch("time.time") as mock_time:
        ...     mock_time.return_value = 1000
        ...     cache2 = ResourceCache(max_size=2, ttl=1)
        ...     cache2.set('x', 100)
        ...     cache2.get('x')  # Before expiration
        ...     mock_time.return_value = 1002  # Advance past TTL
        ...     cache2.get('x') is None  # After expiration
        100
        True

        Test LRU eviction:

        >>> cache.set('a', 1)
        >>> cache.set('b', 2)
        >>> cache.set('c', 3)  # LRU eviction
        >>> sorted(cache._cache.keys())
        ['b', 'c']
        >>> cache.delete('b')
        >>> cache.get('b') is None
        True
        >>> cache.clear()
        >>> cache.get('a') is None
        True
    """

    def __init__(self, max_size: int = 1000, ttl: float = 3600):
        """Initialize cache.

        Args:
            max_size: Maximum number of entries; 0 or less disables caching
            ttl: Time-to-live in seconds (fractional values are allowed)
        """
        self.max_size = max_size
        self.ttl = ttl
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        # Use a threading lock for thread-safe operations across sync methods
        # and the background cleanup thread.
        self._lock = threading.Lock()
        # Min-heap of (expires_at, key) for efficient expiration cleanup
        self._expiry_heap: list[tuple[float, str]] = []
        # Strong reference to the background cleanup task. The event loop
        # only keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._cleanup_task: Optional[asyncio.Task] = None

    async def initialize(self) -> None:
        """Initialize cache service and start the background cleanup task.

        Calling this again while the cleanup task is still running does not
        start a second task.
        """
        logger.info("Initializing resource cache")
        # Start cleanup task (only one at a time) and keep a reference to it
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def shutdown(self) -> None:
        """Shutdown cache service: stop the cleanup task and drop all entries."""
        logger.info("Shutting down resource cache")
        task, self._cleanup_task = self._cleanup_task, None
        # Request cancellation without awaiting the task, so shutdown never
        # blocks on (or swallows the cancellation of) the cleanup loop.
        if task is not None and not task.done():
            task.cancel()
        self.clear()

    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        A hit marks the key as most recently used. An entry whose expiry
        timestamp has passed is removed and reported as a miss.

        Args:
            key: Cache key

        Returns:
            Cached value or None if not found/expired

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> from unittest.mock import patch

            >>> # Normal get
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> cache.get('a')
            1

            >>> # Test expiration deterministically using mock time
            >>> with patch("time.time") as mock_time:
            ...     mock_time.return_value = 1000
            ...     short_cache = ResourceCache(max_size=2, ttl=0.1)
            ...     short_cache.set('b', 2)
            ...     short_cache.get('b')
            ...     # Advance time past TTL
            ...     mock_time.return_value = 1000.2
            ...     short_cache.get('b') is None
            2
            True
        """
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return None

            # Check expiration. The matching heap record is left behind and
            # is discarded as stale by the cleanup pass.
            if time.time() > entry.expires_at:
                del self._cache[key]
                return None

            # Mark as most recently used
            self._cache.move_to_end(key)
            return entry.value

    def set(self, key: str, value: Any) -> None:
        """
        Set value in cache.

        Storing a new key in a full cache evicts the least recently used
        entry. Updating an existing key refreshes its TTL and recency. When
        ``max_size`` is 0 or less the call is a no-op (caching disabled).

        Args:
            key: Cache key
            value: Value to cache

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> cache.get('a')
            1
            >>> disabled = ResourceCache(max_size=0)
            >>> disabled.set('a', 1)
            >>> disabled.get('a') is None
            True
        """
        # A cache with no capacity stores nothing. Without this guard the
        # eviction below would call popitem() on an empty dict and raise.
        if self.max_size <= 0:
            return

        expires_at = time.time() + self.ttl
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            elif len(self._cache) >= self.max_size:
                # Evict LRU
                self._cache.popitem(last=False)

            # Add / update entry
            self._cache[key] = CacheEntry(value=value, expires_at=expires_at)
            # Push expiry into heap; stale heap entries are ignored later
            heapq.heappush(self._expiry_heap, (expires_at, key))

    def delete(self, key: str) -> None:
        """
        Delete value from cache.

        Deleting a key that is not cached is a no-op.

        Args:
            key: Cache key to delete

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache()
            >>> cache.set('a', 1)
            >>> cache.delete('a')
            >>> cache.get('a') is None
            True
        """
        with self._lock:
            self._cache.pop(key, None)
            # We don't remove entries from the heap here; they'll be ignored
            # by the cleanup when popped if missing or timestamp differs.

    def clear(self) -> None:
        """
        Clear all cached entries.

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache()
            >>> cache.set('a', 1)
            >>> cache.clear()
            >>> cache.get('a') is None
            True
        """
        with self._lock:
            self._cache.clear()
            self._expiry_heap.clear()

    async def _cleanup_loop(self) -> None:
        """Background task to clean expired entries efficiently.

        Uses a min-heap of expiration timestamps to avoid scanning the
        entire cache on each run. The actual cleanup work runs under the
        same threading lock as sync methods by delegating to a thread via
        `asyncio.to_thread` so we don't block the event loop.

        Runs until the task is cancelled; errors from a single pass are
        logged and do not stop the loop.
        """

        async def _run_once() -> None:
            """Execute a single cleanup pass, catching and logging any errors."""
            try:
                await asyncio.to_thread(self._cleanup_once)
            except Exception as e:
                logger.error(f"Cache cleanup error: {e}")

        while True:
            await _run_once()
            await asyncio.sleep(60)  # Run every minute

    def _cleanup_once(self) -> None:
        """Synchronous cleanup routine executed in a thread.

        Pops entries from the expiry heap until the next non-expired
        timestamp is reached. Each popped entry is validated against
        the current cache entry to avoid removing updated entries.
        Also compacts the heap if it grows too large relative to cache size.
        """
        now = time.time()
        removed = 0

        with self._lock:
            while self._expiry_heap and self._expiry_heap[0][0] <= now:
                expires_at, key = heapq.heappop(self._expiry_heap)
                entry = self._cache.get(key)
                # Only remove when the heap record still describes the live
                # entry; a differing timestamp means the key was re-set.
                if entry is not None and entry.expires_at == expires_at:
                    del self._cache[key]
                    removed += 1

            # Decide under the lock; the compaction itself runs afterwards
            needs_compaction = len(self._expiry_heap) > 2 * self.max_size

        if removed:
            logger.debug(f"Cleaned {removed} expired cache entries")

        # Compact heap outside the main lock to minimize contention
        if needs_compaction:
            self._compact_heap()

    def _compact_heap(self) -> None:
        """Rebuild the expiry heap with only valid (current) entries.

        Called when heap grows too large due to stale entries from
        key updates or deletions. Minimizes lock contention by doing
        the O(n) heapify outside the lock.

        Entries added or refreshed while the heap is being rebuilt are found
        by comparing the live cache with the snapshot, so every cached key
        keeps a matching heap record regardless of its expiry value.
        """
        # Snapshot current entries under lock (fast dict iteration)
        with self._lock:
            snapshot = [(entry.expires_at, key) for key, entry in self._cache.items()]
            old_size = len(self._expiry_heap)

        # Build lookup and heap outside lock - O(n) work doesn't block get/set
        snapshot_expiry = {key: expires_at for expires_at, key in snapshot}
        heapq.heapify(snapshot)

        # Swap back under lock, adding whatever changed during compaction
        with self._lock:
            # Keys set() during compaction are either absent from the snapshot
            # or carry a different expiry; the lookup yields None for the
            # former, which never equals a float timestamp.
            changed = [(entry.expires_at, key) for key, entry in self._cache.items() if snapshot_expiry.get(key) != entry.expires_at]
            self._expiry_heap = snapshot
            for item in changed:
                heapq.heappush(self._expiry_heap, item)
            new_size = len(self._expiry_heap)

        logger.debug(f"Compacted expiry heap: {old_size} -> {new_size} entries")

    def __len__(self) -> int:
        """
        Get the number of entries in cache.

        Expired entries that have not yet been removed are included.

        Args:
            None

        Returns:
            int: Number of entries in cache

        Examples:
            >>> from mcpgateway.cache.resource_cache import ResourceCache
            >>> cache = ResourceCache(max_size=2, ttl=1)
            >>> cache.set('a', 1)
            >>> len(cache)
            1
        """
        with self._lock:
            return len(self._cache)