Coverage for mcpgateway/services/log_storage_service.py: 100% (154 statements)

# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/log_storage_service.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

Log Storage Service Implementation.
This service provides in-memory storage for recent logs with entity context,
supporting filtering, pagination, and real-time streaming.
"""

# Standard
import asyncio
from collections import deque
from datetime import datetime, timezone
import sys
from typing import Any, AsyncGenerator, Deque, Dict, List, Optional, TypedDict
import uuid

# First-Party
from mcpgateway.common.models import LogLevel
from mcpgateway.config import settings


class LogEntryDict(TypedDict, total=False):
    """TypedDict for LogEntry serialization."""

    id: str
    timestamp: str
    level: LogLevel
    entity_type: Optional[str]
    entity_id: Optional[str]
    entity_name: Optional[str]
    message: str
    logger: Optional[str]
    data: Optional[Dict[str, Any]]
    request_id: Optional[str]


class LogEntry:
    """Simple log entry for in-memory storage.

    Attributes:
        id: Unique identifier for the log entry
        timestamp: When the log entry was created
        level: Severity level of the log
        entity_type: Type of entity (tool, resource, server, gateway)
        entity_id: ID of the related entity
        entity_name: Name of the related entity for display
        message: The log message
        logger: Logger name/source
        data: Additional structured data
        request_id: Associated request ID for tracing
    """

    __slots__ = ("id", "timestamp", "level", "entity_type", "entity_id", "entity_name", "message", "logger", "data", "request_id", "_size")

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        level: LogLevel,
        message: str,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        entity_name: Optional[str] = None,
        logger: Optional[str] = None,
        data: Optional[Dict[str, Any]] = None,
        request_id: Optional[str] = None,
    ):
        """Initialize a log entry.

        Args:
            level: Severity level of the log
            message: The log message
            entity_type: Type of entity (tool, resource, server, gateway)
            entity_id: ID of the related entity
            entity_name: Name of the related entity for display
            logger: Logger name/source
            data: Additional structured data
            request_id: Associated request ID for tracing
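
        Examples:
            Illustrative only; the message and entity values below are
            hypothetical.

            >>> from mcpgateway.common.models import LogLevel
            >>> entry = LogEntry(LogLevel.ERROR, "Tool call failed", entity_type="tool", entity_id="t1")
            >>> entry.message
            'Tool call failed'
            >>> entry.entity_type
            'tool'
            >>> entry._size > 0
            True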
80 """
81 self.id = str(uuid.uuid4())
82 self.timestamp = datetime.now(timezone.utc)
83 self.level = level
84 self.entity_type = entity_type
85 self.entity_id = entity_id
86 self.entity_name = entity_name
87 self.message = message
88 self.logger = logger
89 self.data = data
90 self.request_id = request_id
92 # Estimate memory size (rough approximation)
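        # NOTE: sys.getsizeof() is shallow and does not follow references, so
        # nested payloads in ``data`` are undercounted; treat this as a lower bound.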
        self._size = sys.getsizeof(self.id)
        self._size += sys.getsizeof(self.timestamp)
        self._size += sys.getsizeof(self.level)
        self._size += sys.getsizeof(self.message)
        self._size += sys.getsizeof(self.entity_type) if self.entity_type else 0
        self._size += sys.getsizeof(self.entity_id) if self.entity_id else 0
        self._size += sys.getsizeof(self.entity_name) if self.entity_name else 0
        self._size += sys.getsizeof(self.logger) if self.logger else 0
        self._size += sys.getsizeof(self.data) if self.data else 0
        self._size += sys.getsizeof(self.request_id) if self.request_id else 0

    def to_dict(self) -> LogEntryDict:
        """Convert to dictionary for JSON serialization.

        Returns:
            Dictionary representation of the log entry

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> entry = LogEntry(LogLevel.INFO, "Test message", entity_type="tool", entity_id="123")
            >>> d = entry.to_dict()
            >>> str(d['level'])
            'LogLevel.INFO'
            >>> d['message']
            'Test message'
            >>> d['entity_type']
            'tool'
            >>> d['entity_id']
            '123'
            >>> 'timestamp' in d
            True
            >>> 'id' in d
            True
        """
        return {
            "id": self.id,
            "timestamp": self.timestamp.isoformat(),
            "level": self.level,
            "entity_type": self.entity_type,
            "entity_id": self.entity_id,
            "entity_name": self.entity_name,
            "message": self.message,
            "logger": self.logger,
            "data": self.data,
            "request_id": self.request_id,
        }


class LogStorageMessage(TypedDict):
    """TypedDict for messages sent to subscribers."""

    type: str
    data: LogEntryDict


class LogStorageService:
    """Service for storing and retrieving log entries in memory.

    Provides:
    - Size-limited circular buffer (default 1MB)
    - Entity context tracking
    - Real-time streaming
    - Filtering and pagination
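
    Examples:
        A minimal sketch of the add/query round trip; the message text is
        hypothetical and default settings are assumed.

        >>> import asyncio
        >>> from mcpgateway.common.models import LogLevel
        >>> service = LogStorageService()
        >>> _ = asyncio.run(service.add_log(LogLevel.INFO, "Gateway started"))
        >>> service.get_stats()['total_logs']
        1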
156 """
158 def __init__(self) -> None:
159 """Initialize log storage service."""
160 # Calculate max buffer size in bytes
161 self._max_size_bytes = int(settings.log_buffer_size_mb * 1024 * 1024)
162 self._current_size_bytes = 0
164 # Use deque for efficient append/pop operations
165 self._buffer: Deque[LogEntry] = deque()
166 self._subscribers: List[asyncio.Queue[LogStorageMessage]] = []
168 # Indices for efficient filtering
169 self._entity_index: Dict[str, List[str]] = {} # entity_key -> [log_ids]
170 self._request_index: Dict[str, List[str]] = {} # request_id -> [log_ids]

    async def add_log(  # pylint: disable=too-many-positional-arguments
        self,
        level: LogLevel,
        message: str,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        entity_name: Optional[str] = None,
        logger: Optional[str] = None,
        data: Optional[Dict[str, Any]] = None,
        request_id: Optional[str] = None,
    ) -> LogEntry:
        """Add a log entry to storage.

        Args:
            level: Log severity level
            message: Log message
            entity_type: Type of entity (tool, resource, server, gateway)
            entity_id: ID of the related entity
            entity_name: Name of the related entity
            logger: Logger name/source
            data: Additional structured data
            request_id: Associated request ID for tracing

        Returns:
            The created LogEntry
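
        Examples:
            Illustrative usage with hypothetical entity values.

            >>> import asyncio
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LogStorageService()
            >>> entry = asyncio.run(service.add_log(LogLevel.INFO, "Tool invoked", entity_type="tool", entity_id="t1"))
            >>> entry.message
            'Tool invoked'
            >>> len(service._buffer)
            1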
197 """
198 log_entry = LogEntry(
199 level=level,
200 message=message,
201 entity_type=entity_type,
202 entity_id=entity_id,
203 entity_name=entity_name,
204 logger=logger,
205 data=data,
206 request_id=request_id,
207 )
209 # Add to buffer and update size
210 self._buffer.append(log_entry)
211 self._current_size_bytes += log_entry._size # pylint: disable=protected-access
213 # Update indices BEFORE eviction so they can be cleaned up properly
214 if entity_id:
215 key = f"{entity_type}:{entity_id}" if entity_type else entity_id
216 if key not in self._entity_index:
217 self._entity_index[key] = []
218 self._entity_index[key].append(log_entry.id)
220 if request_id:
221 if request_id not in self._request_index:
222 self._request_index[request_id] = []
223 self._request_index[request_id].append(log_entry.id)
225 # Remove old entries if size limit exceeded
226 while self._current_size_bytes > self._max_size_bytes and self._buffer:
227 old_entry = self._buffer.popleft()
228 self._current_size_bytes -= old_entry._size # pylint: disable=protected-access
229 self._remove_from_indices(old_entry)
231 # Notify subscribers
232 await self._notify_subscribers(log_entry)
234 return log_entry

    def _remove_from_indices(self, entry: LogEntry) -> None:
        """Remove entry from indices when evicted from buffer.

        Args:
            entry: LogEntry to remove from indices
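
        Examples:
            A sketch of index cleanup; the entity ID is hypothetical.

            >>> import asyncio
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LogStorageService()
            >>> entry = asyncio.run(service.add_log(LogLevel.INFO, "x", entity_id="e1"))
            >>> 'e1' in service._entity_index
            True
            >>> service._remove_from_indices(entry)
            >>> 'e1' in service._entity_index
            False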
241 """
242 # Remove from entity index
243 if entry.entity_id:
244 key = f"{entry.entity_type}:{entry.entity_id}" if entry.entity_type else entry.entity_id
245 if key in self._entity_index:
246 try:
247 self._entity_index[key].remove(entry.id)
248 if not self._entity_index[key]:
249 del self._entity_index[key]
250 except ValueError:
251 pass
253 # Remove from request index
254 if entry.request_id and entry.request_id in self._request_index:
255 try:
256 self._request_index[entry.request_id].remove(entry.id)
257 if not self._request_index[entry.request_id]:
258 del self._request_index[entry.request_id]
259 except ValueError:
260 pass

    async def _notify_subscribers(self, log_entry: LogEntry) -> None:
        """Notify subscribers of new log entry.

        Args:
            log_entry: New log entry
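
        Examples:
            A minimal sketch using a manually registered queue.

            >>> import asyncio
            >>> from mcpgateway.common.models import LogLevel
            >>> async def demo():
            ...     service = LogStorageService()
            ...     queue: asyncio.Queue = asyncio.Queue()
            ...     service._subscribers.append(queue)
            ...     await service._notify_subscribers(LogEntry(LogLevel.INFO, "ping"))
            ...     return queue.get_nowait()["type"]
            >>> asyncio.run(demo())
            'log_entry'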
267 """
268 message: LogStorageMessage = {
269 "type": "log_entry",
270 "data": log_entry.to_dict(),
271 }
273 # Remove dead subscribers
274 dead_subscribers = []
275 for queue in self._subscribers:
276 try:
277 # Non-blocking put with timeout
278 queue.put_nowait(message)
279 except asyncio.QueueFull:
280 # Skip if subscriber is too slow
281 pass
282 except Exception:
283 # Mark for removal if queue is broken
284 dead_subscribers.append(queue)
286 # Clean up dead subscribers
287 for queue in dead_subscribers:
288 self._subscribers.remove(queue)

    async def get_logs(  # pylint: disable=too-many-positional-arguments
        self,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        level: Optional[LogLevel] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        request_id: Optional[str] = None,
        search: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
        order: str = "desc",
    ) -> List[LogEntryDict]:
        """Get filtered log entries.

        Args:
            entity_type: Filter by entity type
            entity_id: Filter by entity ID
            level: Minimum log level
            start_time: Start of time range
            end_time: End of time range
            request_id: Filter by request ID
            search: Case-insensitive substring search in message text
            limit: Maximum number of results
            offset: Number of results to skip
            order: Sort order ("asc" or "desc")

        Returns:
            List of matching log entries as dictionaries
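
        Examples:
            Illustrative filtering by minimum level; the messages are
            hypothetical.

            >>> import asyncio
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LogStorageService()
            >>> _ = asyncio.run(service.add_log(LogLevel.DEBUG, "probe"))
            >>> _ = asyncio.run(service.add_log(LogLevel.ERROR, "Connection failed"))
            >>> logs = asyncio.run(service.get_logs(level=LogLevel.WARNING))
            >>> [log['message'] for log in logs]
            ['Connection failed']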
319 """
320 # Start with all logs or filtered by indices
321 if entity_id:
322 key = f"{entity_type}:{entity_id}" if entity_type else entity_id
323 log_ids = set(self._entity_index.get(key, []))
324 candidates = [log for log in self._buffer if log.id in log_ids]
325 elif request_id:
326 log_ids = set(self._request_index.get(request_id, []))
327 candidates = [log for log in self._buffer if log.id in log_ids]
328 else:
329 candidates = list(self._buffer)
331 # Apply filters
332 filtered = []
333 for log in candidates:
334 # Entity type filter
335 if entity_type and log.entity_type != entity_type:
336 continue
338 # Level filter
339 if level and not self._meets_level_threshold(log.level, level):
340 continue
342 # Time range filters
343 if start_time and log.timestamp < start_time:
344 continue
345 if end_time and log.timestamp > end_time:
346 continue
348 # Search filter
349 if search and search.lower() not in log.message.lower():
350 continue
352 filtered.append(log)
354 # Sort
355 filtered.sort(key=lambda x: x.timestamp, reverse=order == "desc")
357 # Paginate
358 paginated = filtered[offset : offset + limit] # noqa: E203
360 # Convert to dictionaries
361 return [log.to_dict() for log in paginated]

    def _meets_level_threshold(self, log_level: LogLevel, min_level: LogLevel) -> bool:
        """Check if log level meets minimum threshold.

        Args:
            log_level: Log level to check
            min_level: Minimum required level

        Returns:
            True if log level meets or exceeds minimum

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LogStorageService()
            >>> service._meets_level_threshold(LogLevel.ERROR, LogLevel.WARNING)
            True
            >>> service._meets_level_threshold(LogLevel.INFO, LogLevel.WARNING)
            False
            >>> service._meets_level_threshold(LogLevel.CRITICAL, LogLevel.ERROR)
            True
            >>> service._meets_level_threshold(LogLevel.DEBUG, LogLevel.DEBUG)
            True
        """
        level_values = {
            LogLevel.DEBUG: 0,
            LogLevel.INFO: 1,
            LogLevel.NOTICE: 2,
            LogLevel.WARNING: 3,
            LogLevel.ERROR: 4,
            LogLevel.CRITICAL: 5,
            LogLevel.ALERT: 6,
            LogLevel.EMERGENCY: 7,
        }

        return level_values.get(log_level, 0) >= level_values.get(min_level, 0)

    async def subscribe(self) -> AsyncGenerator[LogStorageMessage, None]:
        """Subscribe to real-time log updates.

        Yields:
            Log entry events as they occur
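
        Examples:
            A minimal sketch of consuming one event; ``asyncio.sleep(0)`` lets
            the subscriber register its queue before the log is added.

            >>> import asyncio
            >>> from mcpgateway.common.models import LogLevel
            >>> async def demo():
            ...     service = LogStorageService()
            ...     stream = service.subscribe()
            ...     task = asyncio.ensure_future(stream.__anext__())
            ...     await asyncio.sleep(0)
            ...     await service.add_log(LogLevel.INFO, "hello")
            ...     message = await task
            ...     await stream.aclose()
            ...     return message["type"]
            >>> asyncio.run(demo())
            'log_entry'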
403 """
404 queue: asyncio.Queue[LogStorageMessage] = asyncio.Queue(maxsize=100)
405 self._subscribers.append(queue)
406 try:
407 while True:
408 message = await queue.get()
409 yield message
410 finally:
411 self._subscribers.remove(queue)

    def get_stats(self) -> Dict[str, Any]:
        """Get storage statistics.

        Returns:
            Dictionary with storage statistics

        Examples:
            >>> service = LogStorageService()
            >>> stats = service.get_stats()
            >>> 'total_logs' in stats
            True
            >>> 'buffer_size_bytes' in stats
            True
            >>> 'buffer_size_mb' in stats
            True
            >>> stats['total_logs']
            0
            >>> stats['unique_entities']
            0
            >>> stats['unique_requests']
            0
        """
        level_counts: Dict[LogLevel, int] = {}
        entity_counts: Dict[str, int] = {}

        for log in self._buffer:
            # Count by level
            level_counts[log.level] = level_counts.get(log.level, 0) + 1

            # Count by entity type
            if log.entity_type:
                entity_counts[log.entity_type] = entity_counts.get(log.entity_type, 0) + 1

        return {
            "total_logs": len(self._buffer),
            "buffer_size_bytes": self._current_size_bytes,
            "buffer_size_mb": round(self._current_size_bytes / (1024 * 1024), 2),
            "max_size_mb": settings.log_buffer_size_mb,
            "usage_percent": round((self._current_size_bytes / self._max_size_bytes) * 100, 1),
            "unique_entities": len(self._entity_index),
            "unique_requests": len(self._request_index),
            "level_distribution": level_counts,
            "entity_distribution": entity_counts,
        }

    def clear(self) -> int:
        """Clear all logs from buffer.

        Returns:
            Number of logs cleared

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LogStorageService()
            >>> import asyncio
            >>> entry = asyncio.run(service.add_log(LogLevel.INFO, "Test"))
            >>> isinstance(entry, LogEntry)
            True
            >>> service.clear()
            1
            >>> len(service._buffer)
            0
        """
        count = len(self._buffer)
        self._buffer.clear()
        self._entity_index.clear()
        self._request_index.clear()
        self._current_size_bytes = 0
        return count