Coverage for mcpgateway / cache / admin_stats_cache.py: 99%

370 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/cache/admin_stats_cache.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Admin Statistics Cache. 

7 

8This module implements a thread-safe cache for admin dashboard statistics 

9with Redis as the primary store and in-memory fallback. It caches system 

10stats, observability stats, and other frequently-accessed admin data. 

11 

12Performance Impact: 

13 - Before: 10+ COUNT queries per dashboard load 

14 - After: 0 queries (cache hit) per TTL period 

15 - Expected 1000+ queries/hour eliminated 

16 

17Examples: 

18 >>> from mcpgateway.cache.admin_stats_cache import admin_stats_cache 

19 >>> # Cache is used automatically by admin endpoints 

20 >>> import asyncio 

21 >>> # asyncio.run(admin_stats_cache.invalidate_system_stats()) 

22""" 

23 

24# Standard 

25from dataclasses import dataclass 

26import logging 

27import threading 

28import time 

29from typing import Any, Dict, Optional 

30 

# Module-level logger named after this module (``__name__``); AdminStatsCache
# uses it for its initialization, Redis-availability and invalidation messages.
logger = logging.getLogger(__name__)

32 

33 

@dataclass
class CacheEntry:
    """A cached value paired with the wall-clock time at which it goes stale.

    ``expiry`` is compared against ``time.time()``, so it is an absolute
    timestamp in seconds rather than a duration.

    Examples:
        >>> import time
        >>> entry = CacheEntry(value={"total": 100}, expiry=time.time() + 60)
        >>> entry.is_expired()
        False
    """

    # The cached payload; any object is accepted.
    value: Any
    # Absolute expiry time, on the same clock as ``time.time()``.
    expiry: float

    def is_expired(self) -> bool:
        """Report whether the expiry timestamp has been reached.

        Returns:
            bool: True once the current time is at or past ``expiry``, False before that.
        """
        now = time.time()
        return self.expiry <= now

55 

56 

class AdminStatsCache:
    """Admin statistics cache with a Redis tier and an in-memory tier.

    This cache reduces database load for the admin dashboard by caching:
    - System stats (entity counts)
    - Observability stats (trace/span counts)
    - User/team listings
    - Tags, plugin stats and performance aggregates

    Every ``get_*`` method asks Redis first and then the in-memory dict, and
    every ``set_*`` method writes to both, so the in-memory copy keeps serving
    when Redis is unavailable. The in-memory dict is guarded by a
    ``threading.Lock``.

    Examples:
        >>> cache = AdminStatsCache()
        >>> cache.stats()["hit_count"]
        0
    """

    def __init__(
        self,
        system_ttl: Optional[int] = None,
        observability_ttl: Optional[int] = None,
        users_ttl: Optional[int] = None,
        teams_ttl: Optional[int] = None,
        tags_ttl: Optional[int] = None,
        plugins_ttl: Optional[int] = None,
        performance_ttl: Optional[int] = None,
        enabled: Optional[bool] = None,
    ):
        """Initialize the admin stats cache.

        Each TTL is resolved as: explicit argument, then the matching
        ``admin_stats_cache_*`` setting, then the built-in default. A falsy
        argument (``None`` or ``0``) is treated as "not provided".

        Args:
            system_ttl: TTL for system stats cache in seconds (default: 60)
            observability_ttl: TTL for observability stats in seconds (default: 30)
            users_ttl: TTL for user listings in seconds (default: 30)
            teams_ttl: TTL for team listings in seconds (default: 60)
            tags_ttl: TTL for tags listing in seconds (default: 120)
            plugins_ttl: TTL for plugin stats in seconds (default: 120)
            performance_ttl: TTL for performance aggregates in seconds (default: 60)
            enabled: Whether caching is enabled (default: True)

        Examples:
            >>> cache = AdminStatsCache(system_ttl=120)
            >>> cache._system_ttl
            120
        """
        # Import settings lazily to avoid circular imports. When the import
        # fails, ``settings`` is None and every getattr() below returns its
        # built-in default.
        try:
            # First-Party
            from mcpgateway.config import settings  # pylint: disable=import-outside-toplevel
        except ImportError:
            settings = None

        self._system_ttl = system_ttl or getattr(settings, "admin_stats_cache_system_ttl", 60)
        self._observability_ttl = observability_ttl or getattr(settings, "admin_stats_cache_observability_ttl", 30)
        self._users_ttl = users_ttl or getattr(settings, "admin_stats_cache_users_ttl", 30)
        self._teams_ttl = teams_ttl or getattr(settings, "admin_stats_cache_teams_ttl", 60)
        self._tags_ttl = tags_ttl or getattr(settings, "admin_stats_cache_tags_ttl", 120)
        self._plugins_ttl = plugins_ttl or getattr(settings, "admin_stats_cache_plugins_ttl", 120)
        self._performance_ttl = performance_ttl or getattr(settings, "admin_stats_cache_performance_ttl", 60)
        self._enabled = enabled if enabled is not None else getattr(settings, "admin_stats_cache_enabled", True)
        self._cache_prefix = getattr(settings, "cache_prefix", "mcpgw:")

        # In-memory cache (fallback when Redis unavailable)
        self._cache: Dict[str, CacheEntry] = {}

        # Guards self._cache and the counters touched on the in-memory path
        self._lock = threading.Lock()

        # Redis availability, recorded by the first _get_redis_client() outcome
        self._redis_checked = False
        self._redis_available = False

        # Statistics
        self._hit_count = 0
        self._miss_count = 0
        self._redis_hit_count = 0
        self._redis_miss_count = 0

        logger.info(
            f"AdminStatsCache initialized: enabled={self._enabled}, "
            f"system_ttl={self._system_ttl}s, observability_ttl={self._observability_ttl}s, tags_ttl={self._tags_ttl}s"
        )

    def _get_redis_key(self, key_type: str, identifier: str = "") -> str:
        """Generate Redis key with proper prefix.

        The same key is used for the in-memory tier.

        Args:
            key_type: Type of cache entry (system, observability, users, teams, ...)
            identifier: Optional identifier suffix

        Returns:
            Full key: ``<prefix>admin:<key_type>[:<identifier>]``

        Examples:
            >>> cache = AdminStatsCache()
            >>> cache._get_redis_key("system", "comprehensive")
            'mcpgw:admin:system:comprehensive'
        """
        base = f"{self._cache_prefix}admin:{key_type}"
        return f"{base}:{identifier}" if identifier else base

    async def _get_redis_client(self) -> Optional[Any]:
        """Get Redis client if available.

        The first conclusive outcome (a truthy client, or an exception) is
        recorded in ``_redis_available`` and logged once.

        Returns:
            Redis client or None if unavailable.
        """
        try:
            # First-Party
            from mcpgateway.utils.redis_client import get_redis_client  # pylint: disable=import-outside-toplevel

            client = await get_redis_client()
            if client and not self._redis_checked:
                self._redis_checked = True
                self._redis_available = True
                logger.debug("AdminStatsCache: Redis client available")
            return client
        except Exception as e:
            if not self._redis_checked:
                self._redis_checked = True
                self._redis_available = False
                logger.debug(f"AdminStatsCache: Redis unavailable, using in-memory cache: {e}")
            return None

    async def _get_cached(self, key_type: str, identifier: str) -> Optional[Any]:
        """Look up one entry, trying Redis first and then the in-memory tier.

        Args:
            key_type: Type of cache entry, as passed to ``_get_redis_key``
            identifier: Identifier suffix, as passed to ``_get_redis_key``

        Returns:
            The cached value, or None when caching is disabled or both tiers miss
        """
        if not self._enabled:
            return None

        cache_key = self._get_redis_key(key_type, identifier)

        # Try Redis first
        redis = await self._get_redis_client()
        if redis:
            try:
                data = await redis.get(cache_key)
                if data:
                    # Third-Party
                    import orjson  # pylint: disable=import-outside-toplevel

                    # Decode before counting so a corrupt payload is not
                    # recorded as a hit before falling through below.
                    value = orjson.loads(data)
                    self._hit_count += 1
                    self._redis_hit_count += 1
                    return value
                self._redis_miss_count += 1
            except Exception as e:
                logger.warning(f"AdminStatsCache Redis get failed: {e}")

        # Fall back to in-memory cache
        with self._lock:
            entry = self._cache.get(cache_key)
            if entry is not None:
                if not entry.is_expired():
                    self._hit_count += 1
                    return entry.value
                # Evict the stale entry; otherwise keys that are never
                # rewritten (old pages, old filters) stay in the dict forever.
                del self._cache[cache_key]

            self._miss_count += 1
            return None

    async def _set_cached(self, key_type: str, identifier: str, value: Any, ttl: int) -> None:
        """Store one entry in Redis (best effort) and in the in-memory tier.

        A Redis failure is logged and does not prevent the in-memory write.

        Args:
            key_type: Type of cache entry, as passed to ``_get_redis_key``
            identifier: Identifier suffix, as passed to ``_get_redis_key``
            value: Data to cache
            ttl: Time to live in seconds, applied to both tiers
        """
        if not self._enabled:
            return

        cache_key = self._get_redis_key(key_type, identifier)

        # Store in Redis
        redis = await self._get_redis_client()
        if redis:
            try:
                # Third-Party
                import orjson  # pylint: disable=import-outside-toplevel

                await redis.setex(cache_key, ttl, orjson.dumps(value))
            except Exception as e:
                logger.warning(f"AdminStatsCache Redis set failed: {e}")

        # Store in in-memory cache
        with self._lock:
            self._cache[cache_key] = CacheEntry(value=value, expiry=time.time() + ttl)

    async def get_system_stats(self) -> Optional[Dict[str, Any]]:
        """Get cached system statistics.

        Returns:
            Cached system stats or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_system_stats())
            >>> result is None  # Cache miss on fresh cache
            True
        """
        return await self._get_cached("system", "comprehensive")

    async def set_system_stats(self, stats: Dict[str, Any]) -> None:
        """Store system statistics in cache.

        Args:
            stats: System statistics dictionary

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_system_stats({"tools": 10, "prompts": 5}))
        """
        await self._set_cached("system", "comprehensive", stats, self._system_ttl)

    async def get_observability_stats(self, hours: int = 24) -> Optional[Dict[str, Any]]:
        """Get cached observability statistics.

        Args:
            hours: Time range in hours for stats

        Returns:
            Cached observability stats or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_observability_stats(24))
            >>> result is None
            True
        """
        return await self._get_cached("observability", str(hours))

    async def set_observability_stats(self, stats: Dict[str, Any], hours: int = 24) -> None:
        """Store observability statistics in cache.

        Args:
            stats: Observability statistics dictionary
            hours: Time range in hours for stats

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_observability_stats({"total_traces": 100}, 24))
        """
        await self._set_cached("observability", str(hours), stats, self._observability_ttl)

    async def get_users_list(self, limit: int, offset: int) -> Optional[Any]:
        """Get cached users list.

        Args:
            limit: Page size
            offset: Page offset

        Returns:
            Cached users list or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_users_list(100, 0))
            >>> result is None
            True
        """
        return await self._get_cached("users", f"{limit}:{offset}")

    async def set_users_list(self, users: Any, limit: int, offset: int) -> None:
        """Store users list in cache.

        Args:
            users: Users list data
            limit: Page size
            offset: Page offset

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_users_list([{"email": "test@example.com"}], 100, 0))
        """
        await self._set_cached("users", f"{limit}:{offset}", users, self._users_ttl)

    async def get_teams_list(self, limit: int, offset: int) -> Optional[Any]:
        """Get cached teams list.

        Args:
            limit: Page size
            offset: Page offset

        Returns:
            Cached teams list or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_teams_list(100, 0))
            >>> result is None
            True
        """
        return await self._get_cached("teams", f"{limit}:{offset}")

    async def set_teams_list(self, teams: Any, limit: int, offset: int) -> None:
        """Store teams list in cache.

        Args:
            teams: Teams list data
            limit: Page size
            offset: Page offset

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_teams_list([{"id": "team1", "name": "Team 1"}], 100, 0))
        """
        await self._set_cached("teams", f"{limit}:{offset}", teams, self._teams_ttl)

    async def get_tags(self, entity_types_hash: str) -> Optional[Any]:
        """Get cached tags listing.

        Args:
            entity_types_hash: Hash of entity types filter

        Returns:
            Cached tags list or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_tags("all"))
            >>> result is None
            True
        """
        return await self._get_cached("tags", entity_types_hash)

    async def set_tags(self, tags: Any, entity_types_hash: str) -> None:
        """Store tags listing in cache.

        Args:
            tags: Tags list data
            entity_types_hash: Hash of entity types filter

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_tags([{"name": "api", "count": 10}], "all"))
        """
        await self._set_cached("tags", entity_types_hash, tags, self._tags_ttl)

    async def get_plugin_stats(self) -> Optional[Dict[str, Any]]:
        """Get cached plugin statistics.

        Returns:
            Cached plugin stats or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_plugin_stats())
            >>> result is None
            True
        """
        return await self._get_cached("plugins", "stats")

    async def set_plugin_stats(self, stats: Dict[str, Any]) -> None:
        """Store plugin statistics in cache.

        Args:
            stats: Plugin statistics dictionary

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_plugin_stats({"total_plugins": 5}))
        """
        await self._set_cached("plugins", "stats", stats, self._plugins_ttl)

    async def get_performance_history(self, cache_key_suffix: str) -> Optional[Dict[str, Any]]:
        """Get cached performance aggregates.

        Args:
            cache_key_suffix: Cache key suffix with filter params

        Returns:
            Cached performance data or None on cache miss

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> result = asyncio.run(cache.get_performance_history("hourly:168"))
            >>> result is None
            True
        """
        return await self._get_cached("performance", cache_key_suffix)

    async def set_performance_history(self, data: Dict[str, Any], cache_key_suffix: str) -> None:
        """Store performance aggregates in cache.

        Args:
            data: Performance data dictionary
            cache_key_suffix: Cache key suffix with filter params

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.set_performance_history({"aggregates": []}, "hourly:168"))
        """
        await self._set_cached("performance", cache_key_suffix, data, self._performance_ttl)

    async def invalidate_system_stats(self) -> None:
        """Invalidate system stats cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_system_stats())
        """
        logger.debug("AdminStatsCache: Invalidating system stats cache")
        await self._invalidate_prefix("system")

    async def invalidate_observability_stats(self) -> None:
        """Invalidate observability stats cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_observability_stats())
        """
        logger.debug("AdminStatsCache: Invalidating observability stats cache")
        await self._invalidate_prefix("observability")

    async def invalidate_users(self) -> None:
        """Invalidate users cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_users())
        """
        logger.debug("AdminStatsCache: Invalidating users cache")
        await self._invalidate_prefix("users")

    async def invalidate_teams(self) -> None:
        """Invalidate teams cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_teams())
        """
        logger.debug("AdminStatsCache: Invalidating teams cache")
        await self._invalidate_prefix("teams")

    async def invalidate_tags(self) -> None:
        """Invalidate tags cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_tags())
        """
        logger.debug("AdminStatsCache: Invalidating tags cache")
        await self._invalidate_prefix("tags")

    async def invalidate_plugins(self) -> None:
        """Invalidate plugins cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_plugins())
        """
        logger.debug("AdminStatsCache: Invalidating plugins cache")
        await self._invalidate_prefix("plugins")

    async def invalidate_performance(self) -> None:
        """Invalidate performance cache.

        Examples:
            >>> import asyncio
            >>> cache = AdminStatsCache()
            >>> asyncio.run(cache.invalidate_performance())
        """
        logger.debug("AdminStatsCache: Invalidating performance cache")
        await self._invalidate_prefix("performance")

    async def _invalidate_prefix(self, prefix: str) -> None:
        """Invalidate all cache entries with given prefix.

        Removes matching keys from the in-memory tier, then deletes matching
        Redis keys and publishes ``admin:<prefix>`` on the
        ``mcpgw:cache:invalidate`` channel. Redis failures are logged and
        swallowed.

        Args:
            prefix: Cache key prefix to invalidate
        """
        full_prefix = self._get_redis_key(prefix)

        # Clear in-memory cache
        with self._lock:
            keys_to_remove = [k for k in self._cache if k.startswith(full_prefix)]
            for key in keys_to_remove:
                self._cache.pop(key, None)

        # Clear Redis
        redis = await self._get_redis_client()
        if redis:
            try:
                pattern = f"{full_prefix}*"
                async for key in redis.scan_iter(match=pattern):
                    await redis.delete(key)

                # Publish invalidation for other workers
                await redis.publish("mcpgw:cache:invalidate", f"admin:{prefix}")
            except Exception as e:
                logger.warning(f"AdminStatsCache Redis invalidate failed: {e}")

    def invalidate_all(self) -> None:
        """Invalidate all cached data synchronously.

        Only the in-memory tier is cleared; this method does not contact
        Redis, so entries stored there remain until their TTL elapses or a
        per-type ``invalidate_*`` coroutine removes them.

        Examples:
            >>> cache = AdminStatsCache()
            >>> cache.invalidate_all()
        """
        with self._lock:
            self._cache.clear()
        logger.info("AdminStatsCache: All caches invalidated")

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary with hit/miss counts, hit rates (0.0 when nothing has
            been counted yet), Redis availability, in-memory size and TTLs

        Examples:
            >>> cache = AdminStatsCache()
            >>> stats = cache.stats()
            >>> "hit_count" in stats
            True
        """
        total = self._hit_count + self._miss_count
        redis_total = self._redis_hit_count + self._redis_miss_count

        return {
            "enabled": self._enabled,
            "hit_count": self._hit_count,
            "miss_count": self._miss_count,
            "hit_rate": self._hit_count / total if total > 0 else 0.0,
            "redis_hit_count": self._redis_hit_count,
            "redis_miss_count": self._redis_miss_count,
            "redis_hit_rate": self._redis_hit_count / redis_total if redis_total > 0 else 0.0,
            "redis_available": self._redis_available,
            "cache_size": len(self._cache),
            "ttls": {
                "system": self._system_ttl,
                "observability": self._observability_ttl,
                "users": self._users_ttl,
                "teams": self._teams_ttl,
                "tags": self._tags_ttl,
                "plugins": self._plugins_ttl,
                "performance": self._performance_ttl,
            },
        }

    def reset_stats(self) -> None:
        """Reset hit/miss counters.

        Cached entries themselves are left untouched.

        Examples:
            >>> cache = AdminStatsCache()
            >>> cache._hit_count = 100
            >>> cache.reset_stats()
            >>> cache._hit_count
            0
        """
        self._hit_count = 0
        self._miss_count = 0
        self._redis_hit_count = 0
        self._redis_miss_count = 0

902 

903 

# Process-wide singleton; stays None until get_admin_stats_cache() builds it
_admin_stats_cache: Optional[AdminStatsCache] = None


def get_admin_stats_cache() -> AdminStatsCache:
    """Return the shared AdminStatsCache, constructing it on the first call.

    Returns:
        AdminStatsCache: The singleton admin stats cache instance

    Examples:
        >>> cache = get_admin_stats_cache()
        >>> isinstance(cache, AdminStatsCache)
        True
    """
    global _admin_stats_cache  # pylint: disable=global-statement
    if _admin_stats_cache is not None:
        return _admin_stats_cache
    _admin_stats_cache = AdminStatsCache()
    return _admin_stats_cache


# Module-level handle to the shared instance, created at import time
admin_stats_cache = get_admin_stats_cache()