Coverage for mcpgateway/services/metrics_cleanup_service.py: 98%

185 statements

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

# -*- coding: utf-8 -*-
"""Metrics Cleanup Service for automatic deletion of old metrics.

This service provides automatic and manual cleanup of old metrics data to prevent
unbounded table growth and maintain query performance.

Features:
- Batched deletion to prevent long locks
- Configurable retention period
- Background task for periodic cleanup
- Manual cleanup trigger via admin API
- Per-table cleanup with statistics

Copyright 2025
SPDX-License-Identifier: Apache-2.0
"""

# Standard
import asyncio
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import logging
import time
from typing import Dict, Optional

# Third-Party
from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session

# First-Party
from mcpgateway.config import settings
from mcpgateway.db import (
    A2AAgentMetric,
    A2AAgentMetricsHourly,
    fresh_db_session,
    PromptMetric,
    PromptMetricsHourly,
    ResourceMetric,
    ResourceMetricsHourly,
    ServerMetric,
    ServerMetricsHourly,
    ToolMetric,
    ToolMetricsHourly,
)
from mcpgateway.services.metrics_rollup_service import get_metrics_rollup_service_if_initialized

logger = logging.getLogger(__name__)


def delete_metrics_in_batches(db: Session, model_class, filter_column, entity_id, batch_size: Optional[int] = None) -> int:
    """Delete metrics rows for a specific entity in batches within the current transaction.

    Args:
        db: Database session.
        model_class: SQLAlchemy model to delete from.
        filter_column: Column used to filter by entity_id.
        entity_id: Entity identifier to delete metrics for.
        batch_size: Optional batch size override.

    Returns:
        int: Total rows deleted.
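
    Example:
        A minimal sketch, assuming an active session (db) and the ToolMetric
        model; "tool-123" is a made-up identifier:

            deleted = delete_metrics_in_batches(db, ToolMetric, ToolMetric.tool_id, "tool-123")
            db.commit()  # the caller owns the transaction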

    """
    effective_batch_size = batch_size or getattr(settings, "metrics_cleanup_batch_size", 10000)
    total_deleted = 0

    while True:
        subq = select(model_class.id).where(filter_column == entity_id).limit(effective_batch_size)
        delete_stmt = delete(model_class).where(model_class.id.in_(subq))
        result = db.execute(delete_stmt)

        rowcount = result.rowcount
        batch_deleted = rowcount if isinstance(rowcount, int) else 0
        total_deleted += batch_deleted

        if batch_deleted <= 0 or batch_deleted < effective_batch_size:
            break

    return total_deleted


@contextmanager
def pause_rollup_during_purge(reason: str = "purge_metrics"):
    """Pause rollup task while purging metrics to reduce race conditions.

    Args:
        reason: Reason for pausing the rollup task.

    Yields:
        None
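
    Example:
        A sketch of wrapping an entity purge; db is an active session and
        tool_id is a hypothetical identifier:

            with pause_rollup_during_purge("admin_purge"):
                delete_metrics_in_batches(db, ToolMetric, ToolMetric.tool_id, tool_id)
                db.commit()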

    """
    rollup_service = get_metrics_rollup_service_if_initialized()
    if rollup_service:
        rollup_service.pause(reason)
    try:
        yield
    finally:
        if rollup_service:
            rollup_service.resume()


@dataclass
class CleanupResult:
    """Result of a cleanup operation."""

    table_name: str
    deleted_count: int
    remaining_count: int
    cutoff_date: datetime
    duration_seconds: float
    error: Optional[str] = None


@dataclass
class CleanupSummary:
    """Summary of all cleanup operations."""

    total_deleted: int
    tables: Dict[str, CleanupResult]
    duration_seconds: float
    started_at: datetime
    completed_at: datetime


class MetricsCleanupService:
    """Service for cleaning up old metrics data.

    This service provides:
    - Batched deletion of old metrics (prevents long locks)
    - Configurable retention period per table type
    - Background task for periodic cleanup
    - Manual cleanup trigger
    - Cleanup statistics and reporting

    Configuration (via environment variables):
    - METRICS_CLEANUP_ENABLED: Enable automatic cleanup (default: True)
    - METRICS_RETENTION_DAYS: Days to retain raw metrics (default: 7)
    - METRICS_CLEANUP_INTERVAL_HOURS: Hours between cleanup runs (default: 1)
    - METRICS_CLEANUP_BATCH_SIZE: Batch size for deletion (default: 10000)
    - METRICS_DELETE_RAW_AFTER_ROLLUP: Delete raw after rollup exists (default: True)
    - METRICS_DELETE_RAW_AFTER_ROLLUP_HOURS: Hours after which to delete if rollup exists (default: 1)
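
    Example:
        A minimal sketch using explicit overrides instead of environment settings:

            service = MetricsCleanupService(retention_days=30, batch_size=5000)
            await service.start()
            ...
            await service.shutdown()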

    """

    # Map of raw metric tables to their hourly rollup counterparts
    METRIC_TABLES = [
        ("tool_metrics", ToolMetric, ToolMetricsHourly, "tool_id"),
        ("resource_metrics", ResourceMetric, ResourceMetricsHourly, "resource_id"),
        ("prompt_metrics", PromptMetric, PromptMetricsHourly, "prompt_id"),
        ("server_metrics", ServerMetric, ServerMetricsHourly, "server_id"),
        ("a2a_agent_metrics", A2AAgentMetric, A2AAgentMetricsHourly, "a2a_agent_id"),
    ]

    def __init__(
        self,
        retention_days: Optional[int] = None,
        batch_size: Optional[int] = None,
        cleanup_interval_hours: Optional[int] = None,
        enabled: Optional[bool] = None,
    ):
        """Initialize the metrics cleanup service.

        Args:
            retention_days: Days to retain raw metrics (default: from settings or 7)
            batch_size: Batch size for deletion (default: from settings or 10000)
            cleanup_interval_hours: Hours between cleanup runs (default: from settings or 1)
            enabled: Whether cleanup is enabled (default: from settings or True)
        """
        self.retention_days = retention_days or getattr(settings, "metrics_retention_days", 7)
        self.batch_size = batch_size or getattr(settings, "metrics_cleanup_batch_size", 10000)
        self.cleanup_interval_hours = cleanup_interval_hours or getattr(settings, "metrics_cleanup_interval_hours", 1)
        self.enabled = enabled if enabled is not None else getattr(settings, "metrics_cleanup_enabled", True)

        # Raw deletion after rollup is handled by MetricsRollupService; stored for stats/reporting.
        self.delete_raw_after_rollup = getattr(settings, "metrics_delete_raw_after_rollup", True)
        self.delete_raw_after_rollup_hours = getattr(settings, "metrics_delete_raw_after_rollup_hours", 1)

        # Rollup retention
        self.rollup_retention_days = getattr(settings, "metrics_rollup_retention_days", 365)

        # Background task
        self._cleanup_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()

        # Stats
        self._total_cleaned = 0
        self._cleanup_runs = 0

        logger.info(
            f"MetricsCleanupService initialized: enabled={self.enabled}, "
            f"retention_days={self.retention_days}, batch_size={self.batch_size}, "
            f"interval_hours={self.cleanup_interval_hours}"
        )

    async def start(self) -> None:
        """Start the background cleanup task."""
        if not self.enabled:
            logger.info("MetricsCleanupService disabled, skipping start")
            return

        if self._cleanup_task is None or self._cleanup_task.done():
            self._shutdown_event.clear()
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())
            logger.info("MetricsCleanupService background task started")

    async def shutdown(self) -> None:
        """Shutdown the cleanup service."""
        logger.info("MetricsCleanupService shutting down...")

        # Signal shutdown
        self._shutdown_event.set()

        # Cancel the cleanup task
        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass

        logger.info(f"MetricsCleanupService shutdown complete: total_cleaned={self._total_cleaned}, cleanup_runs={self._cleanup_runs}")

    async def _cleanup_loop(self) -> None:
        """Background task that periodically cleans up old metrics.

        Raises:
            asyncio.CancelledError: When the task is cancelled during shutdown.
        """
        logger.info(f"Metrics cleanup loop started (interval={self.cleanup_interval_hours}h)")

        # Calculate interval in seconds
        interval_seconds = self.cleanup_interval_hours * 3600

        while not self._shutdown_event.is_set():
            try:
                # Wait for interval or shutdown
                try:
                    await asyncio.wait_for(
                        self._shutdown_event.wait(),
                        timeout=interval_seconds,
                    )
                    # Shutdown signaled
                    break
                except asyncio.TimeoutError:
                    # Normal timeout, proceed to cleanup
                    pass

                # Run cleanup
                summary = await self.cleanup_all()
                self._cleanup_runs += 1
                self._total_cleaned += summary.total_deleted

                if summary.total_deleted > 0:
                    logger.info(f"Metrics cleanup #{self._cleanup_runs}: deleted {summary.total_deleted} records in {summary.duration_seconds:.2f}s")

            except asyncio.CancelledError:
                logger.debug("Cleanup loop cancelled")
                raise
            except Exception as e:
                logger.error(f"Error in metrics cleanup loop: {e}", exc_info=True)
                # Continue the loop despite errors
                await asyncio.sleep(60)

    async def cleanup_all(
        self,
        retention_days: Optional[int] = None,
        include_rollup: bool = True,
    ) -> CleanupSummary:
        """Clean up all old metrics across all tables.

        Args:
            retention_days: Override retention period (optional). Use 0 to delete all.
            include_rollup: Also clean up old rollup tables

        Returns:
            CleanupSummary: Summary of cleanup operations
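
        Example:
            A sketch of a manual full purge (retention 0 deletes everything,
            including rollups); assumes an initialized service:

                summary = await service.cleanup_all(retention_days=0)
                print(summary.total_deleted, summary.duration_seconds)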

        """
        started_at = datetime.now(timezone.utc)
        start_time = time.monotonic()

        # Use provided retention_days if set (including 0), otherwise use default
        retention = retention_days if retention_days is not None else self.retention_days
        # For retention=0 (delete all), use current time without hour-alignment to delete everything
        # Otherwise, hour-align cutoff to match query service's get_retention_cutoff() and prevent gaps
        if retention == 0:
            cutoff = datetime.now(timezone.utc) + timedelta(hours=1)  # Future time ensures all data is deleted
        else:
            cutoff = (datetime.now(timezone.utc) - timedelta(days=retention)).replace(minute=0, second=0, microsecond=0)

        results: Dict[str, CleanupResult] = {}
        total_deleted = 0

        # Clean up raw metrics tables
        for table_name, model_class, _, _ in self.METRIC_TABLES:
            result = await asyncio.to_thread(
                self._cleanup_table,
                model_class,
                table_name,
                cutoff,
            )
            results[table_name] = result
            total_deleted += result.deleted_count

        # Clean up rollup tables if enabled
        if include_rollup:
            # If retention_days is explicitly 0 (delete all), also delete all rollups
            # Otherwise use the rollup retention period
            if retention_days is not None and retention_days == 0:
                rollup_cutoff = datetime.now(timezone.utc)  # Delete all rollups too
            else:
                rollup_cutoff = datetime.now(timezone.utc) - timedelta(days=self.rollup_retention_days)
            for table_name, _, hourly_model_class, _ in self.METRIC_TABLES:
                hourly_table_name = f"{table_name}_hourly"
                result = await asyncio.to_thread(
                    self._cleanup_table,
                    hourly_model_class,
                    hourly_table_name,
                    rollup_cutoff,
                    timestamp_column="hour_start",
                )
                results[hourly_table_name] = result
                total_deleted += result.deleted_count

        duration = time.monotonic() - start_time
        completed_at = datetime.now(timezone.utc)

        return CleanupSummary(
            total_deleted=total_deleted,
            tables=results,
            duration_seconds=duration,
            started_at=started_at,
            completed_at=completed_at,
        )

    def _cleanup_table(
        self,
        model_class,
        table_name: str,
        cutoff: datetime,
        timestamp_column: str = "timestamp",
    ) -> CleanupResult:
        """Clean up old records from a single table using batched deletion.

        Args:
            model_class: SQLAlchemy model class
            table_name: Table name for logging
            cutoff: Delete records older than this timestamp
            timestamp_column: Name of the timestamp column

        Returns:
            CleanupResult: Result of the cleanup operation
        """
        start_time = time.monotonic()
        total_deleted = 0
        error_msg = None

        try:
            with fresh_db_session() as db:
                timestamp_col = getattr(model_class, timestamp_column)

                # Delete in batches to prevent long locks
                while True:
                    # Get batch of IDs to delete
                    stmt = select(model_class.id).where(timestamp_col < cutoff).limit(self.batch_size)
                    ids_to_delete = [row[0] for row in db.execute(stmt).fetchall()]

                    if not ids_to_delete:
                        break

                    # Delete batch
                    delete_stmt = delete(model_class).where(model_class.id.in_(ids_to_delete))
                    result = db.execute(delete_stmt)
                    db.commit()

                    batch_deleted = result.rowcount
                    total_deleted += batch_deleted

                    logger.debug(f"Cleaned {batch_deleted} records from {table_name}")

                    # If we deleted less than batch size, we're done
                    if batch_deleted < self.batch_size:
                        break

                # Get remaining count
                remaining_count = db.execute(select(func.count()).select_from(model_class)).scalar() or 0  # pylint: disable=not-callable

        except Exception as e:
            logger.error(f"Error cleaning up {table_name}: {e}")
            error_msg = str(e)
            remaining_count = -1

        duration = time.monotonic() - start_time

        if total_deleted > 0:
            logger.info(f"Cleaned {total_deleted} records from {table_name} (cutoff: {cutoff})")

        return CleanupResult(
            table_name=table_name,
            deleted_count=total_deleted,
            remaining_count=remaining_count,
            cutoff_date=cutoff,
            duration_seconds=duration,
            error=error_msg,
        )

    async def cleanup_table(
        self,
        table_type: str,
        retention_days: Optional[int] = None,
    ) -> CleanupResult:
        """Clean up old records from a specific table.

        Args:
            table_type: One of 'tool', 'resource', 'prompt', 'server', 'a2a_agent'
            retention_days: Override retention period (optional). Use 0 to delete all.

        Returns:
            CleanupResult: Result of the cleanup operation

        Raises:
            ValueError: If table_type is not recognized
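
        Example:
            A sketch of trimming a single raw table to a 30-day window,
            assuming an initialized service:

                result = await service.cleanup_table("tool", retention_days=30)
                if result.error is None:
                    print(result.deleted_count, "deleted,", result.remaining_count, "left")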

        """
        table_map = {
            "tool": ("tool_metrics", ToolMetric),
            "resource": ("resource_metrics", ResourceMetric),
            "prompt": ("prompt_metrics", PromptMetric),
            "server": ("server_metrics", ServerMetric),
            "a2a_agent": ("a2a_agent_metrics", A2AAgentMetric),
        }

        if table_type not in table_map:
            raise ValueError(f"Unknown table type: {table_type}. Must be one of: {list(table_map.keys())}")

        table_name, model_class = table_map[table_type]
        # Use provided retention_days if set (including 0), otherwise use default
        retention = retention_days if retention_days is not None else self.retention_days
        # For retention=0 (delete all), use current time without hour-alignment to delete everything
        # Otherwise, hour-align cutoff to match query service's get_retention_cutoff() and prevent gaps
        if retention == 0:
            cutoff = datetime.now(timezone.utc) + timedelta(hours=1)  # Future time ensures all data is deleted
        else:
            cutoff = (datetime.now(timezone.utc) - timedelta(days=retention)).replace(minute=0, second=0, microsecond=0)

        return await asyncio.to_thread(
            self._cleanup_table,
            model_class,
            table_name,
            cutoff,
        )

    def get_stats(self) -> dict:
        """Get cleanup service statistics.

        Returns:
            dict: Cleanup statistics
        """
        return {
            "enabled": self.enabled,
            "retention_days": self.retention_days,
            "rollup_retention_days": self.rollup_retention_days,
            "batch_size": self.batch_size,
            "cleanup_interval_hours": self.cleanup_interval_hours,
            "total_cleaned": self._total_cleaned,
            "cleanup_runs": self._cleanup_runs,
            "delete_raw_after_rollup": self.delete_raw_after_rollup,
            "delete_raw_after_rollup_hours": self.delete_raw_after_rollup_hours,
        }

    async def get_table_sizes(self) -> Dict[str, int]:
        """Get the current size of all metrics tables.

        Returns:
            Dict[str, int]: Table name to row count mapping
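
        Example:
            Illustrative shape of the result (counts are made up); each raw
            table is reported alongside its hourly rollup table:

                {"tool_metrics": 120000, "tool_metrics_hourly": 480, ...}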

        """
        sizes = {}

        def _get_sizes() -> Dict[str, int]:
            with fresh_db_session() as db:
                for table_name, model_class, hourly_class, _ in self.METRIC_TABLES:
                    # pylint: disable=not-callable
                    sizes[table_name] = db.execute(select(func.count()).select_from(model_class)).scalar() or 0
                    hourly_name = f"{table_name}_hourly"
                    sizes[hourly_name] = db.execute(select(func.count()).select_from(hourly_class)).scalar() or 0
            return sizes

        return await asyncio.to_thread(_get_sizes)


# Singleton instance
_metrics_cleanup_service: Optional[MetricsCleanupService] = None


def get_metrics_cleanup_service() -> MetricsCleanupService:
    """Get or create the singleton MetricsCleanupService instance.

    Returns:
        MetricsCleanupService: The singleton cleanup service instance
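
    Example:
        A sketch of fetching the singleton and reading its statistics:

            service = get_metrics_cleanup_service()
            stats = service.get_stats()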

    """
    global _metrics_cleanup_service  # pylint: disable=global-statement
    if _metrics_cleanup_service is None:
        _metrics_cleanup_service = MetricsCleanupService()
    return _metrics_cleanup_service