Coverage for mcpgateway / cache / admin_stats_cache.py: 99%
370 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/cache/admin_stats_cache.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Admin Statistics Cache.
8This module implements a thread-safe cache for admin dashboard statistics
9with Redis as the primary store and in-memory fallback. It caches system
10stats, observability stats, and other frequently-accessed admin data.
12Performance Impact:
13 - Before: 10+ COUNT queries per dashboard load
14 - After: 0 queries (cache hit) per TTL period
15 - Expected 1000+ queries/hour eliminated
17Examples:
18 >>> from mcpgateway.cache.admin_stats_cache import admin_stats_cache
19 >>> # Cache is used automatically by admin endpoints
20 >>> import asyncio
21 >>> # asyncio.run(admin_stats_cache.invalidate_system_stats())
22"""
24# Standard
25from dataclasses import dataclass
26import logging
27import threading
28import time
29from typing import Any, Dict, Optional
31logger = logging.getLogger(__name__)
@dataclass
class CacheEntry:
    """A cached value paired with its absolute expiry timestamp.

    Examples:
        >>> import time
        >>> entry = CacheEntry(value={"total": 100}, expiry=time.time() + 60)
        >>> entry.is_expired()
        False
    """

    value: Any
    expiry: float

    def is_expired(self) -> bool:
        """Report whether this entry's expiry timestamp has passed.

        Returns:
            bool: True if the entry has expired, False otherwise.
        """
        now = time.time()
        return not now < self.expiry
class AdminStatsCache:
    """Thread-safe admin statistics cache with Redis and in-memory tiers.

    This cache reduces database load for admin dashboard by caching:
    - System stats (entity counts)
    - Observability stats (trace/span counts)
    - User/team listings
    - Other admin-related aggregations

    The cache uses Redis as the primary store for distributed deployments
    and falls back to in-memory caching when Redis is unavailable.

    Examples:
        >>> cache = AdminStatsCache()
        >>> cache.stats()["hit_count"]
        0
    """

    # Default TTL (seconds) per cache category. Each may be overridden by a
    # constructor argument or an ``admin_stats_cache_<name>_ttl`` setting.
    _TTL_DEFAULTS = {
        "system": 60,
        "observability": 30,
        "users": 30,
        "teams": 60,
        "tags": 120,
        "plugins": 120,
        "performance": 60,
    }

    def __init__(
        self,
        system_ttl: Optional[int] = None,
        observability_ttl: Optional[int] = None,
        users_ttl: Optional[int] = None,
        teams_ttl: Optional[int] = None,
        tags_ttl: Optional[int] = None,
        plugins_ttl: Optional[int] = None,
        performance_ttl: Optional[int] = None,
        enabled: Optional[bool] = None,
    ):
        """Initialize the admin stats cache.

        Args:
            system_ttl: TTL for system stats cache in seconds (default: 60)
            observability_ttl: TTL for observability stats in seconds (default: 30)
            users_ttl: TTL for user listings in seconds (default: 30)
            teams_ttl: TTL for team listings in seconds (default: 60)
            tags_ttl: TTL for tags listing in seconds (default: 120)
            plugins_ttl: TTL for plugin stats in seconds (default: 120)
            performance_ttl: TTL for performance aggregates in seconds (default: 60)
            enabled: Whether caching is enabled (default: True)

        Examples:
            >>> cache = AdminStatsCache(system_ttl=120)
            >>> cache._system_ttl
            120
        """
        # Explicit constructor overrides, keyed by category name.
        overrides = {
            "system": system_ttl,
            "observability": observability_ttl,
            "users": users_ttl,
            "teams": teams_ttl,
            "tags": tags_ttl,
            "plugins": plugins_ttl,
            "performance": performance_ttl,
        }
        # Import settings lazily to avoid circular imports
        try:
            # First-Party
            from mcpgateway.config import settings  # pylint: disable=import-outside-toplevel

            for name, default in self._TTL_DEFAULTS.items():
                # Precedence: explicit argument, then setting, then default.
                setattr(self, f"_{name}_ttl", overrides[name] or getattr(settings, f"admin_stats_cache_{name}_ttl", default))
            self._enabled = enabled if enabled is not None else getattr(settings, "admin_stats_cache_enabled", True)
            self._cache_prefix = getattr(settings, "cache_prefix", "mcpgw:")
        except ImportError:
            for name, default in self._TTL_DEFAULTS.items():
                setattr(self, f"_{name}_ttl", overrides[name] or default)
            self._enabled = enabled if enabled is not None else True
            self._cache_prefix = "mcpgw:"

        # In-memory cache (fallback when Redis unavailable)
        self._cache: Dict[str, CacheEntry] = {}

        # Thread safety for the in-memory tier
        self._lock = threading.Lock()

        # Redis availability (checked lazily on first client request)
        self._redis_checked = False
        self._redis_available = False

        # Statistics
        self._hit_count = 0
        self._miss_count = 0
        self._redis_hit_count = 0
        self._redis_miss_count = 0

        logger.info(f"AdminStatsCache initialized: enabled={self._enabled}, " f"system_ttl={self._system_ttl}s, observability_ttl={self._observability_ttl}s, tags_ttl={self._tags_ttl}s")

    def _get_redis_key(self, key_type: str, identifier: str = "") -> str:
        """Generate Redis key with proper prefix.

        Args:
            key_type: Type of cache entry (system, observability, users, teams)
            identifier: Optional identifier suffix

        Returns:
            Full Redis key with prefix

        Examples:
            >>> cache = AdminStatsCache()
            >>> cache._get_redis_key("system", "comprehensive")
            'mcpgw:admin:system:comprehensive'
        """
        if identifier:
            return f"{self._cache_prefix}admin:{key_type}:{identifier}"
        return f"{self._cache_prefix}admin:{key_type}"

    async def _get_redis_client(self):
        """Get Redis client if available.

        Returns:
            Redis client or None if unavailable.
        """
        try:
            # First-Party
            from mcpgateway.utils.redis_client import get_redis_client  # pylint: disable=import-outside-toplevel

            client = await get_redis_client()
            if client and not self._redis_checked:
                self._redis_checked = True
                self._redis_available = True
                logger.debug("AdminStatsCache: Redis client available")
            return client
        except Exception as e:
            if not self._redis_checked:
                self._redis_checked = True
                self._redis_available = False
                logger.debug(f"AdminStatsCache: Redis unavailable, using in-memory cache: {e}")
            return None

    async def _get_cached(self, cache_key: str) -> Optional[Any]:
        """Look up a key in Redis first, then in the in-memory tier.

        Updates hit/miss counters exactly as each tier is consulted. A Redis
        error is logged and treated as a fall-through to the memory tier.

        Args:
            cache_key: Full cache key (including prefix).

        Returns:
            The cached value, or None on a miss in both tiers.
        """
        # Try Redis first
        redis = await self._get_redis_client()
        if redis:
            try:
                data = await redis.get(cache_key)
                if data:
                    # Third-Party
                    import orjson  # pylint: disable=import-outside-toplevel

                    self._hit_count += 1
                    self._redis_hit_count += 1
                    return orjson.loads(data)
                self._redis_miss_count += 1
            except Exception as e:
                logger.warning(f"AdminStatsCache Redis get failed: {e}")

        # Fall back to in-memory cache
        with self._lock:
            entry = self._cache.get(cache_key)
            if entry and not entry.is_expired():
                self._hit_count += 1
                return entry.value

        self._miss_count += 1
        return None

    async def _set_cached(self, cache_key: str, value: Any, ttl: int) -> None:
        """Store a value in Redis (best effort) and in the in-memory tier.

        Args:
            cache_key: Full cache key (including prefix).
            value: JSON-serializable value to store.
            ttl: Time-to-live in seconds for both tiers.
        """
        # Store in Redis
        redis = await self._get_redis_client()
        if redis:
            try:
                # Third-Party
                import orjson  # pylint: disable=import-outside-toplevel

                await redis.setex(cache_key, ttl, orjson.dumps(value))
            except Exception as e:
                logger.warning(f"AdminStatsCache Redis set failed: {e}")

        # Store in in-memory cache
        with self._lock:
            self._cache[cache_key] = CacheEntry(value=value, expiry=time.time() + ttl)

    async def get_system_stats(self) -> Optional[Dict[str, Any]]:
        """Get cached system statistics.

        Returns:
            Cached system stats or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_system_stats())
            >>> result is None  # Cache miss on fresh cache
            True
        """
        if not self._enabled:
            return None
        return await self._get_cached(self._get_redis_key("system", "comprehensive"))

    async def set_system_stats(self, stats: Dict[str, Any]) -> None:
        """Store system statistics in cache.

        Args:
            stats: System statistics dictionary

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_system_stats({"tools": 10, "prompts": 5}))
        """
        if not self._enabled:
            return
        await self._set_cached(self._get_redis_key("system", "comprehensive"), stats, self._system_ttl)

    async def get_observability_stats(self, hours: int = 24) -> Optional[Dict[str, Any]]:
        """Get cached observability statistics.

        Args:
            hours: Time range in hours for stats

        Returns:
            Cached observability stats or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_observability_stats(24))
            >>> result is None
            True
        """
        if not self._enabled:
            return None
        return await self._get_cached(self._get_redis_key("observability", str(hours)))

    async def set_observability_stats(self, stats: Dict[str, Any], hours: int = 24) -> None:
        """Store observability statistics in cache.

        Args:
            stats: Observability statistics dictionary
            hours: Time range in hours for stats

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_observability_stats({"total_traces": 100}, 24))
        """
        if not self._enabled:
            return
        await self._set_cached(self._get_redis_key("observability", str(hours)), stats, self._observability_ttl)

    async def get_users_list(self, limit: int, offset: int) -> Optional[Any]:
        """Get cached users list.

        Args:
            limit: Page size
            offset: Page offset

        Returns:
            Cached users list or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_users_list(100, 0))
            >>> result is None
            True
        """
        if not self._enabled:
            return None
        return await self._get_cached(self._get_redis_key("users", f"{limit}:{offset}"))

    async def set_users_list(self, users: Any, limit: int, offset: int) -> None:
        """Store users list in cache.

        Args:
            users: Users list data
            limit: Page size
            offset: Page offset

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_users_list([{"email": "test@example.com"}], 100, 0))
        """
        if not self._enabled:
            return
        await self._set_cached(self._get_redis_key("users", f"{limit}:{offset}"), users, self._users_ttl)

    async def get_teams_list(self, limit: int, offset: int) -> Optional[Any]:
        """Get cached teams list.

        Args:
            limit: Page size
            offset: Page offset

        Returns:
            Cached teams list or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_teams_list(100, 0))
            >>> result is None
            True
        """
        if not self._enabled:
            return None
        return await self._get_cached(self._get_redis_key("teams", f"{limit}:{offset}"))

    async def set_teams_list(self, teams: Any, limit: int, offset: int) -> None:
        """Store teams list in cache.

        Args:
            teams: Teams list data
            limit: Page size
            offset: Page offset

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_teams_list([{"id": "team1", "name": "Team 1"}], 100, 0))
        """
        if not self._enabled:
            return
        await self._set_cached(self._get_redis_key("teams", f"{limit}:{offset}"), teams, self._teams_ttl)

    async def get_tags(self, entity_types_hash: str) -> Optional[Any]:
        """Get cached tags listing.

        Args:
            entity_types_hash: Hash of entity types filter

        Returns:
            Cached tags list or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_tags("all"))
            >>> result is None
            True
        """
        if not self._enabled:
            return None
        return await self._get_cached(self._get_redis_key("tags", entity_types_hash))

    async def set_tags(self, tags: Any, entity_types_hash: str) -> None:
        """Store tags listing in cache.

        Args:
            tags: Tags list data
            entity_types_hash: Hash of entity types filter

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_tags([{"name": "api", "count": 10}], "all"))
        """
        if not self._enabled:
            return
        await self._set_cached(self._get_redis_key("tags", entity_types_hash), tags, self._tags_ttl)

    async def get_plugin_stats(self) -> Optional[Dict[str, Any]]:
        """Get cached plugin statistics.

        Returns:
            Cached plugin stats or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_plugin_stats())
            >>> result is None
            True
        """
        if not self._enabled:
            return None
        return await self._get_cached(self._get_redis_key("plugins", "stats"))

    async def set_plugin_stats(self, stats: Dict[str, Any]) -> None:
        """Store plugin statistics in cache.

        Args:
            stats: Plugin statistics dictionary

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_plugin_stats({"total_plugins": 5}))
        """
        if not self._enabled:
            return
        await self._set_cached(self._get_redis_key("plugins", "stats"), stats, self._plugins_ttl)

    async def get_performance_history(self, cache_key_suffix: str) -> Optional[Dict[str, Any]]:
        """Get cached performance aggregates.

        Args:
            cache_key_suffix: Cache key suffix with filter params

        Returns:
            Cached performance data or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_performance_history("hourly:168"))
            >>> result is None
            True
        """
        if not self._enabled:
            return None
        return await self._get_cached(self._get_redis_key("performance", cache_key_suffix))

    async def set_performance_history(self, data: Dict[str, Any], cache_key_suffix: str) -> None:
        """Store performance aggregates in cache.

        Args:
            data: Performance data dictionary
            cache_key_suffix: Cache key suffix with filter params

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_performance_history({"aggregates": []}, "hourly:168"))
        """
        if not self._enabled:
            return
        await self._set_cached(self._get_redis_key("performance", cache_key_suffix), data, self._performance_ttl)

    async def invalidate_system_stats(self) -> None:
        """Invalidate system stats cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_system_stats())
        """
        logger.debug("AdminStatsCache: Invalidating system stats cache")
        await self._invalidate_prefix("system")

    async def invalidate_observability_stats(self) -> None:
        """Invalidate observability stats cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_observability_stats())
        """
        logger.debug("AdminStatsCache: Invalidating observability stats cache")
        await self._invalidate_prefix("observability")

    async def invalidate_users(self) -> None:
        """Invalidate users cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_users())
        """
        logger.debug("AdminStatsCache: Invalidating users cache")
        await self._invalidate_prefix("users")

    async def invalidate_teams(self) -> None:
        """Invalidate teams cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_teams())
        """
        logger.debug("AdminStatsCache: Invalidating teams cache")
        await self._invalidate_prefix("teams")

    async def invalidate_tags(self) -> None:
        """Invalidate tags cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_tags())
        """
        logger.debug("AdminStatsCache: Invalidating tags cache")
        await self._invalidate_prefix("tags")

    async def invalidate_plugins(self) -> None:
        """Invalidate plugins cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_plugins())
        """
        logger.debug("AdminStatsCache: Invalidating plugins cache")
        await self._invalidate_prefix("plugins")

    async def invalidate_performance(self) -> None:
        """Invalidate performance cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_performance())
        """
        logger.debug("AdminStatsCache: Invalidating performance cache")
        await self._invalidate_prefix("performance")

    async def _invalidate_prefix(self, prefix: str) -> None:
        """Invalidate all cache entries with given prefix.

        Args:
            prefix: Cache key prefix to invalidate
        """
        full_prefix = self._get_redis_key(prefix)

        # Clear in-memory cache
        with self._lock:
            keys_to_remove = [k for k in self._cache if k.startswith(full_prefix)]
            for key in keys_to_remove:
                self._cache.pop(key, None)

        # Clear Redis
        redis = await self._get_redis_client()
        if redis:
            try:
                pattern = f"{full_prefix}*"
                async for key in redis.scan_iter(match=pattern):
                    await redis.delete(key)

                # Publish invalidation for other workers
                await redis.publish("mcpgw:cache:invalidate", f"admin:{prefix}")
            except Exception as e:
                logger.warning(f"AdminStatsCache Redis invalidate failed: {e}")

    def invalidate_all(self) -> None:
        """Invalidate all cached data synchronously.

        Examples:
            >>> cache = AdminStatsCache()
            >>> cache.invalidate_all()
        """
        with self._lock:
            self._cache.clear()
        logger.info("AdminStatsCache: All caches invalidated")

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary with hit/miss counts and hit rate

        Examples:
            >>> cache = AdminStatsCache()
            >>> stats = cache.stats()
            >>> "hit_count" in stats
            True
        """
        total = self._hit_count + self._miss_count
        redis_total = self._redis_hit_count + self._redis_miss_count

        return {
            "enabled": self._enabled,
            "hit_count": self._hit_count,
            "miss_count": self._miss_count,
            "hit_rate": self._hit_count / total if total > 0 else 0.0,
            "redis_hit_count": self._redis_hit_count,
            "redis_miss_count": self._redis_miss_count,
            "redis_hit_rate": self._redis_hit_count / redis_total if redis_total > 0 else 0.0,
            "redis_available": self._redis_available,
            "cache_size": len(self._cache),
            # Insertion order of _TTL_DEFAULTS matches the documented layout.
            "ttls": {name: getattr(self, f"_{name}_ttl") for name in self._TTL_DEFAULTS},
        }

    def reset_stats(self) -> None:
        """Reset hit/miss counters.

        Examples:
            >>> cache = AdminStatsCache()
            >>> cache._hit_count = 100
            >>> cache.reset_stats()
            >>> cache._hit_count
            0
        """
        self._hit_count = 0
        self._miss_count = 0
        self._redis_hit_count = 0
        self._redis_miss_count = 0
# Global singleton instance (created lazily on first access)
_admin_stats_cache: Optional[AdminStatsCache] = None


def get_admin_stats_cache() -> AdminStatsCache:
    """Get or create the singleton AdminStatsCache instance.

    Returns:
        AdminStatsCache: The singleton admin stats cache instance

    Examples:
        >>> cache = get_admin_stats_cache()
        >>> isinstance(cache, AdminStatsCache)
        True
    """
    global _admin_stats_cache  # pylint: disable=global-statement
    if _admin_stats_cache is None:
        # First call: build and memoize the shared instance.
        _admin_stats_cache = AdminStatsCache()
    return _admin_stats_cache


# Convenience alias for direct import
admin_stats_cache = get_admin_stats_cache()