Coverage for mcpgateway / services / event_service.py: 100%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/event_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Authors: Keval Mahajan 

7 

8Description: 

9 This module implements a Centralized Event Service designed to decouple event 

10 producers from consumers within the MCP Gateway architecture for various services 

11 such as gateway_service, tool_service, and more. 

12 

13 - Primary Transport (Redis): Uses Redis Pub/Sub for distributed event 

14 broadcasting. This allows multiple Gateway instances (scaled horizontally) 

15 to share events. 

16 - Fallback Transport (Local Queue): Uses `asyncio.Queue` for in-memory 

17 communication. This activates automatically if Redis is unavailable or 

18 misconfigured, ensuring the application remains functional in a single-node 

19 development environment. 

20 

21Usage Guide: 

22 

23 1. Initialization 

24 Instantiate the service with a unique channel name. This acts as the "Topic". 

25 

26 ```python 

27 from mcpgateway.services.event_service import EventService 

28 

29 # Create a service instance for tool execution events 

30 tool_events = EventService(channel_name="mcpgateway:tools") 

31 ``` 

32 

33 2. Publishing Events (Producer) 

34 Any part of the application can publish a dictionary to the channel. 

35 

36 ```python 

37 await tool_events.publish_event({ 

38 "event": "tool_start", 

39 "tool_name": "calculator", 

40 "timestamp": datetime.now().isoformat() 

41 }) 

42 ``` 

43 

44 3. Subscribing to Events (Consumer) 

45 Use an async for-loop to listen to the stream. This generator yields 

46 events as they arrive. 

47 

48 ```python 

49 async for event in tool_events.subscribe_events(): 

50 print(f"Received event: {event['event']}") 

51 # Process event... 

52 ``` 

53""" 

54 

55# Standard 

56import asyncio 

57import importlib.util 

58from typing import Any, AsyncGenerator, Dict, List, Optional 

59 

60# Third-Party 

61import orjson 

62 

63# First-Party 

64from mcpgateway.config import settings 

65from mcpgateway.services.logging_service import LoggingService 

66from mcpgateway.utils.redis_client import get_redis_client 

67 

# Probe for the asyncio flavour of redis-py without importing it eagerly.
# REDIS_AVAILABLE is True only when a spec for ``redis.asyncio`` can be located.
try:
    _redis_asyncio_spec = importlib.util.find_spec("redis.asyncio")
except (ModuleNotFoundError, AttributeError) as e:
    # ModuleNotFoundError: the parent ``redis`` package is not installed.
    # AttributeError: something named ``redis`` exists but is not a proper
    # package (e.g., it is shadowed by a plain file).
    # Standard
    import logging

    REDIS_AVAILABLE = False
    logging.getLogger(__name__).warning(f"Redis module check failed ({type(e).__name__}: {e}), Redis support disabled")
else:
    REDIS_AVAILABLE = _redis_asyncio_spec is not None

78 

# Initialize logging
# A LoggingService instance is created at import time and kept as a module-level
# name; the module logger used throughout this file is obtained from it.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

82 

83 

class EventService:
    """Generic event service using Redis Pub/Sub with a local-queue fallback.

    Replicates the logic from GatewayService for use in other services. It attempts
    to use Redis as a distributed event bus. If Redis is unavailable, or the gateway
    is configured to run locally, it falls back to asyncio.Queue for in-process
    communication.

    Attributes:
        channel_name (str): The specific Redis/Queue channel identifier.
        redis_url (Optional[str]): The URL for the Redis connection, or None when
            ``settings.cache_type`` is not ``"redis"``.
    """

    def __init__(self, channel_name: str) -> None:
        """Initialize the Event Service.

        Args:
            channel_name: The specific Redis channel to use (e.g., 'mcpgateway:tool_events')
                to ensure separation of services.

        Example:
            >>> service = EventService("test:channel")
            >>> service.channel_name
            'test:channel'
        """
        self.channel_name = channel_name
        # One queue per active local subscriber; publish_event fans out to all of them.
        self._event_subscribers: List[asyncio.Queue] = []

        self.redis_url = settings.redis_url if settings.cache_type == "redis" else None
        # Redis client is set in initialize() via the shared factory
        self._redis_client: Optional[Any] = None

    async def initialize(self) -> None:
        """Initialize the event service with the shared Redis client.

        Should be called during application startup to get the shared Redis client.
        Any failure is logged and leaves the service in local-queue mode.
        """
        if self.redis_url and REDIS_AVAILABLE:
            try:
                self._redis_client = await get_redis_client()
                if self._redis_client:
                    logger.info(f"EventService ({self.channel_name}) connected to Redis")
            except Exception as e:
                logger.warning(f"Failed to initialize Redis for EventService ({self.channel_name}): {e}")
                self._redis_client = None

    async def publish_event(self, event: Dict[str, Any]) -> None:
        """Publish event to Redis or fallback to local subscribers.

        If a Redis client is active, the event is serialized to JSON and published
        to the configured channel. If Redis fails or is inactive, the event is
        pushed to all registered local asyncio queues.

        Args:
            event: A dictionary containing the event data to be published.

        Example:
            >>> import asyncio
            >>> async def test_pub():
            ...     # Force local mode for test
            ...     service = EventService("test:pub")
            ...     service._redis_client = None
            ...     # Create a listener
            ...     queue = asyncio.Queue()
            ...     service._event_subscribers.append(queue)
            ...
            ...     await service.publish_event({"type": "test", "data": 123})
            ...     return await queue.get()
            >>> asyncio.run(test_pub())
            {'type': 'test', 'data': 123}
        """
        if self._redis_client:
            try:
                await self._redis_client.publish(self.channel_name, orjson.dumps(event))
                return
            except Exception as e:
                # Serialization and transport errors both end up here; fall through
                # to local delivery so in-process subscribers still see the event.
                logger.error(f"Failed to publish event to Redis channel {self.channel_name}: {e}")

        # Local delivery: either Redis is not in use (single worker or file-lock
        # mode) or the Redis publish above failed.
        for queue in self._event_subscribers:
            await queue.put(event)

    async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
        """Subscribe to events. Yields events as they are published.

        If a Redis client is active, this creates a dedicated PubSub subscription
        on the shared client and yields messages from the channel. In every other
        case (Redis not configured, not initialized, shut down, or the shared
        client could not be obtained) it creates a local asyncio.Queue, adds it
        to the subscriber list, and yields items put into that queue. This mirrors
        publish_event, which delivers to the local queues whenever no Redis client
        is active.

        Yields:
            Dict[str, Any]: The deserialized event data.

        Raises:
            asyncio.CancelledError: If the async task is cancelled.
            Exception: For underlying Redis connection errors.

        Example:
            >>> import asyncio
            >>> async def test_sub():
            ...     service = EventService("test:sub")
            ...     service._redis_client = None # Force local mode
            ...
            ...     # Producer task
            ...     async def produce():
            ...         await asyncio.sleep(0.1)
            ...         await service.publish_event({"msg": "hello"})
            ...
            ...     # Consumer task
            ...     async def consume():
            ...         async for event in service.subscribe_events():
            ...             return event
            ...
            ...     # Run both
            ...     _, event = await asyncio.gather(produce(), consume())
            ...     return event
            >>> # asyncio.run(test_sub())
            {'msg': 'hello'}
        """
        if self._redis_client:
            try:
                # Get shared Redis client from factory
                # PubSub uses the client's connection pool but creates dedicated subscription
                client = await get_redis_client()
                if client:
                    pubsub = client.pubsub()

                    # Use timeout-based polling instead of blocking listen()
                    # This allows the generator to respond to cancellation properly
                    # and prevents CPU spin loops when cancelled but stuck on async iterator
                    poll_timeout = 1.0

                    try:
                        # Subscribing inside the try guarantees the pubsub object is
                        # released by the finally block even if subscribe() fails.
                        await pubsub.subscribe(self.channel_name)

                        while True:
                            try:
                                message = await asyncio.wait_for(
                                    pubsub.get_message(ignore_subscribe_messages=True, timeout=poll_timeout),
                                    timeout=poll_timeout + 0.5,
                                )
                            except asyncio.TimeoutError:
                                # No message, continue loop to check for cancellation
                                continue

                            if message is None or message["type"] != "message":
                                # Sleep on empty polls and non-message types to prevent spin
                                await asyncio.sleep(0.1)
                                continue

                            # Yield the data portion
                            yield orjson.loads(message["data"])
                    except asyncio.CancelledError:
                        # Handle client disconnection
                        logger.debug(f"Client disconnected from Redis subscription: {self.channel_name}")
                        raise
                    except Exception as e:
                        logger.error(f"Redis subscription error on {self.channel_name}: {e}")
                        raise
                    finally:
                        # Cleanup pubsub only (don't close shared client). The two steps
                        # are guarded separately so a failing unsubscribe cannot skip
                        # closing the pubsub object.
                        try:
                            await pubsub.unsubscribe(self.channel_name)
                        except Exception as e:
                            logger.warning(f"Error closing Redis subscription: {e}")
                        try:
                            await pubsub.aclose()
                        except Exception as e:
                            logger.warning(f"Error closing Redis subscription: {e}")
            except ImportError:
                logger.error("Redis is configured but redis-py does not support asyncio or is not installed.")
                # Fallthrough to queue mode if import fails

        # Local Queue. The Redis loop above never finishes normally, so this point
        # is reached only when Redis is not in use for this subscription: no active
        # client, the factory returned nothing, or the import failed.
        queue: asyncio.Queue = asyncio.Queue()
        self._event_subscribers.append(queue)
        try:
            while True:
                event = await queue.get()
                yield event
        except asyncio.CancelledError:
            logger.debug(f"Client disconnected from local event subscription: {self.channel_name}")
            raise
        finally:
            # shutdown() may already have cleared the list, hence the membership check.
            if queue in self._event_subscribers:
                self._event_subscribers.remove(queue)

    async def event_generator(self) -> AsyncGenerator[str, None]:
        """Generates Server-Sent Events (SSE) formatted strings.

        This is a convenience wrapper around `subscribe_events` designed for
        direct use with streaming HTTP responses (e.g., FastAPI's StreamingResponse).

        Yields:
            str: A string formatted as an SSE message: 'data: {...}\\n\\n'

        Raises:
            asyncio.CancelledError: If the client disconnects and the streaming
                task is cancelled.
        """
        try:
            async for event in self.subscribe_events():
                # Serialize the dictionary to a JSON string and format as SSE
                yield f"data: {orjson.dumps(event).decode()}\n\n"
        except asyncio.CancelledError:
            # Handle client disconnection gracefully
            logger.info(f"Client disconnected from event stream: {self.channel_name}")
            raise

    async def shutdown(self) -> None:
        """Cleanup resources.

        Clears local subscribers. The shared Redis client is managed by the factory.

        Example:
            >>> import asyncio
            >>> async def test_shutdown():
            ...     service = EventService("test:shutdown")
            ...     await service.shutdown()
            ...     return len(service._event_subscribers) == 0
            >>> asyncio.run(test_shutdown())
            True
        """
        # Don't close the shared Redis client - it's managed by redis_client.py
        self._redis_client = None
        self._event_subscribers.clear()