Coverage for mcpgateway / services / metrics_cleanup_service.py: 98%
185 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Metrics Cleanup Service for automatic deletion of old metrics.
4This service provides automatic and manual cleanup of old metrics data to prevent
5unbounded table growth and maintain query performance.
7Features:
8- Batched deletion to prevent long locks
9- Configurable retention period
10- Background task for periodic cleanup
11- Manual cleanup trigger via admin API
12- Per-table cleanup with statistics
14Copyright 2025
15SPDX-License-Identifier: Apache-2.0
16"""
18# Standard
19import asyncio
20from contextlib import contextmanager
21from dataclasses import dataclass
22from datetime import datetime, timedelta, timezone
23import logging
24import time
25from typing import Dict, Optional
27# Third-Party
28from sqlalchemy import delete, func, select
29from sqlalchemy.orm import Session
31# First-Party
32from mcpgateway.config import settings
33from mcpgateway.db import (
34 A2AAgentMetric,
35 A2AAgentMetricsHourly,
36 fresh_db_session,
37 PromptMetric,
38 PromptMetricsHourly,
39 ResourceMetric,
40 ResourceMetricsHourly,
41 ServerMetric,
42 ServerMetricsHourly,
43 ToolMetric,
44 ToolMetricsHourly,
45)
46from mcpgateway.services.metrics_rollup_service import get_metrics_rollup_service_if_initialized
48logger = logging.getLogger(__name__)
def delete_metrics_in_batches(db: Session, model_class, filter_column, entity_id, batch_size: Optional[int] = None) -> int:
    """Delete metrics rows for a specific entity in batches within the current transaction.

    Batching keeps each DELETE statement small so the database never holds
    long row locks while purging a large number of metric rows.

    Args:
        db: Database session.
        model_class: SQLAlchemy model to delete from.
        filter_column: Column used to filter by entity_id.
        entity_id: Entity identifier to delete metrics for.
        batch_size: Optional batch size override.

    Returns:
        int: Total rows deleted.
    """
    effective_batch_size = batch_size or getattr(settings, "metrics_cleanup_batch_size", 10000)
    total_deleted = 0

    while True:
        # Select at most one batch worth of primary keys, then delete by id so
        # each DELETE touches no more rows than the batch size.
        subq = select(model_class.id).where(filter_column == entity_id).limit(effective_batch_size)
        result = db.execute(delete(model_class).where(model_class.id.in_(subq)))

        # Some DBAPI drivers report rowcount as -1 ("unknown"); clamp to 0 so an
        # unknown count never decrements the running total.
        rowcount = result.rowcount
        batch_deleted = rowcount if isinstance(rowcount, int) and rowcount > 0 else 0
        total_deleted += batch_deleted

        # A short (or empty) batch means no more rows remain for this entity.
        if batch_deleted < effective_batch_size:
            break

    return total_deleted
@contextmanager
def pause_rollup_during_purge(reason: str = "purge_metrics"):
    """Temporarily pause the rollup task while metrics are being purged.

    Pausing reduces race conditions between the rollup writer and the purge's
    batched deletes. If the rollup service was never initialized, this is a
    no-op wrapper around the body.

    Args:
        reason: Reason for pausing the rollup task.

    Yields:
        None
    """
    service = get_metrics_rollup_service_if_initialized()
    if service:
        service.pause(reason)
    try:
        yield
    finally:
        # Resume even if the purge raised, so rollups are never left paused.
        if service:
            service.resume()
@dataclass
class CleanupResult:
    """Outcome of purging a single metrics table."""

    table_name: str  # table the cleanup ran against (for logging/reporting)
    deleted_count: int  # rows removed in this run
    remaining_count: int  # rows left after cleanup (-1 when the count failed)
    cutoff_date: datetime  # rows older than this were deleted
    duration_seconds: float  # wall-clock time of the cleanup
    error: Optional[str] = None  # error message when the cleanup failed, else None
@dataclass
class CleanupSummary:
    """Aggregate result of one full cleanup pass across all metrics tables."""

    total_deleted: int  # rows removed across every table in this pass
    tables: Dict[str, CleanupResult]  # per-table results, keyed by table name
    duration_seconds: float  # wall-clock time of the whole pass
    started_at: datetime  # UTC timestamp when the pass began
    completed_at: datetime  # UTC timestamp when the pass finished
class MetricsCleanupService:
    """Service for cleaning up old metrics data.

    This service provides:
    - Batched deletion of old metrics (prevents long locks)
    - Configurable retention period per table type
    - Background task for periodic cleanup
    - Manual cleanup trigger
    - Cleanup statistics and reporting

    Configuration (via environment variables):
    - METRICS_CLEANUP_ENABLED: Enable automatic cleanup (default: True)
    - METRICS_RETENTION_DAYS: Days to retain raw metrics (default: 7)
    - METRICS_CLEANUP_INTERVAL_HOURS: Hours between cleanup runs (default: 1)
    - METRICS_CLEANUP_BATCH_SIZE: Batch size for deletion (default: 10000)
    - METRICS_DELETE_RAW_AFTER_ROLLUP: Delete raw after rollup exists (default: True)
    - METRICS_DELETE_RAW_AFTER_ROLLUP_HOURS: Hours after which to delete if rollup exists (default: 1)
    """

    # Map of raw metric tables to their hourly rollup counterparts:
    # (table_name, raw_model, hourly_rollup_model, entity_id_column)
    METRIC_TABLES = [
        ("tool_metrics", ToolMetric, ToolMetricsHourly, "tool_id"),
        ("resource_metrics", ResourceMetric, ResourceMetricsHourly, "resource_id"),
        ("prompt_metrics", PromptMetric, PromptMetricsHourly, "prompt_id"),
        ("server_metrics", ServerMetric, ServerMetricsHourly, "server_id"),
        ("a2a_agent_metrics", A2AAgentMetric, A2AAgentMetricsHourly, "a2a_agent_id"),
    ]

    def __init__(
        self,
        retention_days: Optional[int] = None,
        batch_size: Optional[int] = None,
        cleanup_interval_hours: Optional[int] = None,
        enabled: Optional[bool] = None,
    ):
        """Initialize the metrics cleanup service.

        Args:
            retention_days: Days to retain raw metrics (default: from settings or 7)
            batch_size: Batch size for deletion (default: from settings or 10000)
            cleanup_interval_hours: Hours between cleanup runs (default: from settings or 1)
            enabled: Whether cleanup is enabled (default: from settings or True)
        """
        self.retention_days = retention_days or getattr(settings, "metrics_retention_days", 7)
        self.batch_size = batch_size or getattr(settings, "metrics_cleanup_batch_size", 10000)
        self.cleanup_interval_hours = cleanup_interval_hours or getattr(settings, "metrics_cleanup_interval_hours", 1)
        # `enabled` may legitimately be False, so compare against None explicitly.
        self.enabled = enabled if enabled is not None else getattr(settings, "metrics_cleanup_enabled", True)

        # Raw deletion after rollup is handled by MetricsRollupService; stored for stats/reporting.
        self.delete_raw_after_rollup = getattr(settings, "metrics_delete_raw_after_rollup", True)
        self.delete_raw_after_rollup_hours = getattr(settings, "metrics_delete_raw_after_rollup_hours", 1)

        # Rollup retention
        self.rollup_retention_days = getattr(settings, "metrics_rollup_retention_days", 365)

        # Background task
        self._cleanup_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()

        # Stats
        self._total_cleaned = 0
        self._cleanup_runs = 0

        logger.info(
            f"MetricsCleanupService initialized: enabled={self.enabled}, " f"retention_days={self.retention_days}, batch_size={self.batch_size}, " f"interval_hours={self.cleanup_interval_hours}"
        )

    def _compute_cutoff(self, retention_days: Optional[int]) -> datetime:
        """Compute the deletion cutoff timestamp for raw metrics.

        Args:
            retention_days: Override retention period, or None to use the
                configured default. A value of 0 means "delete everything".

        Returns:
            datetime: Rows with timestamps strictly before this are deleted.
        """
        retention = retention_days if retention_days is not None else self.retention_days
        if retention == 0:
            # A future cutoff guarantees every existing row matches "timestamp < cutoff".
            return datetime.now(timezone.utc) + timedelta(hours=1)
        # Hour-align the cutoff to match the query service's get_retention_cutoff()
        # so cleanup and queries agree on the retention boundary (no gaps).
        return (datetime.now(timezone.utc) - timedelta(days=retention)).replace(minute=0, second=0, microsecond=0)

    async def start(self) -> None:
        """Start the background cleanup task."""
        if not self.enabled:
            logger.info("MetricsCleanupService disabled, skipping start")
            return

        # Only spawn a new task if none is running (idempotent start).
        if self._cleanup_task is None or self._cleanup_task.done():
            self._shutdown_event.clear()
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())
            logger.info("MetricsCleanupService background task started")

    async def shutdown(self) -> None:
        """Shutdown the cleanup service."""
        logger.info("MetricsCleanupService shutting down...")

        # Signal shutdown
        self._shutdown_event.set()

        # Cancel the cleanup task and wait for it to unwind.
        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
            # Clear the handle so a subsequent start() spawns a fresh task.
            self._cleanup_task = None

        logger.info(f"MetricsCleanupService shutdown complete: " f"total_cleaned={self._total_cleaned}, cleanup_runs={self._cleanup_runs}")

    async def _cleanup_loop(self) -> None:
        """Background task that periodically cleans up old metrics.

        Raises:
            asyncio.CancelledError: When the task is cancelled during shutdown.
        """
        logger.info(f"Metrics cleanup loop started (interval={self.cleanup_interval_hours}h)")

        # Calculate interval in seconds
        interval_seconds = self.cleanup_interval_hours * 3600

        while not self._shutdown_event.is_set():
            try:
                # Wait for interval or shutdown, whichever comes first.
                try:
                    await asyncio.wait_for(
                        self._shutdown_event.wait(),
                        timeout=interval_seconds,
                    )
                    # Shutdown signaled
                    break
                except asyncio.TimeoutError:
                    # Normal timeout, proceed to cleanup
                    pass

                # Run cleanup
                summary = await self.cleanup_all()
                self._cleanup_runs += 1
                self._total_cleaned += summary.total_deleted

                if summary.total_deleted > 0:
                    logger.info(f"Metrics cleanup #{self._cleanup_runs}: deleted {summary.total_deleted} records " f"in {summary.duration_seconds:.2f}s")

            except asyncio.CancelledError:
                logger.debug("Cleanup loop cancelled")
                raise
            except Exception as e:
                logger.error(f"Error in metrics cleanup loop: {e}", exc_info=True)
                # Continue the loop despite errors, after a short back-off.
                await asyncio.sleep(60)

    async def cleanup_all(
        self,
        retention_days: Optional[int] = None,
        include_rollup: bool = True,
    ) -> CleanupSummary:
        """Clean up all old metrics across all tables.

        Args:
            retention_days: Override retention period (optional). Use 0 to delete all.
            include_rollup: Also clean up old rollup tables

        Returns:
            CleanupSummary: Summary of cleanup operations
        """
        started_at = datetime.now(timezone.utc)
        start_time = time.monotonic()

        # Shared cutoff logic (handles the retention==0 "delete all" case and
        # hour-alignment) lives in _compute_cutoff.
        cutoff = self._compute_cutoff(retention_days)

        results: Dict[str, CleanupResult] = {}
        total_deleted = 0

        # Clean up raw metrics tables; _cleanup_table is blocking DB work, so
        # run it off the event loop.
        for table_name, model_class, _, _ in self.METRIC_TABLES:
            result = await asyncio.to_thread(
                self._cleanup_table,
                model_class,
                table_name,
                cutoff,
            )
            results[table_name] = result
            total_deleted += result.deleted_count

        # Clean up rollup tables if enabled
        if include_rollup:
            # If retention_days is explicitly 0 (delete all), also delete all rollups.
            # Otherwise use the (much longer) rollup retention period.
            if retention_days == 0:
                rollup_cutoff = datetime.now(timezone.utc)  # Delete all rollups too
            else:
                rollup_cutoff = datetime.now(timezone.utc) - timedelta(days=self.rollup_retention_days)
            for table_name, _, hourly_model_class, _ in self.METRIC_TABLES:
                hourly_table_name = f"{table_name}_hourly"
                result = await asyncio.to_thread(
                    self._cleanup_table,
                    hourly_model_class,
                    hourly_table_name,
                    rollup_cutoff,
                    timestamp_column="hour_start",
                )
                results[hourly_table_name] = result
                total_deleted += result.deleted_count

        duration = time.monotonic() - start_time
        completed_at = datetime.now(timezone.utc)

        return CleanupSummary(
            total_deleted=total_deleted,
            tables=results,
            duration_seconds=duration,
            started_at=started_at,
            completed_at=completed_at,
        )

    def _cleanup_table(
        self,
        model_class,
        table_name: str,
        cutoff: datetime,
        timestamp_column: str = "timestamp",
    ) -> CleanupResult:
        """Clean up old records from a single table using batched deletion.

        Args:
            model_class: SQLAlchemy model class
            table_name: Table name for logging
            cutoff: Delete records older than this timestamp
            timestamp_column: Name of the timestamp column

        Returns:
            CleanupResult: Result of the cleanup operation
        """
        start_time = time.monotonic()
        total_deleted = 0
        error_msg = None

        try:
            with fresh_db_session() as db:
                timestamp_col = getattr(model_class, timestamp_column)

                # Delete in batches to prevent long locks; each batch is
                # committed independently.
                while True:
                    # Get batch of IDs to delete
                    stmt = select(model_class.id).where(timestamp_col < cutoff).limit(self.batch_size)
                    ids_to_delete = [row[0] for row in db.execute(stmt).fetchall()]

                    if not ids_to_delete:
                        break

                    # Delete batch
                    delete_stmt = delete(model_class).where(model_class.id.in_(ids_to_delete))
                    result = db.execute(delete_stmt)
                    db.commit()

                    # Some DBAPI drivers report rowcount as -1 ("unknown");
                    # clamp so the total is never decremented.
                    batch_deleted = result.rowcount if isinstance(result.rowcount, int) and result.rowcount > 0 else 0
                    total_deleted += batch_deleted

                    logger.debug(f"Cleaned {batch_deleted} records from {table_name}")

                    # If we deleted less than batch size, we're done
                    if batch_deleted < self.batch_size:
                        break

                # Get remaining count
                remaining_count = db.execute(select(func.count()).select_from(model_class)).scalar() or 0  # pylint: disable=not-callable

        except Exception as e:
            logger.error(f"Error cleaning up {table_name}: {e}")
            error_msg = str(e)
            remaining_count = -1

        duration = time.monotonic() - start_time

        if total_deleted > 0:
            logger.info(f"Cleaned {total_deleted} records from {table_name} (cutoff: {cutoff})")

        return CleanupResult(
            table_name=table_name,
            deleted_count=total_deleted,
            remaining_count=remaining_count,
            cutoff_date=cutoff,
            duration_seconds=duration,
            error=error_msg,
        )

    async def cleanup_table(
        self,
        table_type: str,
        retention_days: Optional[int] = None,
    ) -> CleanupResult:
        """Clean up old records from a specific table.

        Args:
            table_type: One of 'tool', 'resource', 'prompt', 'server', 'a2a_agent'
            retention_days: Override retention period (optional). Use 0 to delete all.

        Returns:
            CleanupResult: Result of the cleanup operation

        Raises:
            ValueError: If table_type is not recognized
        """
        table_map = {
            "tool": ("tool_metrics", ToolMetric),
            "resource": ("resource_metrics", ResourceMetric),
            "prompt": ("prompt_metrics", PromptMetric),
            "server": ("server_metrics", ServerMetric),
            "a2a_agent": ("a2a_agent_metrics", A2AAgentMetric),
        }

        if table_type not in table_map:
            raise ValueError(f"Unknown table type: {table_type}. Must be one of: {list(table_map.keys())}")

        table_name, model_class = table_map[table_type]
        # Same cutoff semantics as cleanup_all (retention==0 deletes everything,
        # otherwise hour-aligned retention boundary).
        cutoff = self._compute_cutoff(retention_days)

        return await asyncio.to_thread(
            self._cleanup_table,
            model_class,
            table_name,
            cutoff,
        )

    def get_stats(self) -> dict:
        """Get cleanup service statistics.

        Returns:
            dict: Cleanup statistics
        """
        return {
            "enabled": self.enabled,
            "retention_days": self.retention_days,
            "rollup_retention_days": self.rollup_retention_days,
            "batch_size": self.batch_size,
            "cleanup_interval_hours": self.cleanup_interval_hours,
            "total_cleaned": self._total_cleaned,
            "cleanup_runs": self._cleanup_runs,
            "delete_raw_after_rollup": self.delete_raw_after_rollup,
            "delete_raw_after_rollup_hours": self.delete_raw_after_rollup_hours,
        }

    async def get_table_sizes(self) -> Dict[str, int]:
        """Get the current size of all metrics tables.

        Returns:
            Dict[str, int]: Table name to row count mapping
        """

        def _get_sizes() -> Dict[str, int]:
            # Blocking DB work; executed via asyncio.to_thread below.
            sizes: Dict[str, int] = {}
            with fresh_db_session() as db:
                for table_name, model_class, hourly_class, _ in self.METRIC_TABLES:
                    # pylint: disable=not-callable
                    sizes[table_name] = db.execute(select(func.count()).select_from(model_class)).scalar() or 0
                    hourly_name = f"{table_name}_hourly"
                    sizes[hourly_name] = db.execute(select(func.count()).select_from(hourly_class)).scalar() or 0
            return sizes

        return await asyncio.to_thread(_get_sizes)
# Module-level singleton holder
_metrics_cleanup_service: Optional[MetricsCleanupService] = None


def get_metrics_cleanup_service() -> MetricsCleanupService:
    """Get or create the singleton MetricsCleanupService instance.

    Returns:
        MetricsCleanupService: The singleton cleanup service instance
    """
    global _metrics_cleanup_service  # pylint: disable=global-statement
    if _metrics_cleanup_service is not None:
        return _metrics_cleanup_service
    _metrics_cleanup_service = MetricsCleanupService()
    return _metrics_cleanup_service