Coverage for mcpgateway / services / event_service.py: 100%
99 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/event_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Authors: Keval Mahajan
8Description:
9 This module implements a Centralized Event Service designed to decouple event
10 producers from consumers within the MCP Gateway architecture for various services
11 such as gateway_service, tool_service, and more.
13 - Primary Transport (Redis): Uses Redis Pub/Sub for distributed event
14 broadcasting. This allows multiple Gateway instances (scaled horizontally)
15 to share events.
16 - Fallback Transport (Local Queue): Uses `asyncio.Queue` for in-memory
17 communication. This activates automatically if Redis is unavailable or
18 misconfigured, ensuring the application remains functional in a single-node
19 development environment.
21Usage Guide:
23 1. Initialization
24 Instantiate the service with a unique channel name. This acts as the "Topic".
26 ```python
27 from mcpgateway.services.event_service import EventService
29 # Create a service instance for tool execution events
30 tool_events = EventService(channel_name="mcpgateway:tools")
31 ```
33 2. Publishing Events (Producer)
34 Any part of the application can publish a dictionary to the channel.
36 ```python
37 await tool_events.publish_event({
38 "event": "tool_start",
39 "tool_name": "calculator",
40 "timestamp": datetime.now().isoformat()
41 })
42 ```
44 3. Subscribing to Events (Consumer)
45 Use an async for-loop to listen to the stream. This generator yields
46 events as they arrive.
48 ```python
49 async for event in tool_events.subscribe_events():
50 print(f"Received event: {event['event']}")
51 # Process event...
52 ```
53"""
55# Standard
56import asyncio
57import importlib.util
58from typing import Any, AsyncGenerator, Dict, List, Optional
60# Third-Party
61import orjson
63# First-Party
64from mcpgateway.config import settings
65from mcpgateway.services.logging_service import LoggingService
66from mcpgateway.utils.redis_client import get_redis_client
# Probe for the async Redis client without actually importing it.
try:
    _redis_async_spec = importlib.util.find_spec("redis.asyncio")
except (ModuleNotFoundError, AttributeError) as e:
    # ModuleNotFoundError: the redis package is not installed at all.
    # AttributeError: a module named 'redis' exists but is not a real package
    # (e.g., shadowed by a stray redis.py file), so submodule lookup fails.
    # Standard
    import logging

    logging.getLogger(__name__).warning(f"Redis module check failed ({type(e).__name__}: {e}), Redis support disabled")
    REDIS_AVAILABLE = False
else:
    REDIS_AVAILABLE = _redis_async_spec is not None
# Module-level logger obtained via the gateway's central LoggingService so
# this module's output follows the application-wide logging configuration.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
84class EventService:
85 """Generic Event Service handling Redis PubSub with Local Queue fallback.
87 Replicates the logic from GatewayService for use in other services. It attempts
88 to connect to Redis for a distributed event bus. If Redis is unavailable or
89 configured to perform locally, it falls back to asyncio.Queue for in-process
90 communication.
92 Attributes:
93 channel_name (str): The specific Redis/Queue channel identifier.
94 redis_url (Optional[str]): The URL for the Redis connection.
95 """
97 def __init__(self, channel_name: str) -> None:
98 """Initialize the Event Service.
100 Args:
101 channel_name: The specific Redis channel to use (e.g., 'mcpgateway:tool_events')
102 to ensure separation of services.
104 Example:
105 >>> service = EventService("test:channel")
106 >>> service.channel_name
107 'test:channel'
108 """
109 self.channel_name = channel_name
110 self._event_subscribers: List[asyncio.Queue] = []
112 self.redis_url = settings.redis_url if settings.cache_type == "redis" else None
113 self._redis_client: Optional[Any] = None
114 # Redis client is set in initialize() via the shared factory
116 async def initialize(self) -> None:
117 """Initialize the event service with shared Redis client.
119 Should be called during application startup to get the shared Redis client.
120 """
121 if self.redis_url and REDIS_AVAILABLE:
122 try:
123 self._redis_client = await get_redis_client()
124 if self._redis_client:
125 logger.info(f"EventService ({self.channel_name}) connected to Redis")
126 except Exception as e:
127 logger.warning(f"Failed to initialize Redis for EventService ({self.channel_name}): {e}")
128 self._redis_client = None
130 async def publish_event(self, event: Dict[str, Any]) -> None:
131 """Publish event to Redis or fallback to local subscribers.
133 If a Redis client is active, the event is serialized to JSON and published
134 to the configured channel. If Redis fails or is inactive, the event is
135 pushed to all registered local asyncio queues.
137 Args:
138 event: A dictionary containing the event data to be published.
140 Example:
141 >>> import asyncio
142 >>> async def test_pub():
143 ... # Force local mode for test
144 ... service = EventService("test:pub")
145 ... service._redis_client = None
146 ... # Create a listener
147 ... queue = asyncio.Queue()
148 ... service._event_subscribers.append(queue)
149 ...
150 ... await service.publish_event({"type": "test", "data": 123})
151 ... return await queue.get()
152 >>> asyncio.run(test_pub())
153 {'type': 'test', 'data': 123}
154 """
155 if self._redis_client:
156 try:
157 await self._redis_client.publish(self.channel_name, orjson.dumps(event))
158 except Exception as e:
159 logger.error(f"Failed to publish event to Redis channel {self.channel_name}: {e}")
160 # Fallback: push to local queues if Redis fails
161 for queue in self._event_subscribers:
162 await queue.put(event)
163 else:
164 # Local only (single worker or file-lock mode)
165 for queue in self._event_subscribers:
166 await queue.put(event)
168 async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
169 """Subscribe to events. Yields events as they are published.
171 If Redis is available, this creates a dedicated async Redis connection
172 and yields messages from the PubSub channel. If Redis is not available,
173 it creates a local asyncio.Queue, adds it to the subscriber list, and
174 yields items put into that queue.
176 Yields:
177 Dict[str, Any]: The deserialized event data.
179 Raises:
180 asyncio.CancelledError: If the async task is cancelled.
181 Exception: For underlying Redis connection errors.
183 Example:
184 >>> import asyncio
185 >>> async def test_sub():
186 ... service = EventService("test:sub")
187 ... service._redis_client = None # Force local mode
188 ...
189 ... # Producer task
190 ... async def produce():
191 ... await asyncio.sleep(0.1)
192 ... await service.publish_event({"msg": "hello"})
193 ...
194 ... # Consumer task
195 ... async def consume():
196 ... async for event in service.subscribe_events():
197 ... return event
198 ...
199 ... # Run both
200 ... _, event = await asyncio.gather(produce(), consume())
201 ... return event
202 >>> # asyncio.run(test_sub())
203 {'msg': 'hello'}
204 """
206 fallback_to_local = False
208 if self._redis_client:
210 try:
211 # Get shared Redis client from factory
212 # PubSub uses the client's connection pool but creates dedicated subscription
213 client = await get_redis_client()
214 if not client:
215 fallback_to_local = True
216 else:
217 pubsub = client.pubsub()
219 await pubsub.subscribe(self.channel_name)
221 # Use timeout-based polling instead of blocking listen()
222 # This allows the generator to respond to cancellation properly
223 # and prevents CPU spin loops when cancelled but stuck on async iterator
224 poll_timeout = 1.0
226 try:
227 while True:
228 try:
229 message = await asyncio.wait_for(
230 pubsub.get_message(ignore_subscribe_messages=True, timeout=poll_timeout),
231 timeout=poll_timeout + 0.5,
232 )
233 except asyncio.TimeoutError:
234 # No message, continue loop to check for cancellation
235 continue
237 if message is None:
238 # Prevent spin if get_message returns None immediately
239 await asyncio.sleep(0.1)
240 continue
242 if message["type"] != "message":
243 # Sleep on non-message types to prevent spin
244 await asyncio.sleep(0.1)
245 continue
247 # Yield the data portion
248 yield orjson.loads(message["data"])
249 except asyncio.CancelledError:
250 # Handle client disconnection
251 logger.debug(f"Client disconnected from Redis subscription: {self.channel_name}")
252 raise
253 except Exception as e:
254 logger.error(f"Redis subscription error on {self.channel_name}: {e}")
255 raise
256 finally:
257 # Cleanup pubsub only (don't close shared client)
258 try:
259 await pubsub.unsubscribe(self.channel_name)
260 await pubsub.aclose()
261 except Exception as e:
262 logger.warning(f"Error closing Redis subscription: {e}")
263 except ImportError:
264 fallback_to_local = True
265 logger.error("Redis is configured but redis-py does not support asyncio or is not installed.")
266 # Fallthrough to queue mode if import fails
268 # Local Queue (Redis not available or import failed)
269 if fallback_to_local or not (self.redis_url and REDIS_AVAILABLE):
270 queue: asyncio.Queue = asyncio.Queue()
271 self._event_subscribers.append(queue)
272 try:
273 while True:
274 event = await queue.get()
275 yield event
276 except asyncio.CancelledError:
277 logger.debug(f"Client disconnected from local event subscription: {self.channel_name}")
278 raise
279 finally:
280 if queue in self._event_subscribers:
281 self._event_subscribers.remove(queue)
283 async def event_generator(self) -> AsyncGenerator[str, None]:
284 """Generates Server-Sent Events (SSE) formatted strings.
286 This is a convenience wrapper around `subscribe_events` designed for
287 direct use with streaming HTTP responses (e.g., FastAPI's StreamingResponse).
289 Yields:
290 str: A string formatted as an SSE message: 'data: {...}\\n\\n'
292 Raises:
293 asyncio.CancelledError: If the client disconnects and the streaming
294 task is cancelled.
295 """
296 try:
297 async for event in self.subscribe_events():
298 # Serialize the dictionary to a JSON string and format as SSE
299 yield f"data: {orjson.dumps(event).decode()}\n\n"
300 except asyncio.CancelledError:
301 # Handle client disconnection gracefully
302 logger.info(f"Client disconnected from event stream: {self.channel_name}")
303 raise
305 async def shutdown(self):
306 """Cleanup resources.
308 Clears local subscribers. The shared Redis client is managed by the factory.
310 Example:
311 >>> import asyncio
312 >>> async def test_shutdown():
313 ... service = EventService("test:shutdown")
314 ... await service.shutdown()
315 ... return len(service._event_subscribers) == 0
316 >>> asyncio.run(test_shutdown())
317 True
318 """
319 # Don't close the shared Redis client - it's managed by redis_client.py
320 self._redis_client = None
321 self._event_subscribers.clear()