Coverage for mcpgateway/services/metrics_rollup_service.py: 98%
345 statements
# -*- coding: utf-8 -*-
"""Metrics Rollup Service for hourly aggregation of raw metrics.

This service provides automatic rollup of raw metrics into hourly summaries,
enabling efficient historical queries without scanning millions of raw records.

Features:
- Hourly aggregation with percentile calculation
- Upsert logic to handle re-runs safely
- Background task for periodic rollup
- Optional deletion of raw metrics after rollup
- PostgreSQL and SQLite support

Copyright 2025
SPDX-License-Identifier: Apache-2.0
"""

# Standard
import asyncio
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import logging
import threading
import time
from typing import Any, Dict, List, Optional, Tuple, Type

# Third-Party
from sqlalchemy import and_, case, delete, func, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session

# First-Party
from mcpgateway.config import settings
from mcpgateway.db import (
    A2AAgent,
    A2AAgentMetric,
    A2AAgentMetricsHourly,
    fresh_db_session,
    Prompt,
    PromptMetric,
    PromptMetricsHourly,
    Resource,
    ResourceMetric,
    ResourceMetricsHourly,
    Server,
    ServerMetric,
    ServerMetricsHourly,
    Tool,
    ToolMetric,
    ToolMetricsHourly,
)

logger = logging.getLogger(__name__)


@dataclass
class RollupResult:
    """Result of a rollup operation for a single table."""

    table_name: str
    hours_processed: int
    records_aggregated: int
    rollups_created: int
    rollups_updated: int
    raw_deleted: int
    duration_seconds: float
    error: Optional[str] = None


@dataclass
class RollupSummary:
    """Summary of all rollup operations."""

    total_hours_processed: int
    total_records_aggregated: int
    total_rollups_created: int
    total_rollups_updated: int
    tables: Dict[str, RollupResult]
    duration_seconds: float
    started_at: datetime
    completed_at: datetime


@dataclass
class HourlyAggregation:
    """Aggregated metrics for a single hour."""

    entity_id: str
    entity_name: str
    hour_start: datetime
    total_count: int
    success_count: int
    failure_count: int
    min_response_time: Optional[float]
    max_response_time: Optional[float]
    avg_response_time: Optional[float]
    p50_response_time: Optional[float]
    p95_response_time: Optional[float]
    p99_response_time: Optional[float]
    interaction_type: Optional[str] = None  # For A2A agents


class MetricsRollupService:
    """Service for rolling up raw metrics into hourly summaries.

    This service provides:
    - Hourly aggregation of raw metrics into summary tables
    - Percentile calculation (p50, p95, p99)
    - Upsert logic to handle re-runs safely
    - Optional deletion of raw metrics after rollup
    - Background task for periodic rollup

    Configuration (via environment variables):
    - METRICS_ROLLUP_ENABLED: Enable automatic rollup (default: True)
    - METRICS_ROLLUP_INTERVAL_HOURS: Hours between rollup runs (default: 1)
    - METRICS_DELETE_RAW_AFTER_ROLLUP: Delete raw after rollup (default: True)
    - METRICS_DELETE_RAW_AFTER_ROLLUP_HOURS: Hours after which to delete if rollup exists (default: 1)
    """

    # Table configuration: (name, raw_model, hourly_model, entity_model, entity_id_col, entity_name_col)
    METRIC_TABLES = [
        ("tool_metrics", ToolMetric, ToolMetricsHourly, Tool, "tool_id", "name"),
        ("resource_metrics", ResourceMetric, ResourceMetricsHourly, Resource, "resource_id", "name"),
        ("prompt_metrics", PromptMetric, PromptMetricsHourly, Prompt, "prompt_id", "name"),
        ("server_metrics", ServerMetric, ServerMetricsHourly, Server, "server_id", "name"),
        ("a2a_agent_metrics", A2AAgentMetric, A2AAgentMetricsHourly, A2AAgent, "a2a_agent_id", "name"),
    ]

    def __init__(
        self,
        rollup_interval_hours: Optional[int] = None,
        enabled: Optional[bool] = None,
        delete_raw_after_rollup: Optional[bool] = None,
        delete_raw_after_rollup_hours: Optional[int] = None,
    ):
        """Initialize the metrics rollup service.

        Args:
            rollup_interval_hours: Hours between rollup runs (default: from settings or 1)
            enabled: Whether rollup is enabled (default: from settings or True)
            delete_raw_after_rollup: Delete raw metrics after rollup (default: from settings or True)
            delete_raw_after_rollup_hours: Hours after which to delete raw if rollup exists (default: from settings or 1)
        """
        self.rollup_interval_hours = rollup_interval_hours or getattr(settings, "metrics_rollup_interval_hours", 1)
        self.enabled = enabled if enabled is not None else getattr(settings, "metrics_rollup_enabled", True)
        self.delete_raw_after_rollup = delete_raw_after_rollup if delete_raw_after_rollup is not None else getattr(settings, "metrics_delete_raw_after_rollup", True)
        self.delete_raw_after_rollup_hours = delete_raw_after_rollup_hours or getattr(settings, "metrics_delete_raw_after_rollup_hours", 1)

        # Check if using PostgreSQL
        self._is_postgresql = settings.database_url.startswith("postgresql")

        # Background task
        self._rollup_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()
        self._pause_event = asyncio.Event()
        self._pause_lock = threading.Lock()
        self._pause_count = 0
        self._pause_reason: Optional[str] = None

        # Stats
        self._total_rollups = 0
        self._rollup_runs = 0

        logger.info(
            f"MetricsRollupService initialized: enabled={self.enabled}, "
            f"interval_hours={self.rollup_interval_hours}, "
            f"delete_raw={self.delete_raw_after_rollup}, "
            f"postgresql={self._is_postgresql}"
        )

    def pause(self, reason: str = "maintenance") -> None:
        """Pause background rollup execution.

        Args:
            reason: Reason for pausing the rollup task.
        """
        with self._pause_lock:
            self._pause_count += 1
            self._pause_reason = reason
            self._pause_event.set()

    def resume(self) -> None:
        """Resume background rollup execution."""
        with self._pause_lock:
            if self._pause_count > 0:
                self._pause_count -= 1
            if self._pause_count <= 0:
                self._pause_count = 0
                self._pause_reason = None
                self._pause_event.clear()

    @contextmanager
    def pause_during(self, reason: str = "maintenance"):
        """Pause rollups for the duration of the context manager.

        Args:
            reason: Reason for pausing the rollup task.

        Yields:
            None
        """
        self.pause(reason)
        try:
            yield
        finally:
            self.resume()
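
    # Usage sketch (illustrative; `service` stands for an existing
    # MetricsRollupService instance and run_migration is a hypothetical
    # caller-side function). Pauses nest via _pause_count: two overlapping
    # pause() calls require two resume() calls before rollups run again.
    #
    #     with service.pause_during("schema migration"):
    #         run_migration()
    #     # rollups resume automatically here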

    async def start(self) -> None:
        """Start the background rollup task."""
        if not self.enabled:
            logger.info("MetricsRollupService disabled, skipping start")
            return

        if self._rollup_task is None or self._rollup_task.done():
            self._shutdown_event.clear()
            self._rollup_task = asyncio.create_task(self._rollup_loop())
            logger.info("MetricsRollupService background task started")

    async def shutdown(self) -> None:
        """Shutdown the rollup service."""
        logger.info("MetricsRollupService shutting down...")

        # Signal shutdown
        self._shutdown_event.set()

        # Cancel the rollup task
        if self._rollup_task:
            self._rollup_task.cancel()
            try:
                await self._rollup_task
            except asyncio.CancelledError:
                pass

        logger.info(f"MetricsRollupService shutdown complete: total_rollups={self._total_rollups}, rollup_runs={self._rollup_runs}")

    async def _rollup_loop(self) -> None:
        """Background task that periodically rolls up metrics.

        Includes smart backfill detection: if the service has been down for more
        than 24 hours, it will automatically detect the gap and roll up all
        unprocessed hours up to the configured maximum (retention period).

        Raises:
            asyncio.CancelledError: When the task is cancelled during shutdown.
        """
        logger.info(f"Metrics rollup loop started (interval={self.rollup_interval_hours}h)")

        # Calculate interval in seconds
        interval_seconds = self.rollup_interval_hours * 3600
        # On first run, do a backfill check
        first_run = True

        while not self._shutdown_event.is_set():
            try:
                # Wait for interval or shutdown (skip wait on first run for immediate backfill)
                if not first_run:
                    try:
                        await asyncio.wait_for(
                            self._shutdown_event.wait(),
                            timeout=interval_seconds,
                        )
                        # Shutdown signaled
                        break
                    except asyncio.TimeoutError:
                        # Normal timeout, proceed to rollup
                        pass

                if self._pause_event.is_set():
                    logger.info(f"Metrics rollup paused ({self._pause_reason or 'maintenance'}), skipping this cycle")
                    try:
                        await asyncio.wait_for(self._shutdown_event.wait(), timeout=5)
                    except asyncio.TimeoutError:
                        pass
                    continue

                # Determine hours_back based on whether this is first run or normal run
                if first_run:
                    # On first run, detect backfill gap (may scan entire retention period)
                    hours_back = await asyncio.to_thread(self._detect_backfill_hours)
                    if hours_back > 24:
                        logger.info(f"Backfill detected: rolling up {hours_back} hours of unprocessed metrics")
                    first_run = False
                else:
                    # Normal runs: only process recent hours to catch late-arriving data
                    # Configurable via METRICS_ROLLUP_LATE_DATA_HOURS (default: 1 hour)
                    # This avoids walking through entire retention period every interval
                    hours_back = getattr(settings, "metrics_rollup_late_data_hours", 1)

                # Run rollup for the calculated time range
                summary = await self.rollup_all(hours_back=hours_back)
                self._rollup_runs += 1
                self._total_rollups += summary.total_rollups_created

                if summary.total_rollups_created > 0 or summary.total_rollups_updated > 0:
                    logger.info(
                        f"Metrics rollup #{self._rollup_runs}: created {summary.total_rollups_created}, "
                        f"updated {summary.total_rollups_updated} rollups "
                        f"from {summary.total_records_aggregated} records in {summary.duration_seconds:.2f}s"
                    )

            except asyncio.CancelledError:
                logger.debug("Rollup loop cancelled")
                raise
            except Exception as e:
                logger.error(f"Error in metrics rollup loop: {e}", exc_info=True)
                # Continue the loop despite errors
                await asyncio.sleep(60)

    def _detect_backfill_hours(self) -> int:
        """Detect how many hours back we need to roll up.

        Checks for the earliest unprocessed raw metric and calculates how many
        hours of backfill are needed. Returns a minimum of 24 hours and caps at
        the retention period to avoid excessive processing.

        Returns:
            int: Number of hours to roll up (minimum 24, maximum retention days * 24)
        """
        retention_days = getattr(settings, "metrics_retention_days", 30)
        max_hours = retention_days * 24  # Cap at retention period

        try:
            with fresh_db_session() as db:
                # Find the earliest raw metric timestamp across all tables
                earliest_raw = None

                for _, raw_model, _hourly_model, _, _, _ in self.METRIC_TABLES:
                    # Get earliest unprocessed raw metric (where no rollup exists for that hour)
                    result = db.execute(select(func.min(raw_model.timestamp))).scalar()

                    if result and (earliest_raw is None or result < earliest_raw):
                        earliest_raw = result

                if earliest_raw is None:
                    # No raw metrics, use default
                    return 24

                # Calculate hours since earliest raw metric
                now = datetime.now(timezone.utc)
                hours_since_earliest = int((now - earliest_raw).total_seconds() / 3600) + 1

                # Clamp between 24 and max_hours
                return max(24, min(hours_since_earliest, max_hours))

        except Exception as e:
            logger.warning(f"Error detecting backfill hours: {e}, using default 24")
            return 24
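
    # Worked example (illustrative numbers): if the earliest raw metric is
    # 72.5 hours old and metrics_retention_days is 30, then
    # hours_since_earliest = int(72.5) + 1 = 73 and max_hours = 720, so the
    # clamp max(24, min(73, 720)) yields a 73-hour backfill window. With no
    # raw rows at all, the method falls back to 24.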

    async def rollup_all(
        self,
        hours_back: int = 24,
        force_reprocess: bool = False,
    ) -> RollupSummary:
        """Roll up all metrics tables for the specified time range.

        Args:
            hours_back: How many hours back to look for unprocessed metrics (default: 24)
            force_reprocess: Reprocess even if rollup already exists (default: False)

        Returns:
            RollupSummary: Summary of rollup operations
        """
        started_at = datetime.now(timezone.utc)
        start_time = time.monotonic()

        # Calculate time range (process completed hours only, not current hour)
        now = datetime.now(timezone.utc)
        # Round down to start of current hour
        current_hour_start = now.replace(minute=0, second=0, microsecond=0)
        # Go back hours_back from the start of current hour
        start_hour = current_hour_start - timedelta(hours=hours_back)
        results: Dict[str, RollupResult] = {}
        total_hours = 0
        total_records = 0
        total_created = 0
        total_updated = 0

        for table_name, raw_model, hourly_model, entity_model, entity_id_col, entity_name_col in self.METRIC_TABLES:
            result = await asyncio.to_thread(
                self._rollup_table,
                table_name,
                raw_model,
                hourly_model,
                entity_model,
                entity_id_col,
                entity_name_col,
                start_hour,
                current_hour_start,
                force_reprocess,
            )
            results[table_name] = result
            total_hours += result.hours_processed
            total_records += result.records_aggregated
            total_created += result.rollups_created
            total_updated += result.rollups_updated

        duration = time.monotonic() - start_time
        completed_at = datetime.now(timezone.utc)

        return RollupSummary(
            total_hours_processed=total_hours,
            total_records_aggregated=total_records,
            total_rollups_created=total_created,
            total_rollups_updated=total_updated,
            tables=results,
            duration_seconds=duration,
            started_at=started_at,
            completed_at=completed_at,
        )
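
    # Window example (illustrative): at 14:37 UTC with hours_back=24,
    # current_hour_start is 14:00 today and start_hour is 14:00 yesterday, so
    # the per-table loop processes the 24 completed hour buckets from 14:00
    # yesterday up to (but not including) the in-progress 14:00 hour today.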

    def _rollup_table(
        self,
        table_name: str,
        raw_model: Type,
        hourly_model: Type,
        entity_model: Type,
        entity_id_col: str,
        entity_name_col: str,
        start_hour: datetime,
        end_hour: datetime,
        force_reprocess: bool,  # pylint: disable=unused-argument
    ) -> RollupResult:
        """Roll up metrics for a single table.

        Note: As of the late-data fix, rollup always re-aggregates when raw data exists,
        regardless of whether a rollup already exists. This ensures late-arriving metrics
        are properly included. The force_reprocess parameter is kept for API compatibility.

        Args:
            table_name: Name of the table being processed
            raw_model: SQLAlchemy model for raw metrics
            hourly_model: SQLAlchemy model for hourly rollups
            entity_model: SQLAlchemy model for the entity (Tool, Resource, etc.)
            entity_id_col: Name of the entity ID column in raw model
            entity_name_col: Name of the entity name column in entity model
            start_hour: Start of time range
            end_hour: End of time range (exclusive)
            force_reprocess: Kept for API compatibility (behavior now always re-processes)

        Returns:
            RollupResult: Result of the rollup operation
        """
        start_time = time.monotonic()
        hours_processed = 0
        records_aggregated = 0
        rollups_created = 0
        rollups_updated = 0
        raw_deleted = 0
        error_msg = None

        is_a2a = table_name == "a2a_agent_metrics"

        try:
            with fresh_db_session() as db:
                # Process each hour in the range
                current = start_hour
                while current < end_hour:
                    hour_end = current + timedelta(hours=1)

                    # Check if we have raw metrics for this hour
                    # pylint: disable=not-callable
                    raw_count = (
                        db.execute(
                            select(func.count())
                            .select_from(raw_model)
                            .where(
                                and_(
                                    raw_model.timestamp >= current,
                                    raw_model.timestamp < hour_end,
                                )
                            )
                        ).scalar()
                        or 0
                    )

                    if raw_count > 0:
                        # Always re-aggregate when there's raw data, even if rollup exists.
                        # This ensures late-arriving metrics (buffer flush, ingestion lag) are included.
                        # The _aggregate_hour queries ALL raw data for the hour, and _upsert_rollup
                        # handles updating existing rollups correctly.

                        # Aggregate metrics for this hour
                        aggregations = self._aggregate_hour(
                            db,
                            raw_model,
                            entity_model,
                            entity_id_col,
                            entity_name_col,
                            current,
                            hour_end,
                            is_a2a,
                        )
                        # Upsert rollups
                        for agg in aggregations:
                            created, updated = self._upsert_rollup(
                                db,
                                hourly_model,
                                entity_id_col,
                                agg,
                                is_a2a,
                            )
                            rollups_created += created
                            rollups_updated += updated
                            records_aggregated += agg.total_count

                        hours_processed += 1

                        # Delete raw metrics if configured
                        if self.delete_raw_after_rollup:
                            delete_cutoff = datetime.now(timezone.utc) - timedelta(hours=self.delete_raw_after_rollup_hours)
                            if hour_end < delete_cutoff:
                                deleted = self._delete_raw_metrics(db, raw_model, current, hour_end)
                                raw_deleted += deleted

                    current = hour_end

                db.commit()

        except Exception as e:
            logger.error(f"Error rolling up {table_name}: {e}", exc_info=True)
            error_msg = str(e)

        duration = time.monotonic() - start_time

        if rollups_created + rollups_updated > 0:
            logger.debug(f"Rolled up {table_name}: {records_aggregated} records -> {rollups_created} new, {rollups_updated} updated rollups")

        return RollupResult(
            table_name=table_name,
            hours_processed=hours_processed,
            records_aggregated=records_aggregated,
            rollups_created=rollups_created,
            rollups_updated=rollups_updated,
            raw_deleted=raw_deleted,
            duration_seconds=duration,
            error=error_msg,
        )

    def _aggregate_hour(
        self,
        db: Session,
        raw_model: Type,
        entity_model: Type,
        entity_id_col: str,
        entity_name_col: str,
        hour_start: datetime,
        hour_end: datetime,
        is_a2a: bool,
    ) -> List[HourlyAggregation]:
        """Aggregate raw metrics for a single hour using optimized bulk queries.

        Uses a single GROUP BY query to get basic aggregations (count, min, max, avg,
        success count) for all entities at once, minimizing database round trips.
        Percentiles are calculated by loading response times in a single bulk query.

        Args:
            db: Database session
            raw_model: SQLAlchemy model for raw metrics
            entity_model: SQLAlchemy model for the entity
            entity_id_col: Name of the entity ID column
            entity_name_col: Name of the entity name column
            hour_start: Start of the hour
            hour_end: End of the hour
            is_a2a: Whether this is A2A agent metrics (has interaction_type)

        Returns:
            List[HourlyAggregation]: Aggregated metrics for each entity

        Raises:
            Exception: If aggregation fails due to a database query or processing error.
        """
        try:
            entity_id_attr = getattr(raw_model, entity_id_col)
            entity_name_attr = getattr(entity_model, entity_name_col)

            time_filter = and_(
                raw_model.timestamp >= hour_start,
                raw_model.timestamp < hour_end,
            )

            aggregations: list = []
            if self._is_postgresql and settings.use_postgresdb_percentiles:
                # ---- build SELECT and GROUP BY dynamically (CRITICAL FIX) ----
                select_cols = [
                    entity_id_attr.label("entity_id"),
                    func.coalesce(entity_name_attr, "unknown").label("entity_name"),
                ]
                group_by_cols = [
                    entity_id_attr,
                    entity_name_attr,
                ]

                if is_a2a:
                    select_cols.append(raw_model.interaction_type.label("interaction_type"))
                    group_by_cols.append(raw_model.interaction_type)

                # pylint: disable=not-callable
                agg_query = (
                    select(
                        *select_cols,
                        func.count(raw_model.id).label("total_count"),
                        func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("success_count"),
                        func.min(raw_model.response_time).label("min_rt"),
                        func.max(raw_model.response_time).label("max_rt"),
                        func.avg(raw_model.response_time).label("avg_rt"),
                        func.percentile_cont(0.50).within_group(raw_model.response_time).label("p50_rt"),
                        func.percentile_cont(0.95).within_group(raw_model.response_time).label("p95_rt"),
                        func.percentile_cont(0.99).within_group(raw_model.response_time).label("p99_rt"),
                    )
                    .select_from(raw_model)
                    .join(entity_model, entity_model.id == entity_id_attr, isouter=True)
                    .where(time_filter)
                    .group_by(*group_by_cols)
                )
                # pylint: enable=not-callable
                for row in db.execute(agg_query).yield_per(settings.yield_batch_size):
                    aggregations.append(
                        HourlyAggregation(
                            entity_id=row.entity_id,
                            entity_name=row.entity_name,
                            hour_start=hour_start,
                            total_count=row.total_count,
                            success_count=row.success_count,
                            failure_count=row.total_count - row.success_count,
                            min_response_time=row.min_rt,
                            max_response_time=row.max_rt,
                            avg_response_time=row.avg_rt,
                            p50_response_time=row.p50_rt,
                            p95_response_time=row.p95_rt,
                            p99_response_time=row.p99_rt,
                            interaction_type=row.interaction_type if is_a2a else None,
                        )
                    )
            else:
                # Build group by columns
                if is_a2a:
                    group_cols = [entity_id_attr, raw_model.interaction_type]
                else:
                    group_cols = [entity_id_attr]

                # OPTIMIZED: Single bulk query for basic aggregations per entity
                # pylint: disable=not-callable
                agg_query = (
                    select(
                        *group_cols,
                        func.count(raw_model.id).label("total_count"),
                        func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("success_count"),
                        func.min(raw_model.response_time).label("min_rt"),
                        func.max(raw_model.response_time).label("max_rt"),
                        func.avg(raw_model.response_time).label("avg_rt"),
                    )
                    .where(time_filter)
                    .group_by(*group_cols)
                )

                # Store aggregation results by entity key
                agg_results = {}
                for row in db.execute(agg_query).yield_per(settings.yield_batch_size):
                    entity_id = row[0]
                    interaction_type = row[1] if is_a2a else None
                    key = (entity_id, interaction_type) if is_a2a else entity_id

                    agg_results[key] = {
                        "entity_id": entity_id,
                        "interaction_type": interaction_type,
                        "total_count": row.total_count or 0,
                        "success_count": row.success_count or 0,
                        "min_rt": row.min_rt,
                        "max_rt": row.max_rt,
                        "avg_rt": row.avg_rt,
                    }

                if not agg_results:
                    return []

                # OPTIMIZED: Bulk load entity names in one query
                entity_ids = list(set(r["entity_id"] for r in agg_results.values()))
                entity_names = {}
                if entity_ids:
                    entities = db.execute(select(entity_model.id, getattr(entity_model, entity_name_col)).where(entity_model.id.in_(entity_ids)))
                    entity_names = {e[0]: e[1] for e in entities}

                # OPTIMIZED: Bulk load all response times for percentile calculation
                # Load all response times for the hour in one query, grouped by entity
                rt_query = (
                    select(
                        *group_cols,
                        raw_model.response_time,
                    )
                    .where(time_filter)
                    .order_by(*group_cols, raw_model.response_time)
                )

                # Group response times by entity
                response_times_by_entity: Dict[Any, List[float]] = {}
                for row in db.execute(rt_query).yield_per(settings.yield_batch_size):
                    entity_id = row[0]
                    interaction_type = row[1] if is_a2a else None
                    key = (entity_id, interaction_type) if is_a2a else entity_id
                    rt = row.response_time if not is_a2a else row[2]

                    if key not in response_times_by_entity:
                        response_times_by_entity[key] = []
                    if rt is not None:
                        response_times_by_entity[key].append(rt)

                # Build aggregation results with percentiles
                aggregations = []
                for key, agg in agg_results.items():
                    entity_id = agg["entity_id"]
                    interaction_type = agg["interaction_type"]

                    # Get entity name
                    entity_name = entity_names.get(entity_id, "unknown")

                    # Get response times for percentile calculation
                    response_times = response_times_by_entity.get(key, [])

                    # Calculate percentiles (response_times are already sorted from ORDER BY)
                    if response_times:
                        p50_rt = self._percentile(response_times, 50)
                        p95_rt = self._percentile(response_times, 95)
                        p99_rt = self._percentile(response_times, 99)
                    else:
                        p50_rt = p95_rt = p99_rt = None

                    aggregations.append(
                        HourlyAggregation(
                            entity_id=entity_id,
                            entity_name=entity_name,
                            hour_start=hour_start,
                            total_count=agg["total_count"],
                            success_count=agg["success_count"],
                            failure_count=agg["total_count"] - agg["success_count"],
                            min_response_time=agg["min_rt"],
                            max_response_time=agg["max_rt"],
                            avg_response_time=agg["avg_rt"],
                            p50_response_time=p50_rt,
                            p95_response_time=p95_rt,
                            p99_response_time=p99_rt,
                            interaction_type=interaction_type,
                        )
                    )
            return aggregations
        except Exception:
            logger.exception(
                "Failed to aggregate hourly metrics",
                extra={
                    "hour_start": hour_start,
                    "hour_end": hour_end,
                    "raw_model": raw_model.__name__,
                    "entity_model": entity_model.__name__,
                    "is_a2a": is_a2a,
                },
            )
            raise

    def _percentile(self, sorted_data: List[float], percentile: int) -> float:
        """Calculate percentile from sorted data.

        Args:
            sorted_data: Sorted list of values
            percentile: Percentile to calculate (0-100)

        Returns:
            float: The percentile value
        """
        if not sorted_data:
            return 0.0

        k = (len(sorted_data) - 1) * percentile / 100
        f = int(k)
        c = f + 1 if f + 1 < len(sorted_data) else f

        if f == c:
            return sorted_data[f]
        return sorted_data[f] + (k - f) * (sorted_data[c] - sorted_data[f])
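
    # Worked example (illustrative): for sorted_data=[10.0, 20.0, 30.0, 40.0]
    # and percentile=95, k = 3 * 0.95 = 2.85, f = 2, c = 3, so the result is
    # 30.0 + 0.85 * (40.0 - 30.0) = 38.5 -- linear interpolation between the
    # two nearest ranks, matching the percentile_cont semantics used in the
    # native PostgreSQL path above.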

    def _upsert_rollup(
        self,
        db: Session,
        hourly_model: Type,
        entity_id_col: str,
        agg: HourlyAggregation,
        is_a2a: bool,
    ) -> Tuple[int, int]:
        """
        Insert or update a single hourly rollup record using a DB-aware UPSERT.

        This function is concurrency-safe for PostgreSQL and SQLite, enforcing
        uniqueness at the database level. For dialects without native UPSERT
        support, it falls back to a Python SELECT+UPDATE/INSERT path.

        Args:
            db (Session): Active SQLAlchemy database session.
            hourly_model (Type): ORM model representing the hourly rollup table.
            entity_id_col (str): Name of the entity ID column (e.g. "tool_id", "agent_id").
            agg (HourlyAggregation): Aggregated hourly metrics for a single entity.
            is_a2a (bool): Whether interaction_type should be included in the uniqueness key.

        Returns:
            Tuple[int, int]: Best-effort (inserted_count, updated_count) values for logging only.

        Raises:
            SQLAlchemyError: If the database UPSERT operation fails.
        """
        try:
            # Resolve name column
            name_col_map = {
                "tool_id": "tool_name",
                "resource_id": "resource_name",
                "prompt_id": "prompt_name",
                "server_id": "server_name",
            }
            name_col = name_col_map.get(entity_id_col, "agent_name")

            # Normalize the hour bucket
            hour_start = agg.hour_start.replace(minute=0, second=0, microsecond=0)

            values = {
                entity_id_col: agg.entity_id,
                name_col: agg.entity_name,
                "hour_start": hour_start,
                "total_count": agg.total_count,
                "success_count": agg.success_count,
                "failure_count": agg.failure_count,
                "min_response_time": agg.min_response_time,
                "max_response_time": agg.max_response_time,
                "avg_response_time": agg.avg_response_time,
                "p50_response_time": agg.p50_response_time,
                "p95_response_time": agg.p95_response_time,
                "p99_response_time": agg.p99_response_time,
            }

            if is_a2a:
                values["interaction_type"] = agg.interaction_type

            dialect = db.bind.dialect.name if db.bind else "unknown"
            conflict_cols = [
                getattr(hourly_model, entity_id_col),
                hourly_model.hour_start,
            ]

            if is_a2a:
                conflict_cols.append(hourly_model.interaction_type)

            logger.debug(
                "Upserting hourly rollup",
                extra={
                    "dialect": dialect,
                    "entity_id_col": entity_id_col,
                    "entity_id": agg.entity_id,
                    "hour_start": hour_start.isoformat(),
                    "is_a2a": is_a2a,
                },
            )

            if dialect == "postgresql":
                # =======================
                # PostgreSQL
                # =======================
                stmt = pg_insert(hourly_model).values(**values)
                update_cols = {k: stmt.excluded[k] for k in values if k not in (entity_id_col, "hour_start", "interaction_type")}
                stmt = stmt.on_conflict_do_update(
                    index_elements=conflict_cols,
                    set_=update_cols,
                )

                db.execute(stmt)
                return (0, 1)

            if "sqlite" in dialect:
                # =======================
                # SQLite
                # =======================
                stmt = sqlite_insert(hourly_model).values(**values)

                update_cols = {k: stmt.excluded[k] for k in values if k not in (entity_id_col, "hour_start", "interaction_type")}

                stmt = stmt.on_conflict_do_update(
                    index_elements=conflict_cols,
                    set_=update_cols,
                )

                db.execute(stmt)
                return (0, 1)

            logger.warning(
                "Dialect does not support native UPSERT. Using Python fallback with conflict handling.",
                extra={"dialect": dialect},
            )
            # Use savepoint to avoid rolling back the entire transaction on conflict
            savepoint = db.begin_nested()
            try:
                db.add(hourly_model(**values))
                db.flush()  # Force INSERT now
                savepoint.commit()
                return (1, 0)
            except IntegrityError:
                savepoint.rollback()  # Only roll back the savepoint, not the whole transaction
                logger.info(
                    "Insert conflict detected in fallback path. Retrying as update.",
                    extra={
                        "entity_id_col": entity_id_col,
                        "entity_id": agg.entity_id,
                        "hour_start": hour_start.isoformat(),
                        "is_a2a": is_a2a,
                    },
                )

                entity_id_attr = getattr(hourly_model, entity_id_col)

                filters = [
                    entity_id_attr == agg.entity_id,
                    hourly_model.hour_start == hour_start,
                ]

                if is_a2a:
                    filters.append(hourly_model.interaction_type == agg.interaction_type)

                existing = db.execute(select(hourly_model).where(and_(*filters))).scalar_one()

                for key, value in values.items():
                    if key not in (entity_id_col, "hour_start", "interaction_type"):
                        setattr(existing, key, value)

                return (0, 1)

        except SQLAlchemyError:
            logger.exception(
                "Failed to upsert hourly rollup",
                extra={
                    "entity_id_col": entity_id_col,
                    "entity_id": agg.entity_id,
                    "hour_start": hour_start.isoformat(),
                    "is_a2a": is_a2a,
                },
            )
            raise
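
    # On PostgreSQL, the statement built above compiles to SQL of roughly this
    # shape (illustrative sketch using the tool rollup as an example; exact
    # table/column lists depend on the ORM models and are abbreviated here):
    #
    #   INSERT INTO tool_metrics_hourly (tool_id, tool_name, hour_start, total_count, ...)
    #   VALUES (:tool_id, :tool_name, :hour_start, :total_count, ...)
    #   ON CONFLICT (tool_id, hour_start)
    #   DO UPDATE SET tool_name = EXCLUDED.tool_name, total_count = EXCLUDED.total_count, ...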

    def _delete_raw_metrics(
        self,
        db: Session,
        raw_model: Type,
        hour_start: datetime,
        hour_end: datetime,
    ) -> int:
        """Delete raw metrics for a given hour after rollup.

        Args:
            db: Database session
            raw_model: SQLAlchemy model for raw metrics
            hour_start: Start of the hour
            hour_end: End of the hour

        Returns:
            int: Number of records deleted
        """
        result = db.execute(
            delete(raw_model).where(
                and_(
                    raw_model.timestamp >= hour_start,
                    raw_model.timestamp < hour_end,
                )
            )
        )
        return result.rowcount

    def get_stats(self) -> dict:
        """Get rollup service statistics.

        Returns:
            dict: Rollup statistics
        """
        return {
            "enabled": self.enabled,
            "rollup_interval_hours": self.rollup_interval_hours,
            "delete_raw_after_rollup": self.delete_raw_after_rollup,
            "delete_raw_after_rollup_hours": self.delete_raw_after_rollup_hours,
            "total_rollups": self._total_rollups,
            "rollup_runs": self._rollup_runs,
            "is_postgresql": self._is_postgresql,
        }


# Singleton instance
_metrics_rollup_service: Optional[MetricsRollupService] = None


def get_metrics_rollup_service() -> MetricsRollupService:
    """Get or create the singleton MetricsRollupService instance.

    Returns:
        MetricsRollupService: The singleton rollup service instance
    """
    global _metrics_rollup_service  # pylint: disable=global-statement
    if _metrics_rollup_service is None:
        _metrics_rollup_service = MetricsRollupService()
    return _metrics_rollup_service


def get_metrics_rollup_service_if_initialized() -> Optional[MetricsRollupService]:
    """Return the rollup service instance if it has been created.

    Returns:
        The rollup service instance or None if not initialized.
    """
    return _metrics_rollup_service
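

# ---------------------------------------------------------------------------
# Usage sketch (illustrative; assumes a configured database and settings).
# The background loop started via start() is the normal entry point, but a
# one-off rollup can also be driven directly:
#
#     # Standard
#     import asyncio
#
#     async def _demo() -> None:
#         service = get_metrics_rollup_service()
#         summary = await service.rollup_all(hours_back=48)
#         print(f"created={summary.total_rollups_created}, updated={summary.total_rollups_updated}")
#
#     asyncio.run(_demo())
# ---------------------------------------------------------------------------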