Coverage for mcpgateway / cache / registry_cache.py: 99%

308 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/cache/registry_cache.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Registry Data Cache. 

7 

8This module implements a thread-safe cache for registry data (tools, prompts, 

9resources, agents, servers, gateways) with Redis as the primary store and 

10in-memory fallback. It reduces database queries for list endpoints. 

11 

12Performance Impact: 

13 - Before: 1-2 DB queries per list request 

14 - After: 0 DB queries (cache hit) per TTL period 

15 - Expected 95%+ cache hit rate under load 

16 

17Examples: 

18 >>> from mcpgateway.cache.registry_cache import registry_cache 

19 >>> # Cache is used automatically by list endpoints 

20 >>> # Manual invalidation after tool update: 

21 >>> import asyncio 

22 >>> # asyncio.run(registry_cache.invalidate_tools()) 

23""" 

24 

25# Standard 

26import asyncio 

27from dataclasses import dataclass 

28import hashlib 

29import logging 

30import threading 

31import time 

32from typing import Any, Dict, Optional 

33 

34logger = logging.getLogger(__name__) 

35 

36 

37def _get_cleanup_timeout() -> float: 

38 """Get cleanup timeout from config (lazy import to avoid circular deps). 

39 

40 Returns: 

41 Cleanup timeout in seconds (default: 5.0). 

42 """ 

43 try: 

44 # First-Party 

45 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel 

46 

47 return settings.mcp_session_pool_cleanup_timeout 

48 except Exception: 

49 return 5.0 

50 

51 

52@dataclass 

53class CacheEntry: 

54 """Cache entry with value and expiry timestamp. 

55 

56 Examples: 

57 >>> import time 

58 >>> entry = CacheEntry(value=["item1", "item2"], expiry=time.time() + 60) 

59 >>> entry.is_expired() 

60 False 

61 """ 

62 

63 value: Any 

64 expiry: float 

65 

66 def is_expired(self) -> bool: 

67 """Check if this cache entry has expired. 

68 

69 Returns: 

70 bool: True if the entry has expired, False otherwise. 

71 """ 

72 return time.time() >= self.expiry 

73 

74 

75@dataclass 

76class RegistryCacheConfig: 

77 """Configuration for registry cache TTLs. 

78 

79 Attributes: 

80 enabled: Whether caching is enabled 

81 tools_ttl: TTL in seconds for tools list cache 

82 prompts_ttl: TTL in seconds for prompts list cache 

83 resources_ttl: TTL in seconds for resources list cache 

84 agents_ttl: TTL in seconds for agents list cache 

85 servers_ttl: TTL in seconds for servers list cache 

86 gateways_ttl: TTL in seconds for gateways list cache 

87 catalog_ttl: TTL in seconds for catalog servers list cache 

88 

89 Examples: 

90 >>> config = RegistryCacheConfig() 

91 >>> config.tools_ttl 

92 20 

93 """ 

94 

95 enabled: bool = True 

96 tools_ttl: int = 20 

97 prompts_ttl: int = 15 

98 resources_ttl: int = 15 

99 agents_ttl: int = 20 

100 servers_ttl: int = 20 

101 gateways_ttl: int = 20 

102 catalog_ttl: int = 300 

103 

104 

105class RegistryCache: 

106 """Thread-safe registry cache with Redis and in-memory tiers. 

107 

108 This cache reduces database load for list endpoints by caching: 

109 - Tools list 

110 - Prompts list 

111 - Resources list 

112 - A2A Agents list 

113 - Servers list 

114 - Gateways list 

115 - Catalog servers list 

116 

117 The cache uses Redis as the primary store for distributed deployments 

118 and falls back to in-memory caching when Redis is unavailable. 

119 

120 Examples: 

121 >>> cache = RegistryCache() 

122 >>> cache.stats()["hit_count"] 

123 0 

124 """ 

125 

126 def __init__(self, config: Optional[RegistryCacheConfig] = None): 

127 """Initialize the registry cache. 

128 

129 Args: 

130 config: Cache configuration. If None, loads from settings. 

131 

132 Examples: 

133 >>> cache = RegistryCache() 

134 >>> cache._enabled 

135 True 

136 """ 

137 # Import settings lazily to avoid circular imports 

138 try: 

139 # First-Party 

140 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel 

141 

142 self._enabled = getattr(settings, "registry_cache_enabled", True) 

143 self._tools_ttl = getattr(settings, "registry_cache_tools_ttl", 20) 

144 self._prompts_ttl = getattr(settings, "registry_cache_prompts_ttl", 15) 

145 self._resources_ttl = getattr(settings, "registry_cache_resources_ttl", 15) 

146 self._agents_ttl = getattr(settings, "registry_cache_agents_ttl", 20) 

147 self._servers_ttl = getattr(settings, "registry_cache_servers_ttl", 20) 

148 self._gateways_ttl = getattr(settings, "registry_cache_gateways_ttl", 20) 

149 self._catalog_ttl = getattr(settings, "registry_cache_catalog_ttl", 300) 

150 self._cache_prefix = getattr(settings, "cache_prefix", "mcpgw:") 

151 except ImportError: 

152 cfg = config or RegistryCacheConfig() 

153 self._enabled = cfg.enabled 

154 self._tools_ttl = cfg.tools_ttl 

155 self._prompts_ttl = cfg.prompts_ttl 

156 self._resources_ttl = cfg.resources_ttl 

157 self._agents_ttl = cfg.agents_ttl 

158 self._servers_ttl = cfg.servers_ttl 

159 self._gateways_ttl = cfg.gateways_ttl 

160 self._catalog_ttl = cfg.catalog_ttl 

161 self._cache_prefix = "mcpgw:" 

162 

163 # In-memory cache (fallback when Redis unavailable) 

164 self._cache: Dict[str, CacheEntry] = {} 

165 

166 # Thread safety 

167 self._lock = threading.Lock() 

168 

169 # Redis availability (None = not checked yet) 

170 self._redis_checked = False 

171 self._redis_available = False 

172 

173 # Statistics 

174 self._hit_count = 0 

175 self._miss_count = 0 

176 self._redis_hit_count = 0 

177 self._redis_miss_count = 0 

178 

179 logger.info( 

180 f"RegistryCache initialized: enabled={self._enabled}, " 

181 f"tools_ttl={self._tools_ttl}s, prompts_ttl={self._prompts_ttl}s, " 

182 f"resources_ttl={self._resources_ttl}s, agents_ttl={self._agents_ttl}s, " 

183 f"catalog_ttl={self._catalog_ttl}s" 

184 ) 

185 

186 def _get_redis_key(self, cache_type: str, filters_hash: str = "") -> str: 

187 """Generate Redis key with proper prefix. 

188 

189 Args: 

190 cache_type: Type of cache entry (tools, prompts, etc.) 

191 filters_hash: Hash of filter parameters 

192 

193 Returns: 

194 Full Redis key with prefix 

195 

196 Examples: 

197 >>> cache = RegistryCache() 

198 >>> cache._get_redis_key("tools", "abc123") 

199 'mcpgw:registry:tools:abc123' 

200 """ 

201 if filters_hash: 

202 return f"{self._cache_prefix}registry:{cache_type}:{filters_hash}" 

203 return f"{self._cache_prefix}registry:{cache_type}" 

204 

205 def hash_filters(self, **kwargs) -> str: 

206 """Generate a hash from filter parameters. 

207 

208 Args: 

209 **kwargs: Filter parameters to hash 

210 

211 Returns: 

212 MD5 hash of the filter parameters 

213 

214 Examples: 

215 >>> cache = RegistryCache() 

216 >>> h = cache.hash_filters(include_inactive=False, tags=["api"]) 

217 >>> len(h) 

218 32 

219 """ 

220 # Sort keys for consistent hashing 

221 sorted_items = sorted(kwargs.items()) 

222 filter_str = str(sorted_items) 

223 return hashlib.md5(filter_str.encode()).hexdigest() # nosec B324 # noqa: DUO130 

224 

225 async def _get_redis_client(self): 

226 """Get Redis client if available. 

227 

228 Returns: 

229 Redis client or None if unavailable. 

230 """ 

231 try: 

232 # First-Party 

233 from mcpgateway.utils.redis_client import get_redis_client # pylint: disable=import-outside-toplevel 

234 

235 client = await get_redis_client() 

236 if client and not self._redis_checked: 

237 self._redis_checked = True 

238 self._redis_available = True 

239 logger.debug("RegistryCache: Redis client available") 

240 return client 

241 except Exception as e: 

242 if not self._redis_checked: 242 ↛ 246line 242 didn't jump to line 246 because the condition on line 242 was always true

243 self._redis_checked = True 

244 self._redis_available = False 

245 logger.debug(f"RegistryCache: Redis unavailable, using in-memory cache: {e}") 

246 return None 

247 

248 async def get(self, cache_type: str, filters_hash: str = "") -> Optional[Any]: 

249 """Get cached data. 

250 

251 Args: 

252 cache_type: Type of cache (tools, prompts, resources, agents, servers, gateways) 

253 filters_hash: Hash of filter parameters 

254 

255 Returns: 

256 Cached data if found, None otherwise 

257 

258 Examples: 

259 >>> import asyncio 

260 >>> cache = RegistryCache() 

261 >>> result = asyncio.run(cache.get("tools", "abc123")) 

262 >>> result is None # Cache miss on fresh cache 

263 True 

264 """ 

265 if not self._enabled: 

266 return None 

267 

268 cache_key = self._get_redis_key(cache_type, filters_hash) 

269 

270 # Try Redis first 

271 redis = await self._get_redis_client() 

272 if redis: 

273 try: 

274 data = await redis.get(cache_key) 

275 if data: 

276 # Third-Party 

277 import orjson # pylint: disable=import-outside-toplevel 

278 

279 self._hit_count += 1 

280 self._redis_hit_count += 1 

281 return orjson.loads(data) 

282 self._redis_miss_count += 1 

283 except Exception as e: 

284 logger.warning(f"RegistryCache Redis get failed: {e}") 

285 

286 # Fall back to in-memory cache 

287 with self._lock: 

288 entry = self._cache.get(cache_key) 

289 if entry and not entry.is_expired(): 

290 self._hit_count += 1 

291 return entry.value 

292 

293 self._miss_count += 1 

294 return None 

295 

296 async def set(self, cache_type: str, data: Any, filters_hash: str = "", ttl: Optional[int] = None) -> None: 

297 """Store data in cache. 

298 

299 Args: 

300 cache_type: Type of cache (tools, prompts, resources, agents, servers, gateways) 

301 data: Data to cache (must be JSON-serializable) 

302 filters_hash: Hash of filter parameters 

303 ttl: TTL in seconds (uses default for cache_type if not specified) 

304 

305 Examples: 

306 >>> import asyncio 

307 >>> cache = RegistryCache() 

308 >>> asyncio.run(cache.set("tools", [{"id": "1", "name": "tool1"}], "abc123")) 

309 """ 

310 if not self._enabled: 

311 return 

312 

313 # Determine TTL 

314 if ttl is None: 

315 ttl_map = { 

316 "tools": self._tools_ttl, 

317 "prompts": self._prompts_ttl, 

318 "resources": self._resources_ttl, 

319 "agents": self._agents_ttl, 

320 "servers": self._servers_ttl, 

321 "gateways": self._gateways_ttl, 

322 "catalog": self._catalog_ttl, 

323 } 

324 ttl = ttl_map.get(cache_type, 20) 

325 

326 cache_key = self._get_redis_key(cache_type, filters_hash) 

327 

328 # Store in Redis 

329 redis = await self._get_redis_client() 

330 if redis: 

331 try: 

332 # Third-Party 

333 import orjson # pylint: disable=import-outside-toplevel 

334 

335 await redis.setex(cache_key, ttl, orjson.dumps(data)) 

336 except Exception as e: 

337 logger.warning(f"RegistryCache Redis set failed: {e}") 

338 

339 # Store in in-memory cache 

340 with self._lock: 

341 self._cache[cache_key] = CacheEntry(value=data, expiry=time.time() + ttl) 

342 

343 async def invalidate(self, cache_type: str) -> None: 

344 """Invalidate all cached data for a cache type. 

345 

346 Args: 

347 cache_type: Type of cache to invalidate (tools, prompts, etc.) 

348 

349 Examples: 

350 >>> import asyncio 

351 >>> cache = RegistryCache() 

352 >>> asyncio.run(cache.invalidate("tools")) 

353 """ 

354 logger.debug(f"RegistryCache: Invalidating {cache_type} cache") 

355 prefix = self._get_redis_key(cache_type) 

356 

357 # Clear in-memory cache 

358 with self._lock: 

359 keys_to_remove = [k for k in self._cache if k.startswith(prefix)] 

360 for key in keys_to_remove: 

361 self._cache.pop(key, None) 

362 

363 # Clear Redis 

364 redis = await self._get_redis_client() 

365 if redis: 

366 try: 

367 pattern = f"{prefix}*" 

368 async for key in redis.scan_iter(match=pattern): 

369 await redis.delete(key) 

370 

371 # Publish invalidation for other workers 

372 await redis.publish("mcpgw:cache:invalidate", f"registry:{cache_type}") 

373 except Exception as e: 

374 logger.warning(f"RegistryCache Redis invalidate failed: {e}") 

375 

376 async def invalidate_tools(self) -> None: 

377 """Invalidate tools cache. 

378 

379 Examples: 

380 >>> import asyncio 

381 >>> cache = RegistryCache() 

382 >>> asyncio.run(cache.invalidate_tools()) 

383 """ 

384 await self.invalidate("tools") 

385 

386 async def invalidate_prompts(self) -> None: 

387 """Invalidate prompts cache. 

388 

389 Examples: 

390 >>> import asyncio 

391 >>> cache = RegistryCache() 

392 >>> asyncio.run(cache.invalidate_prompts()) 

393 """ 

394 await self.invalidate("prompts") 

395 

396 async def invalidate_resources(self) -> None: 

397 """Invalidate resources cache. 

398 

399 Examples: 

400 >>> import asyncio 

401 >>> cache = RegistryCache() 

402 >>> asyncio.run(cache.invalidate_resources()) 

403 """ 

404 await self.invalidate("resources") 

405 

406 async def invalidate_agents(self) -> None: 

407 """Invalidate agents cache. 

408 

409 Examples: 

410 >>> import asyncio 

411 >>> cache = RegistryCache() 

412 >>> asyncio.run(cache.invalidate_agents()) 

413 """ 

414 await self.invalidate("agents") 

415 

416 async def invalidate_servers(self) -> None: 

417 """Invalidate servers cache. 

418 

419 Examples: 

420 >>> import asyncio 

421 >>> cache = RegistryCache() 

422 >>> asyncio.run(cache.invalidate_servers()) 

423 """ 

424 await self.invalidate("servers") 

425 

426 async def invalidate_gateways(self) -> None: 

427 """Invalidate gateways cache. 

428 

429 Examples: 

430 >>> import asyncio 

431 >>> cache = RegistryCache() 

432 >>> asyncio.run(cache.invalidate_gateways()) 

433 """ 

434 await self.invalidate("gateways") 

435 

436 async def invalidate_catalog(self) -> None: 

437 """Invalidate catalog servers cache. 

438 

439 Examples: 

440 >>> import asyncio 

441 >>> cache = RegistryCache() 

442 >>> asyncio.run(cache.invalidate_catalog()) 

443 """ 

444 await self.invalidate("catalog") 

445 

446 def invalidate_all(self) -> None: 

447 """Invalidate all cached data synchronously. 

448 

449 Examples: 

450 >>> cache = RegistryCache() 

451 >>> cache.invalidate_all() 

452 """ 

453 with self._lock: 

454 self._cache.clear() 

455 logger.info("RegistryCache: All caches invalidated") 

456 

457 def stats(self) -> Dict[str, Any]: 

458 """Get cache statistics. 

459 

460 Returns: 

461 Dictionary with hit/miss counts and hit rate 

462 

463 Examples: 

464 >>> cache = RegistryCache() 

465 >>> stats = cache.stats() 

466 >>> "hit_count" in stats 

467 True 

468 """ 

469 total = self._hit_count + self._miss_count 

470 redis_total = self._redis_hit_count + self._redis_miss_count 

471 

472 return { 

473 "enabled": self._enabled, 

474 "hit_count": self._hit_count, 

475 "miss_count": self._miss_count, 

476 "hit_rate": self._hit_count / total if total > 0 else 0.0, 

477 "redis_hit_count": self._redis_hit_count, 

478 "redis_miss_count": self._redis_miss_count, 

479 "redis_hit_rate": self._redis_hit_count / redis_total if redis_total > 0 else 0.0, 

480 "redis_available": self._redis_available, 

481 "cache_size": len(self._cache), 

482 "ttls": { 

483 "tools": self._tools_ttl, 

484 "prompts": self._prompts_ttl, 

485 "resources": self._resources_ttl, 

486 "agents": self._agents_ttl, 

487 "servers": self._servers_ttl, 

488 "gateways": self._gateways_ttl, 

489 "catalog": self._catalog_ttl, 

490 }, 

491 } 

492 

493 def reset_stats(self) -> None: 

494 """Reset hit/miss counters. 

495 

496 Examples: 

497 >>> cache = RegistryCache() 

498 >>> cache._hit_count = 100 

499 >>> cache.reset_stats() 

500 >>> cache._hit_count 

501 0 

502 """ 

503 self._hit_count = 0 

504 self._miss_count = 0 

505 self._redis_hit_count = 0 

506 self._redis_miss_count = 0 

507 

508 

509# Global singleton instance 

510_registry_cache: Optional[RegistryCache] = None 

511 

512 

513def get_registry_cache() -> RegistryCache: 

514 """Get or create the singleton RegistryCache instance. 

515 

516 Returns: 

517 RegistryCache: The singleton registry cache instance 

518 

519 Examples: 

520 >>> cache = get_registry_cache() 

521 >>> isinstance(cache, RegistryCache) 

522 True 

523 """ 

524 global _registry_cache # pylint: disable=global-statement 

525 if _registry_cache is None: 

526 _registry_cache = RegistryCache() 

527 return _registry_cache 

528 

529 

530# Convenience alias for direct import 

531registry_cache = get_registry_cache() 

532 

533 

534class CacheInvalidationSubscriber: 

535 """Redis pubsub subscriber for cross-worker cache invalidation. 

536 

537 This class subscribes to the 'mcpgw:cache:invalidate' Redis channel 

538 and processes invalidation messages from other workers, ensuring 

539 local in-memory caches stay synchronized in multi-worker deployments. 

540 

541 Message formats handled: 

542 - registry:{cache_type} - Invalidate registry cache (tools, prompts, etc.) 

543 - tool_lookup:{name} - Invalidate specific tool lookup 

544 - tool_lookup:gateway:{gateway_id} - Invalidate all tools for a gateway 

545 - admin:{prefix} - Invalidate admin stats cache 

546 

547 Examples: 

548 >>> subscriber = CacheInvalidationSubscriber() 

549 >>> # Start listening in background task: 

550 >>> # await subscriber.start() 

551 >>> # Stop when shutting down: 

552 >>> # await subscriber.stop() 

553 """ 

554 

555 def __init__(self) -> None: 

556 """Initialize the cache invalidation subscriber.""" 

557 self._task: Optional[asyncio.Task[None]] = None 

558 self._stop_event: Optional[asyncio.Event] = None 

559 self._pubsub: Optional[Any] = None 

560 self._channel = "mcpgw:cache:invalidate" 

561 self._started = False 

562 

563 async def start(self) -> None: 

564 """Start listening for cache invalidation messages. 

565 

566 This creates a background task that subscribes to the Redis 

567 channel and processes invalidation messages. 

568 

569 Examples: 

570 >>> import asyncio 

571 >>> subscriber = CacheInvalidationSubscriber() 

572 >>> # asyncio.run(subscriber.start()) 

573 """ 

574 if self._started: 

575 logger.debug("CacheInvalidationSubscriber already started") 

576 return 

577 

578 try: 

579 # First-Party 

580 from mcpgateway.utils.redis_client import get_redis_client # pylint: disable=import-outside-toplevel 

581 

582 redis = await get_redis_client() 

583 if not redis: 

584 logger.info("CacheInvalidationSubscriber: Redis unavailable, skipping cross-worker invalidation") 

585 return 

586 

587 self._stop_event = asyncio.Event() 

588 self._pubsub = redis.pubsub() 

589 await self._pubsub.subscribe(self._channel) # pyright: ignore[reportOptionalMemberAccess] 

590 

591 self._task = asyncio.create_task(self._listen_loop()) 

592 self._started = True 

593 logger.info("CacheInvalidationSubscriber started on channel '%s'", self._channel) 

594 

595 except Exception as e: 

596 logger.warning("CacheInvalidationSubscriber failed to start: %s", e) 

597 # Clean up partially created pubsub to prevent leaks 

598 # Use timeout to prevent blocking if pubsub doesn't close cleanly 

599 cleanup_timeout = _get_cleanup_timeout() 

600 if self._pubsub is not None: 600 ↛ exitline 600 didn't return from function 'start' because the condition on line 600 was always true

601 try: 

602 try: 

603 await asyncio.wait_for(self._pubsub.aclose(), timeout=cleanup_timeout) 

604 except AttributeError: 

605 await asyncio.wait_for(self._pubsub.close(), timeout=cleanup_timeout) 

606 except asyncio.TimeoutError: 

607 logger.debug("Pubsub cleanup timed out - proceeding anyway") 

608 except Exception as cleanup_err: 

609 logger.debug("Error during pubsub cleanup: %s", cleanup_err) 

610 self._pubsub = None 

611 

612 async def stop(self) -> None: 

613 """Stop listening for cache invalidation messages. 

614 

615 This cancels the background task and cleans up resources. 

616 

617 Examples: 

618 >>> import asyncio 

619 >>> subscriber = CacheInvalidationSubscriber() 

620 >>> # asyncio.run(subscriber.stop()) 

621 """ 

622 if not self._started: 

623 return 

624 

625 self._started = False 

626 

627 if self._stop_event: 627 ↛ 630line 627 didn't jump to line 630 because the condition on line 627 was always true

628 self._stop_event.set() 

629 

630 if self._task: 

631 self._task.cancel() 

632 try: 

633 await asyncio.wait_for(self._task, timeout=2.0) 

634 except (asyncio.CancelledError, asyncio.TimeoutError): 

635 pass 

636 self._task = None 

637 

638 if self._pubsub: 

639 cleanup_timeout = _get_cleanup_timeout() 

640 try: 

641 await asyncio.wait_for(self._pubsub.unsubscribe(self._channel), timeout=cleanup_timeout) 

642 except asyncio.TimeoutError: 

643 logger.debug("Pubsub unsubscribe timed out - proceeding anyway") 

644 except Exception as e: 

645 logger.debug("Error unsubscribing from pubsub: %s", e) 

646 try: 

647 try: 

648 await asyncio.wait_for(self._pubsub.aclose(), timeout=cleanup_timeout) 

649 except AttributeError: 

650 await asyncio.wait_for(self._pubsub.close(), timeout=cleanup_timeout) 

651 except asyncio.TimeoutError: 

652 logger.debug("Pubsub close timed out - proceeding anyway") 

653 except Exception as e: 

654 logger.debug("Error closing pubsub: %s", e) 

655 self._pubsub = None 

656 

657 logger.info("CacheInvalidationSubscriber stopped") 

658 

659 async def _listen_loop(self) -> None: 

660 """Background loop that listens for and processes invalidation messages.""" 

661 logger.debug("CacheInvalidationSubscriber listen loop started") 

662 try: 

663 while self._started and not (self._stop_event and self._stop_event.is_set()): 

664 if self._pubsub is None: 

665 break 

666 try: 

667 message = await asyncio.wait_for( 

668 self._pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0), 

669 timeout=2.0, 

670 ) 

671 if message and message.get("type") == "message": 

672 data = message.get("data") 

673 if isinstance(data, bytes): 

674 data = data.decode("utf-8") 

675 if data: 

676 await self._process_invalidation(data) 

677 except asyncio.TimeoutError: 

678 continue 

679 except Exception as e: # pylint: disable=broad-exception-caught 

680 logger.debug("CacheInvalidationSubscriber message error: %s", e) 

681 await asyncio.sleep(0.1) 

682 except asyncio.CancelledError: 

683 logger.debug("CacheInvalidationSubscriber listen loop cancelled") 

684 finally: 

685 logger.debug("CacheInvalidationSubscriber listen loop exited") 

686 

687 async def _process_invalidation(self, message: str) -> None: # pylint: disable=too-many-branches 

688 """Process a cache invalidation message. 

689 

690 Args: 

691 message: The invalidation message in format 'type:identifier' 

692 """ 

693 logger.debug("CacheInvalidationSubscriber received: %s", message) 

694 

695 # pylint: disable=protected-access 

696 # pyright: ignore[reportPrivateUsage] 

697 # We intentionally access protected members to clear local in-memory caches 

698 # without triggering another round of Redis pubsub invalidation messages 

699 try: 

700 if message.startswith("registry:"): 

701 # Handle registry cache invalidation (tools, prompts, resources, etc.) 

702 cache_type = message[len("registry:") :] 

703 cache = get_registry_cache() 

704 # Only clear local in-memory cache to avoid infinite loops 

705 prefix = cache._get_redis_key(cache_type) # pyright: ignore[reportPrivateUsage] 

706 with cache._lock: # pyright: ignore[reportPrivateUsage] 

707 keys_to_remove = [k for k in cache._cache if k.startswith(prefix)] # pyright: ignore[reportPrivateUsage] 

708 for key in keys_to_remove: 

709 cache._cache.pop(key, None) # pyright: ignore[reportPrivateUsage] 

710 logger.debug("CacheInvalidationSubscriber: Cleared local registry:%s cache (%d keys)", cache_type, len(keys_to_remove)) 

711 

712 elif message.startswith("tool_lookup:gateway:"): 

713 # Handle gateway-wide tool lookup invalidation 

714 gateway_id = message[len("tool_lookup:gateway:") :] 

715 # First-Party 

716 from mcpgateway.cache.tool_lookup_cache import tool_lookup_cache # pylint: disable=import-outside-toplevel 

717 

718 # Only clear local L1 cache 

719 with tool_lookup_cache._lock: # pyright: ignore[reportPrivateUsage] 

720 to_remove = [name for name, entry in tool_lookup_cache._cache.items() if entry.value.get("tool", {}).get("gateway_id") == gateway_id] # pyright: ignore[reportPrivateUsage] 

721 for name in to_remove: 

722 tool_lookup_cache._cache.pop(name, None) # pyright: ignore[reportPrivateUsage] 

723 logger.debug("CacheInvalidationSubscriber: Cleared local tool_lookup for gateway %s (%d keys)", gateway_id, len(to_remove)) 

724 

725 elif message.startswith("tool_lookup:"): 

726 # Handle specific tool lookup invalidation 

727 tool_name = message[len("tool_lookup:") :] 

728 # First-Party 

729 from mcpgateway.cache.tool_lookup_cache import tool_lookup_cache # pylint: disable=import-outside-toplevel 

730 

731 # Only clear local L1 cache 

732 with tool_lookup_cache._lock: # pyright: ignore[reportPrivateUsage] 

733 tool_lookup_cache._cache.pop(tool_name, None) # pyright: ignore[reportPrivateUsage] 

734 logger.debug("CacheInvalidationSubscriber: Cleared local tool_lookup:%s", tool_name) 

735 

736 elif message.startswith("admin:"): 

737 # Handle admin stats cache invalidation 

738 prefix = message[len("admin:") :] 

739 # First-Party 

740 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

741 

742 # Only clear local in-memory cache 

743 full_prefix = admin_stats_cache._get_redis_key(prefix) # pyright: ignore[reportPrivateUsage] 

744 with admin_stats_cache._lock: # pyright: ignore[reportPrivateUsage] 

745 keys_to_remove = [k for k in admin_stats_cache._cache if k.startswith(full_prefix)] # pyright: ignore[reportPrivateUsage] 

746 for key in keys_to_remove: 

747 admin_stats_cache._cache.pop(key, None) # pyright: ignore[reportPrivateUsage] 

748 logger.debug("CacheInvalidationSubscriber: Cleared local admin:%s cache (%d keys)", prefix, len(keys_to_remove)) 

749 

750 else: 

751 logger.debug("CacheInvalidationSubscriber: Unknown message format: %s", message) 

752 

753 except Exception as e: # pylint: disable=broad-exception-caught 

754 logger.warning("CacheInvalidationSubscriber: Error processing '%s': %s", message, e) 

755 

756 

757# Global singleton for cache invalidation subscriber 

758_cache_invalidation_subscriber: Optional[CacheInvalidationSubscriber] = None 

759 

760 

761def get_cache_invalidation_subscriber() -> CacheInvalidationSubscriber: 

762 """Get or create the singleton CacheInvalidationSubscriber instance. 

763 

764 Returns: 

765 CacheInvalidationSubscriber: The singleton instance 

766 

767 Examples: 

768 >>> subscriber = get_cache_invalidation_subscriber() 

769 >>> isinstance(subscriber, CacheInvalidationSubscriber) 

770 True 

771 """ 

772 global _cache_invalidation_subscriber # pylint: disable=global-statement 

773 if _cache_invalidation_subscriber is None: 

774 _cache_invalidation_subscriber = CacheInvalidationSubscriber() 

775 return _cache_invalidation_subscriber