Coverage for mcpgateway/services/log_storage_service.py: 100%

154 statements  

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/log_storage_service.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

Log Storage Service Implementation.
This service provides in-memory storage for recent logs with entity context,
supporting filtering, pagination, and real-time streaming.
"""

# Standard
import asyncio
from collections import deque
from datetime import datetime, timezone
import sys
from typing import Any, AsyncGenerator, Deque, Dict, List, Optional, TypedDict
import uuid

# First-Party
from mcpgateway.common.models import LogLevel
from mcpgateway.config import settings


class LogEntryDict(TypedDict, total=False):
    """TypedDict for LogEntry serialization."""

    id: str
    timestamp: str
    level: LogLevel
    entity_type: Optional[str]
    entity_id: Optional[str]
    entity_name: Optional[str]
    message: str
    logger: Optional[str]
    data: Optional[Dict[str, Any]]
    request_id: Optional[str]


class LogEntry:
    """Simple log entry for in-memory storage.

    Attributes:
        id: Unique identifier for the log entry
        timestamp: When the log entry was created
        level: Severity level of the log
        entity_type: Type of entity (tool, resource, server, gateway)
        entity_id: ID of the related entity
        entity_name: Name of the related entity for display
        message: The log message
        logger: Logger name/source
        data: Additional structured data
        request_id: Associated request ID for tracing
    """

    __slots__ = ("id", "timestamp", "level", "entity_type", "entity_id", "entity_name", "message", "logger", "data", "request_id", "_size")

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        level: LogLevel,
        message: str,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        entity_name: Optional[str] = None,
        logger: Optional[str] = None,
        data: Optional[Dict[str, Any]] = None,
        request_id: Optional[str] = None,
    ):
        """Initialize a log entry.

        Args:
            level: Severity level of the log
            message: The log message
            entity_type: Type of entity (tool, resource, server, gateway)
            entity_id: ID of the related entity
            entity_name: Name of the related entity for display
            logger: Logger name/source
            data: Additional structured data
            request_id: Associated request ID for tracing
        """
        self.id = str(uuid.uuid4())
        self.timestamp = datetime.now(timezone.utc)
        self.level = level
        self.entity_type = entity_type
        self.entity_id = entity_id
        self.entity_name = entity_name
        self.message = message
        self.logger = logger
        self.data = data
        self.request_id = request_id

        # Estimate memory size (rough approximation)
        self._size = sys.getsizeof(self.id)
        self._size += sys.getsizeof(self.timestamp)
        self._size += sys.getsizeof(self.level)
        self._size += sys.getsizeof(self.message)
        self._size += sys.getsizeof(self.entity_type) if self.entity_type else 0
        self._size += sys.getsizeof(self.entity_id) if self.entity_id else 0
        self._size += sys.getsizeof(self.entity_name) if self.entity_name else 0
        self._size += sys.getsizeof(self.logger) if self.logger else 0
        self._size += sys.getsizeof(self.data) if self.data else 0
        self._size += sys.getsizeof(self.request_id) if self.request_id else 0
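        # Note: sys.getsizeof() is shallow (nested contents of ``data`` are
        # not counted), so the tracked total is an approximation of actual
        # memory use.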

    def to_dict(self) -> LogEntryDict:
        """Convert to dictionary for JSON serialization.

        Returns:
            Dictionary representation of the log entry

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> entry = LogEntry(LogLevel.INFO, "Test message", entity_type="tool", entity_id="123")
            >>> d = entry.to_dict()
            >>> str(d['level'])
            'LogLevel.INFO'
            >>> d['message']
            'Test message'
            >>> d['entity_type']
            'tool'
            >>> d['entity_id']
            '123'
            >>> 'timestamp' in d
            True
            >>> 'id' in d
            True
        """
        return {
            "id": self.id,
            "timestamp": self.timestamp.isoformat(),
            "level": self.level,
            "entity_type": self.entity_type,
            "entity_id": self.entity_id,
            "entity_name": self.entity_name,
            "message": self.message,
            "logger": self.logger,
            "data": self.data,
            "request_id": self.request_id,
        }


class LogStorageMessage(TypedDict):
    """TypedDict for messages sent to subscribers."""

    type: str
    data: LogEntryDict


class LogStorageService:
    """Service for storing and retrieving log entries in memory.

    Provides:
    - Size-limited circular buffer (default 1MB)
    - Entity context tracking
    - Real-time streaming
    - Filtering and pagination
    """

    def __init__(self) -> None:
        """Initialize log storage service."""
        # Calculate max buffer size in bytes
        self._max_size_bytes = int(settings.log_buffer_size_mb * 1024 * 1024)
        self._current_size_bytes = 0

        # Use deque for efficient append/pop operations
        self._buffer: Deque[LogEntry] = deque()
        self._subscribers: List[asyncio.Queue[LogStorageMessage]] = []

        # Indices for efficient filtering
        self._entity_index: Dict[str, List[str]] = {}  # entity_key -> [log_ids]
        self._request_index: Dict[str, List[str]] = {}  # request_id -> [log_ids]

    async def add_log(  # pylint: disable=too-many-positional-arguments
        self,
        level: LogLevel,
        message: str,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        entity_name: Optional[str] = None,
        logger: Optional[str] = None,
        data: Optional[Dict[str, Any]] = None,
        request_id: Optional[str] = None,
    ) -> LogEntry:
        """Add a log entry to storage.

        Args:
            level: Log severity level
            message: Log message
            entity_type: Type of entity (tool, resource, server, gateway)
            entity_id: ID of the related entity
            entity_name: Name of the related entity
            logger: Logger name/source
            data: Additional structured data
            request_id: Associated request ID for tracing

        Returns:
            The created LogEntry
        """
        log_entry = LogEntry(
            level=level,
            message=message,
            entity_type=entity_type,
            entity_id=entity_id,
            entity_name=entity_name,
            logger=logger,
            data=data,
            request_id=request_id,
        )

        # Add to buffer and update size
        self._buffer.append(log_entry)
        self._current_size_bytes += log_entry._size  # pylint: disable=protected-access

        # Update indices BEFORE eviction so they can be cleaned up properly
        if entity_id:
            key = f"{entity_type}:{entity_id}" if entity_type else entity_id
            if key not in self._entity_index:
                self._entity_index[key] = []
            self._entity_index[key].append(log_entry.id)

        if request_id:
            if request_id not in self._request_index:
                self._request_index[request_id] = []
            self._request_index[request_id].append(log_entry.id)

        # Remove old entries if size limit exceeded
        while self._current_size_bytes > self._max_size_bytes and self._buffer:
            old_entry = self._buffer.popleft()
            self._current_size_bytes -= old_entry._size  # pylint: disable=protected-access
            self._remove_from_indices(old_entry)
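        # Eviction is FIFO: popleft() drops the oldest entry first, so the
        # newest logs survive when the buffer exceeds its byte budget.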

        # Notify subscribers
        await self._notify_subscribers(log_entry)

        return log_entry

    def _remove_from_indices(self, entry: LogEntry) -> None:
        """Remove entry from indices when evicted from buffer.

        Args:
            entry: LogEntry to remove from indices
        """
        # Remove from entity index
        if entry.entity_id:
            key = f"{entry.entity_type}:{entry.entity_id}" if entry.entity_type else entry.entity_id
            if key in self._entity_index:
                try:
                    self._entity_index[key].remove(entry.id)
                    if not self._entity_index[key]:
                        del self._entity_index[key]
                except ValueError:
                    pass

        # Remove from request index
        if entry.request_id and entry.request_id in self._request_index:
            try:
                self._request_index[entry.request_id].remove(entry.id)
                if not self._request_index[entry.request_id]:
                    del self._request_index[entry.request_id]
            except ValueError:
                pass

    async def _notify_subscribers(self, log_entry: LogEntry) -> None:
        """Notify subscribers of new log entry.

        Args:
            log_entry: New log entry
        """
        message: LogStorageMessage = {
            "type": "log_entry",
            "data": log_entry.to_dict(),
        }

        # Collect dead subscribers; they are removed after the loop so the
        # list is not mutated while iterating
        dead_subscribers = []
        for queue in self._subscribers:
            try:
                # Non-blocking put so one slow subscriber cannot stall the rest
                queue.put_nowait(message)
            except asyncio.QueueFull:
                # Skip if subscriber is too slow
                pass
            except Exception:
                # Mark for removal if queue is broken
                dead_subscribers.append(queue)

        # Clean up dead subscribers
        for queue in dead_subscribers:
            self._subscribers.remove(queue)

    async def get_logs(  # pylint: disable=too-many-positional-arguments
        self,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        level: Optional[LogLevel] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        request_id: Optional[str] = None,
        search: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
        order: str = "desc",
    ) -> List[LogEntryDict]:
        """Get filtered log entries.

        Args:
            entity_type: Filter by entity type
            entity_id: Filter by entity ID
            level: Minimum log level
            start_time: Start of time range
            end_time: End of time range
            request_id: Filter by request ID
            search: Search in message text
            limit: Maximum number of results
            offset: Number of results to skip
            order: Sort order (asc or desc)

        Returns:
            List of matching log entries as dictionaries
        """
        # Start with all logs or filtered by indices
        if entity_id:
            key = f"{entity_type}:{entity_id}" if entity_type else entity_id
            log_ids = set(self._entity_index.get(key, []))
            candidates = [log for log in self._buffer if log.id in log_ids]
        elif request_id:
            log_ids = set(self._request_index.get(request_id, []))
            candidates = [log for log in self._buffer if log.id in log_ids]
        else:
            candidates = list(self._buffer)

        # Apply filters
        filtered = []
        for log in candidates:
            # Entity type filter
            if entity_type and log.entity_type != entity_type:
                continue

            # Level filter
            if level and not self._meets_level_threshold(log.level, level):
                continue

            # Time range filters
            if start_time and log.timestamp < start_time:
                continue
            if end_time and log.timestamp > end_time:
                continue

            # Search filter
            if search and search.lower() not in log.message.lower():
                continue

            filtered.append(log)

        # Sort
        filtered.sort(key=lambda x: x.timestamp, reverse=(order == "desc"))

        # Paginate
        paginated = filtered[offset : offset + limit]  # noqa: E203

        # Convert to dictionaries
        return [log.to_dict() for log in paginated]

    def _meets_level_threshold(self, log_level: LogLevel, min_level: LogLevel) -> bool:
        """Check if log level meets minimum threshold.

        Args:
            log_level: Log level to check
            min_level: Minimum required level

        Returns:
            True if log level meets or exceeds minimum

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LogStorageService()
            >>> service._meets_level_threshold(LogLevel.ERROR, LogLevel.WARNING)
            True
            >>> service._meets_level_threshold(LogLevel.INFO, LogLevel.WARNING)
            False
            >>> service._meets_level_threshold(LogLevel.CRITICAL, LogLevel.ERROR)
            True
            >>> service._meets_level_threshold(LogLevel.DEBUG, LogLevel.DEBUG)
            True
        """
        level_values = {
            LogLevel.DEBUG: 0,
            LogLevel.INFO: 1,
            LogLevel.NOTICE: 2,
            LogLevel.WARNING: 3,
            LogLevel.ERROR: 4,
            LogLevel.CRITICAL: 5,
            LogLevel.ALERT: 6,
            LogLevel.EMERGENCY: 7,
        }
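        # The eight names above are the syslog severity keywords (RFC 5424),
        # ranked here from least (DEBUG) to most (EMERGENCY) severe.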

        return level_values.get(log_level, 0) >= level_values.get(min_level, 0)

    async def subscribe(self) -> AsyncGenerator[LogStorageMessage, None]:
        """Subscribe to real-time log updates.

        Yields:
            Log entry events as they occur
        """
        queue: asyncio.Queue[LogStorageMessage] = asyncio.Queue(maxsize=100)
        self._subscribers.append(queue)
        try:
            while True:
                message = await queue.get()
                yield message
        finally:
            self._subscribers.remove(queue)
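    # A typical subscribe() consumer (sketch; ``handle`` is a placeholder):
    #
    #     async for event in service.subscribe():
    #         handle(event["data"])  # event["type"] is "log_entry"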

    def get_stats(self) -> Dict[str, Any]:
        """Get storage statistics.

        Returns:
            Dictionary with storage statistics

        Examples:
            >>> service = LogStorageService()
            >>> stats = service.get_stats()
            >>> 'total_logs' in stats
            True
            >>> 'buffer_size_bytes' in stats
            True
            >>> 'buffer_size_mb' in stats
            True
            >>> stats['total_logs']
            0
            >>> stats['unique_entities']
            0
            >>> stats['unique_requests']
            0
        """
        level_counts: Dict[LogLevel, int] = {}
        entity_counts: Dict[str, int] = {}

        for log in self._buffer:
            # Count by level
            level_counts[log.level] = level_counts.get(log.level, 0) + 1

            # Count by entity type
            if log.entity_type:
                entity_counts[log.entity_type] = entity_counts.get(log.entity_type, 0) + 1

        return {
            "total_logs": len(self._buffer),
            "buffer_size_bytes": self._current_size_bytes,
            "buffer_size_mb": round(self._current_size_bytes / (1024 * 1024), 2),
            "max_size_mb": settings.log_buffer_size_mb,
            "usage_percent": round((self._current_size_bytes / self._max_size_bytes) * 100, 1),
            "unique_entities": len(self._entity_index),
            "unique_requests": len(self._request_index),
            "level_distribution": level_counts,
            "entity_distribution": entity_counts,
        }

    def clear(self) -> int:
        """Clear all logs from buffer.

        Returns:
            Number of logs cleared

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LogStorageService()
            >>> import asyncio
            >>> entry = asyncio.run(service.add_log(LogLevel.INFO, "Test"))
            >>> isinstance(entry, LogEntry)
            True
            >>> service.clear()
            1
            >>> len(service._buffer)
            0
        """
        count = len(self._buffer)
        self._buffer.clear()
        self._entity_index.clear()
        self._request_index.clear()
        self._current_size_bytes = 0
        return count
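
A minimal usage sketch for the service above (illustrative values only: "tool-123" and "req-42" are made-up IDs, and a configured mcpgateway environment is assumed so that settings.log_buffer_size_mb resolves):

# Standard
import asyncio

# First-Party
from mcpgateway.common.models import LogLevel
from mcpgateway.services.log_storage_service import LogStorageService


async def main() -> None:
    service = LogStorageService()

    # Record an entry with entity context and a trace ID.
    await service.add_log(
        LogLevel.WARNING,
        "Tool invocation took 4.2s",
        entity_type="tool",
        entity_id="tool-123",
        request_id="req-42",
    )

    # Query: WARNING and above for this tool, newest first.
    logs = await service.get_logs(entity_type="tool", entity_id="tool-123", level=LogLevel.WARNING)
    print(logs[0]["message"])

    # Buffer statistics: counts, byte usage, per-level distribution.
    print(service.get_stats()["total_logs"])


asyncio.run(main())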