Coverage for mcpgateway/services/metrics_rollup_service.py: 98%

345 statements  

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Metrics Rollup Service for hourly aggregation of raw metrics. 

3 

4This service provides automatic rollup of raw metrics into hourly summaries, 

5enabling efficient historical queries without scanning millions of raw records. 

6 

7Features: 

8- Hourly aggregation with percentile calculation 

9- Upsert logic to handle re-runs safely 

10- Background task for periodic rollup 

11- Optional deletion of raw metrics after rollup 

12- PostgreSQL and SQLite support 

13 

14Copyright 2025 

15SPDX-License-Identifier: Apache-2.0 

16""" 

17 

18# Standard 

19import asyncio 

20from contextlib import contextmanager 

21from dataclasses import dataclass 

22from datetime import datetime, timedelta, timezone 

23import logging 

24import threading 

25import time 

26from typing import Any, Dict, List, Optional, Tuple, Type 

27 

28# Third-Party 

29from sqlalchemy import and_, case, delete, func, select 

30from sqlalchemy.dialects.postgresql import insert as pg_insert 

31from sqlalchemy.dialects.sqlite import insert as sqlite_insert 

32from sqlalchemy.exc import IntegrityError, SQLAlchemyError 

33from sqlalchemy.orm import Session 

34 

35# First-Party 

36from mcpgateway.config import settings 

37from mcpgateway.db import ( 

38 A2AAgent, 

39 A2AAgentMetric, 

40 A2AAgentMetricsHourly, 

41 fresh_db_session, 

42 Prompt, 

43 PromptMetric, 

44 PromptMetricsHourly, 

45 Resource, 

46 ResourceMetric, 

47 ResourceMetricsHourly, 

48 Server, 

49 ServerMetric, 

50 ServerMetricsHourly, 

51 Tool, 

52 ToolMetric, 

53 ToolMetricsHourly, 

54) 

55 

56logger = logging.getLogger(__name__) 

57 

58 

59@dataclass 

60class RollupResult: 

61 """Result of a rollup operation for a single table.""" 

62 

63 table_name: str 

64 hours_processed: int 

65 records_aggregated: int 

66 rollups_created: int 

67 rollups_updated: int 

68 raw_deleted: int 

69 duration_seconds: float 

70 error: Optional[str] = None 

71 

72 

73@dataclass 

74class RollupSummary: 

75 """Summary of all rollup operations.""" 

76 

77 total_hours_processed: int 

78 total_records_aggregated: int 

79 total_rollups_created: int 

80 total_rollups_updated: int 

81 tables: Dict[str, RollupResult] 

82 duration_seconds: float 

83 started_at: datetime 

84 completed_at: datetime 

85 

86 

87@dataclass 

88class HourlyAggregation: 

89 """Aggregated metrics for a single hour.""" 

90 

91 entity_id: str 

92 entity_name: str 

93 hour_start: datetime 

94 total_count: int 

95 success_count: int 

96 failure_count: int 

97 min_response_time: Optional[float] 

98 max_response_time: Optional[float] 

99 avg_response_time: Optional[float] 

100 p50_response_time: Optional[float] 

101 p95_response_time: Optional[float] 

102 p99_response_time: Optional[float] 

103 interaction_type: Optional[str] = None # For A2A agents 

104 

105 

106class MetricsRollupService: 

107 """Service for rolling up raw metrics into hourly summaries. 

108 

109 This service provides: 

110 - Hourly aggregation of raw metrics into summary tables 

111 - Percentile calculation (p50, p95, p99) 

112 - Upsert logic to handle re-runs safely 

113 - Optional deletion of raw metrics after rollup 

114 - Background task for periodic rollup 

115 

116 Configuration (via environment variables): 

117 - METRICS_ROLLUP_ENABLED: Enable automatic rollup (default: True) 

118 - METRICS_ROLLUP_INTERVAL_HOURS: Hours between rollup runs (default: 1) 

119 - METRICS_DELETE_RAW_AFTER_ROLLUP: Delete raw after rollup (default: True) 

120 - METRICS_DELETE_RAW_AFTER_ROLLUP_HOURS: Hours after which to delete if rollup exists (default: 1) 

121 """ 

122 

123 # Table configuration: (name, raw_model, hourly_model, entity_model, entity_id_col, entity_name_col) 

124 METRIC_TABLES = [ 

125 ("tool_metrics", ToolMetric, ToolMetricsHourly, Tool, "tool_id", "name"), 

126 ("resource_metrics", ResourceMetric, ResourceMetricsHourly, Resource, "resource_id", "name"), 

127 ("prompt_metrics", PromptMetric, PromptMetricsHourly, Prompt, "prompt_id", "name"), 

128 ("server_metrics", ServerMetric, ServerMetricsHourly, Server, "server_id", "name"), 

129 ("a2a_agent_metrics", A2AAgentMetric, A2AAgentMetricsHourly, A2AAgent, "a2a_agent_id", "name"), 

130 ] 

131 

132 def __init__( 

133 self, 

134 rollup_interval_hours: Optional[int] = None, 

135 enabled: Optional[bool] = None, 

136 delete_raw_after_rollup: Optional[bool] = None, 

137 delete_raw_after_rollup_hours: Optional[int] = None, 

138 ): 

139 """Initialize the metrics rollup service. 

140 

141 Args: 

142 rollup_interval_hours: Hours between rollup runs (default: from settings or 1) 

143 enabled: Whether rollup is enabled (default: from settings or True) 

144 delete_raw_after_rollup: Delete raw metrics after rollup (default: from settings or True) 

145 delete_raw_after_rollup_hours: Hours after which to delete raw if rollup exists (default: from settings or 1) 

146 """ 

147 self.rollup_interval_hours = rollup_interval_hours or getattr(settings, "metrics_rollup_interval_hours", 1) 

148 self.enabled = enabled if enabled is not None else getattr(settings, "metrics_rollup_enabled", True) 

149 self.delete_raw_after_rollup = delete_raw_after_rollup if delete_raw_after_rollup is not None else getattr(settings, "metrics_delete_raw_after_rollup", True) 

150 self.delete_raw_after_rollup_hours = delete_raw_after_rollup_hours or getattr(settings, "metrics_delete_raw_after_rollup_hours", 1) 

151 

152 # Check if using PostgreSQL 

153 self._is_postgresql = settings.database_url.startswith("postgresql") 

154 

155 # Background task 

156 self._rollup_task: Optional[asyncio.Task] = None 

157 self._shutdown_event = asyncio.Event() 

158 self._pause_event = asyncio.Event() 

159 self._pause_lock = threading.Lock() 

160 self._pause_count = 0 

161 self._pause_reason: Optional[str] = None 

162 

163 # Stats 

164 self._total_rollups = 0 

165 self._rollup_runs = 0 

166 

167 logger.info( 

168 f"MetricsRollupService initialized: enabled={self.enabled}, " 

169 f"interval_hours={self.rollup_interval_hours}, " 

170 f"delete_raw={self.delete_raw_after_rollup}, " 

171 f"postgresql={self._is_postgresql}" 

172 ) 

173 

174 def pause(self, reason: str = "maintenance") -> None: 

175 """Pause background rollup execution. 

176 

177 Args: 

178 reason: Reason for pausing the rollup task. 

179 """ 

180 with self._pause_lock: 

181 self._pause_count += 1 

182 self._pause_reason = reason 

183 self._pause_event.set() 

184 

185 def resume(self) -> None: 

186 """Resume background rollup execution.""" 

187 with self._pause_lock: 

188 if self._pause_count > 0: 

189 self._pause_count -= 1 

190 if self._pause_count <= 0: 

191 self._pause_count = 0 

192 self._pause_reason = None 

193 self._pause_event.clear() 

194 

195 @contextmanager 

196 def pause_during(self, reason: str = "maintenance"): 

197 """Pause rollups for the duration of the context manager. 

198 

199 Args: 

200 reason: Reason for pausing the rollup task. 

201 

202 Yields: 

203 None 

204 """ 

205 self.pause(reason) 

206 try: 

207 yield 

208 finally: 

209 self.resume() 
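
    # Usage sketch (illustrative), e.g. around a bulk import or schema migration:
    #
    #     service = get_metrics_rollup_service()
    #     with service.pause_during("bulk import"):
    #         run_bulk_import()   # hypothetical caller-side function
    #     # rollups resume automatically when the block exits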

210 

211 async def start(self) -> None: 

212 """Start the background rollup task.""" 

213 if not self.enabled: 

214 logger.info("MetricsRollupService disabled, skipping start") 

215 return 

216 

217 if self._rollup_task is None or self._rollup_task.done(): 

218 self._shutdown_event.clear() 

219 self._rollup_task = asyncio.create_task(self._rollup_loop()) 

220 logger.info("MetricsRollupService background task started") 

221 

222 async def shutdown(self) -> None: 

223 """Shutdown the rollup service.""" 

224 logger.info("MetricsRollupService shutting down...") 

225 

226 # Signal shutdown 

227 self._shutdown_event.set() 

228 

229 # Cancel the rollup task 

230 if self._rollup_task: 

231 self._rollup_task.cancel() 

232 try: 

233 await self._rollup_task 

234 except asyncio.CancelledError: 

235 pass 

236 

237 logger.info(f"MetricsRollupService shutdown complete: " f"total_rollups={self._total_rollups}, rollup_runs={self._rollup_runs}") 

238 

239 async def _rollup_loop(self) -> None: 

240 """Background task that periodically rolls up metrics. 

241 

242 Includes smart backfill detection: if the service has been down for more 

243 than 24 hours, it will automatically detect the gap and roll up all 

244 unprocessed hours up to the configured maximum (retention period). 

245 

246 Raises: 

247 asyncio.CancelledError: When the task is cancelled during shutdown. 

248 """ 

249 logger.info(f"Metrics rollup loop started (interval={self.rollup_interval_hours}h)") 

250 

251 # Calculate interval in seconds 

252 interval_seconds = self.rollup_interval_hours * 3600 

253 # On first run, do a backfill check 

254 first_run = True 

255 

256 while not self._shutdown_event.is_set(): 

257 try: 

258 # Wait for interval or shutdown (skip wait on first run for immediate backfill) 

259 if not first_run: 

260 try: 

261 await asyncio.wait_for( 

262 self._shutdown_event.wait(), 

263 timeout=interval_seconds, 

264 ) 

265 # Shutdown signaled 

266 break 

267 except asyncio.TimeoutError: 

268 # Normal timeout, proceed to rollup 

269 pass 

270 

271 if self._pause_event.is_set(): 

272 logger.info(f"Metrics rollup paused ({self._pause_reason or 'maintenance'}), skipping this cycle") 

273 try: 

274 await asyncio.wait_for(self._shutdown_event.wait(), timeout=5) 

275 except asyncio.TimeoutError: 

276 pass 

277 continue 

278 

279 # Determine hours_back based on whether this is first run or normal run 

280 if first_run: 

281 # On first run, detect backfill gap (may scan entire retention period) 

282 hours_back = await asyncio.to_thread(self._detect_backfill_hours) 

283 if hours_back > 24: 

284 logger.info(f"Backfill detected: rolling up {hours_back} hours of unprocessed metrics") 

285 first_run = False 

286 else: 

287 # Normal runs: only process recent hours to catch late-arriving data 

288 # Configurable via METRICS_ROLLUP_LATE_DATA_HOURS (default: 1 hour) 

289 # This avoids walking through entire retention period every interval 

290 hours_back = getattr(settings, "metrics_rollup_late_data_hours", 1) 

291 

292 # Run rollup for the calculated time range 

293 summary = await self.rollup_all(hours_back=hours_back) 

294 self._rollup_runs += 1 

295 self._total_rollups += summary.total_rollups_created 

296 

297 if summary.total_rollups_created > 0 or summary.total_rollups_updated > 0: 

298 logger.info( 

299 f"Metrics rollup #{self._rollup_runs}: created {summary.total_rollups_created}, " 

300 f"updated {summary.total_rollups_updated} rollups " 

301 f"from {summary.total_records_aggregated} records in {summary.duration_seconds:.2f}s" 

302 ) 

303 

304 except asyncio.CancelledError: 

305 logger.debug("Rollup loop cancelled") 

306 raise 

307 except Exception as e: 

308 logger.error(f"Error in metrics rollup loop: {e}", exc_info=True) 

309 # Continue the loop despite errors 

310 await asyncio.sleep(60) 

311 

312 def _detect_backfill_hours(self) -> int: 

313 """Detect how many hours back we need to roll up. 

314 

315 Checks for the earliest unprocessed raw metric and calculates how many 

316 hours of backfill are needed. Returns a minimum of 24 hours and caps at 

317 the retention period to avoid excessive processing. 

318 

319 Returns: 

320 int: Number of hours to roll up (minimum 24, maximum retention days * 24) 

321 """ 

322 retention_days = getattr(settings, "metrics_retention_days", 30) 

323 max_hours = retention_days * 24 # Cap at retention period 

324 

325 try: 

326 with fresh_db_session() as db: 

327 # Find the earliest raw metric timestamp across all tables 

328 earliest_raw = None 

329 

330 for _, raw_model, _hourly_model, _, _, _ in self.METRIC_TABLES: 

331 # Get earliest unprocessed raw metric (where no rollup exists for that hour) 

332 result = db.execute(select(func.min(raw_model.timestamp))).scalar() 

333 

334 if result and (earliest_raw is None or result < earliest_raw): 

335 earliest_raw = result 

336 

337 if earliest_raw is None: 

338 # No raw metrics, use default 

339 return 24 

340 

341 # Calculate hours since earliest raw metric 

342 now = datetime.now(timezone.utc) 

343 hours_since_earliest = int((now - earliest_raw).total_seconds() / 3600) + 1 

344 

345 # Clamp between 24 and max_hours 

346 return max(24, min(hours_since_earliest, max_hours)) 

347 

348 except Exception as e: 

349 logger.warning(f"Error detecting backfill hours: {e}, using default 24") 

350 return 24 
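
    # Worked example (illustrative): with metrics_retention_days=30 (cap of 720 hours)
    # and an earliest raw metric 40 hours old, hours_since_earliest == 41, so the
    # method returns max(24, min(41, 720)) == 41. If the earliest raw metric were
    # 2,000 hours old, the result would be clamped to 720.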

351 

352 async def rollup_all( 

353 self, 

354 hours_back: int = 24, 

355 force_reprocess: bool = False, 

356 ) -> RollupSummary: 

357 """Roll up all metrics tables for the specified time range. 

358 

359 Args: 

360 hours_back: How many hours back to look for unprocessed metrics (default: 24) 

361 force_reprocess: Reprocess even if rollup already exists (default: False) 

362 

363 Returns: 

364 RollupSummary: Summary of rollup operations 

365 """ 

366 started_at = datetime.now(timezone.utc) 

367 start_time = time.monotonic() 

368 

369 # Calculate time range (process completed hours only, not current hour) 

370 now = datetime.now(timezone.utc) 

371 # Round down to start of current hour 

372 current_hour_start = now.replace(minute=0, second=0, microsecond=0) 

373 # Go back hours_back from the start of current hour 

374 start_hour = current_hour_start - timedelta(hours=hours_back) 

375 results: Dict[str, RollupResult] = {} 

376 total_hours = 0 

377 total_records = 0 

378 total_created = 0 

379 total_updated = 0 

380 

381 for table_name, raw_model, hourly_model, entity_model, entity_id_col, entity_name_col in self.METRIC_TABLES: 

382 result = await asyncio.to_thread( 

383 self._rollup_table, 

384 table_name, 

385 raw_model, 

386 hourly_model, 

387 entity_model, 

388 entity_id_col, 

389 entity_name_col, 

390 start_hour, 

391 current_hour_start, 

392 force_reprocess, 

393 ) 

394 results[table_name] = result 

395 total_hours += result.hours_processed 

396 total_records += result.records_aggregated 

397 total_created += result.rollups_created 

398 total_updated += result.rollups_updated 

399 

400 duration = time.monotonic() - start_time 

401 completed_at = datetime.now(timezone.utc) 

402 

403 return RollupSummary( 

404 total_hours_processed=total_hours, 

405 total_records_aggregated=total_records, 

406 total_rollups_created=total_created, 

407 total_rollups_updated=total_updated, 

408 tables=results, 

409 duration_seconds=duration, 

410 started_at=started_at, 

411 completed_at=completed_at, 

412 ) 
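
    # Usage sketch (illustrative; must run inside an asyncio event loop):
    #
    #     summary = await get_metrics_rollup_service().rollup_all(hours_back=6)
    #     for name, result in summary.tables.items():
    #         print(name, result.rollups_created, result.rollups_updated, result.error)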

413 

414 def _rollup_table( 

415 self, 

416 table_name: str, 

417 raw_model: Type, 

418 hourly_model: Type, 

419 entity_model: Type, 

420 entity_id_col: str, 

421 entity_name_col: str, 

422 start_hour: datetime, 

423 end_hour: datetime, 

424 force_reprocess: bool, # pylint: disable=unused-argument 

425 ) -> RollupResult: 

426 """Roll up metrics for a single table. 

427 

428 Note: As of the late-data fix, rollup always re-aggregates when raw data exists, 

429 regardless of whether a rollup already exists. This ensures late-arriving metrics 

430 are properly included. The force_reprocess parameter is kept for API compatibility. 

431 

432 Args: 

433 table_name: Name of the table being processed 

434 raw_model: SQLAlchemy model for raw metrics 

435 hourly_model: SQLAlchemy model for hourly rollups 

436 entity_model: SQLAlchemy model for the entity (Tool, Resource, etc.) 

437 entity_id_col: Name of the entity ID column in raw model 

438 entity_name_col: Name of the entity name column in entity model 

439 start_hour: Start of time range 

440 end_hour: End of time range (exclusive) 

441 force_reprocess: Kept for API compatibility (behavior now always re-processes) 

442 

443 Returns: 

444 RollupResult: Result of the rollup operation 

445 """ 

446 start_time = time.monotonic() 

447 hours_processed = 0 

448 records_aggregated = 0 

449 rollups_created = 0 

450 rollups_updated = 0 

451 raw_deleted = 0 

452 error_msg = None 

453 

454 is_a2a = table_name == "a2a_agent_metrics" 

455 

456 try: 

457 with fresh_db_session() as db: 

458 # Process each hour in the range 

459 current = start_hour 

460 while current < end_hour: 

461 hour_end = current + timedelta(hours=1) 

462 

463 # Check if we have raw metrics for this hour 

464 # pylint: disable=not-callable 

465 raw_count = ( 

466 db.execute( 

467 select(func.count()) 

468 .select_from(raw_model) 

469 .where( 

470 and_( 

471 raw_model.timestamp >= current, 

472 raw_model.timestamp < hour_end, 

473 ) 

474 ) 

475 ).scalar() 

476 or 0 

477 ) 

478 

479 if raw_count > 0: 

480 # Always re-aggregate when there's raw data, even if rollup exists. 

481 # This ensures late-arriving metrics (buffer flush, ingestion lag) are included. 

482 # The _aggregate_hour queries ALL raw data for the hour, and _upsert_rollup 

483 # handles updating existing rollups correctly. 

484 

485 # Aggregate metrics for this hour 

486 aggregations = self._aggregate_hour( 

487 db, 

488 raw_model, 

489 entity_model, 

490 entity_id_col, 

491 entity_name_col, 

492 current, 

493 hour_end, 

494 is_a2a, 

495 ) 

496 # Upsert rollups 

497 for agg in aggregations: 

498 created, updated = self._upsert_rollup( 

499 db, 

500 hourly_model, 

501 entity_id_col, 

502 agg, 

503 is_a2a, 

504 ) 

505 rollups_created += created 

506 rollups_updated += updated 

507 records_aggregated += agg.total_count 

508 

509 hours_processed += 1 

510 

511 # Delete raw metrics if configured 

512 if self.delete_raw_after_rollup: 

513 delete_cutoff = datetime.now(timezone.utc) - timedelta(hours=self.delete_raw_after_rollup_hours) 

514 if hour_end < delete_cutoff: 

515 deleted = self._delete_raw_metrics(db, raw_model, current, hour_end) 

516 raw_deleted += deleted 

517 

518 current = hour_end 

519 

520 db.commit() 

521 

522 except Exception as e: 

523 logger.error(f"Error rolling up {table_name}: {e}", exc_info=True) 

524 error_msg = str(e) 

525 

526 duration = time.monotonic() - start_time 

527 

528 if rollups_created + rollups_updated > 0: 

529 logger.debug(f"Rolled up {table_name}: {records_aggregated} records -> " f"{rollups_created} new, {rollups_updated} updated rollups") 

530 

531 return RollupResult( 

532 table_name=table_name, 

533 hours_processed=hours_processed, 

534 records_aggregated=records_aggregated, 

535 rollups_created=rollups_created, 

536 rollups_updated=rollups_updated, 

537 raw_deleted=raw_deleted, 

538 duration_seconds=duration, 

539 error=error_msg, 

540 ) 

541 

542 def _aggregate_hour( 

543 self, 

544 db: Session, 

545 raw_model: Type, 

546 entity_model: Type, 

547 entity_id_col: str, 

548 entity_name_col: str, 

549 hour_start: datetime, 

550 hour_end: datetime, 

551 is_a2a: bool, 

552 ) -> List[HourlyAggregation]: 

553 """Aggregate raw metrics for a single hour using optimized bulk queries. 

554 

555 Uses a single GROUP BY query to get basic aggregations (count, min, max, avg, 

556 success count) for all entities at once, minimizing database round trips. 

557 Percentiles are calculated by loading response times in a single bulk query. 

558 

559 Args: 

560 db: Database session 

561 raw_model: SQLAlchemy model for raw metrics 

562 entity_model: SQLAlchemy model for the entity 

563 entity_id_col: Name of the entity ID column 

564 entity_name_col: Name of the entity name column 

565 hour_start: Start of the hour 

566 hour_end: End of the hour 

567 is_a2a: Whether this is A2A agent metrics (has interaction_type) 

568 

569 Returns: 

570 List[HourlyAggregation]: Aggregated metrics for each entity 

571 

572 Raises: 

573 Exception: If aggregation fails due to a database query or processing error. 

574 """ 

575 try: 

576 entity_id_attr = getattr(raw_model, entity_id_col) 

577 entity_name_attr = getattr(entity_model, entity_name_col) 

578 

579 time_filter = and_( 

580 raw_model.timestamp >= hour_start, 

581 raw_model.timestamp < hour_end, 

582 ) 

583 

584 aggregations: list = [] 

585 if self._is_postgresql and settings.use_postgresdb_percentiles: 

586 # Build SELECT and GROUP BY column lists dynamically (interaction_type is added only for A2A metrics) 

587 select_cols = [ 

588 entity_id_attr.label("entity_id"), 

589 func.coalesce(entity_name_attr, "unknown").label("entity_name"), 

590 ] 

591 group_by_cols = [ 

592 entity_id_attr, 

593 entity_name_attr, 

594 ] 

595 

596 if is_a2a: 

597 select_cols.append(raw_model.interaction_type.label("interaction_type")) 

598 group_by_cols.append(raw_model.interaction_type) 

599 

600 # pylint: disable=not-callable 

601 agg_query = ( 

602 select( 

603 *select_cols, 

604 func.count(raw_model.id).label("total_count"), 

605 func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("success_count"), 

606 func.min(raw_model.response_time).label("min_rt"), 

607 func.max(raw_model.response_time).label("max_rt"), 

608 func.avg(raw_model.response_time).label("avg_rt"), 

609 func.percentile_cont(0.50).within_group(raw_model.response_time).label("p50_rt"), 

610 func.percentile_cont(0.95).within_group(raw_model.response_time).label("p95_rt"), 

611 func.percentile_cont(0.99).within_group(raw_model.response_time).label("p99_rt"), 

612 ) 

613 .select_from(raw_model) 

614 .join(entity_model, entity_model.id == entity_id_attr, isouter=True) 

615 .where(time_filter) 

616 .group_by(*group_by_cols) 

617 ) 

618 # pylint: enable=not-callable 
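
# Approximate SQL shape of agg_query on PostgreSQL (illustrative; actual table and
# column names come from raw_model / entity_model, e.g. tool_metrics / tools):
#
#     SELECT tool_id, COALESCE(tools.name, 'unknown'), COUNT(tool_metrics.id),
#            SUM(CASE WHEN is_success THEN 1 ELSE 0 END),
#            MIN(response_time), MAX(response_time), AVG(response_time),
#            PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY response_time),
#            PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time),
#            PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY response_time)
#     FROM tool_metrics LEFT OUTER JOIN tools ON tools.id = tool_metrics.tool_id
#     WHERE timestamp >= :hour_start AND timestamp < :hour_end
#     GROUP BY tool_id, tools.name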

619 for row in db.execute(agg_query).yield_per(settings.yield_batch_size): 

620 aggregations.append( 

621 HourlyAggregation( 

622 entity_id=row.entity_id, 

623 entity_name=row.entity_name, 

624 hour_start=hour_start, 

625 total_count=row.total_count, 

626 success_count=row.success_count, 

627 failure_count=row.total_count - row.success_count, 

628 min_response_time=row.min_rt, 

629 max_response_time=row.max_rt, 

630 avg_response_time=row.avg_rt, 

631 p50_response_time=row.p50_rt, 

632 p95_response_time=row.p95_rt, 

633 p99_response_time=row.p99_rt, 

634 interaction_type=row.interaction_type if is_a2a else None, 

635 ) 

636 ) 

637 else: 

638 # Build group by columns 

639 if is_a2a: 

640 group_cols = [entity_id_attr, raw_model.interaction_type] 

641 else: 

642 group_cols = [entity_id_attr] 

643 

644 # Time filter for this hour 

645 time_filter = and_( 

646 raw_model.timestamp >= hour_start, 

647 raw_model.timestamp < hour_end, 

648 ) 

649 

650 # OPTIMIZED: Single bulk query for basic aggregations per entity 

651 # pylint: disable=not-callable 

652 agg_query = ( 

653 select( 

654 *group_cols, 

655 func.count(raw_model.id).label("total_count"), 

656 func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("success_count"), 

657 func.min(raw_model.response_time).label("min_rt"), 

658 func.max(raw_model.response_time).label("max_rt"), 

659 func.avg(raw_model.response_time).label("avg_rt"), 

660 ) 

661 .where(time_filter) 

662 .group_by(*group_cols) 

663 ) 

664 

665 # Store aggregation results by entity key 

666 agg_results = {} 

667 for row in db.execute(agg_query).yield_per(settings.yield_batch_size): 

668 entity_id = row[0] 

669 interaction_type = row[1] if is_a2a else None 

670 key = (entity_id, interaction_type) if is_a2a else entity_id 

671 

672 agg_results[key] = { 

673 "entity_id": entity_id, 

674 "interaction_type": interaction_type, 

675 "total_count": row.total_count or 0, 

676 "success_count": row.success_count or 0, 

677 "min_rt": row.min_rt, 

678 "max_rt": row.max_rt, 

679 "avg_rt": row.avg_rt, 

680 } 

681 

682 if not agg_results: 

683 return [] 

684 

685 # OPTIMIZED: Bulk load entity names in one query 

686 entity_ids = list(set(r["entity_id"] for r in agg_results.values())) 

687 entity_names = {} 

688 if entity_ids: 

689 entities = db.execute(select(entity_model.id, getattr(entity_model, entity_name_col)).where(entity_model.id.in_(entity_ids))) 

690 entity_names = {e[0]: e[1] for e in entities} 

691 

692 # OPTIMIZED: Bulk load all response times for percentile calculation 

693 # Load all response times for the hour in one query, grouped by entity 

694 rt_query = ( 

695 select( 

696 *group_cols, 

697 raw_model.response_time, 

698 ) 

699 .where(time_filter) 

700 .order_by(*group_cols, raw_model.response_time) 

701 ) 

702 

703 # Group response times by entity 

704 response_times_by_entity: Dict[Any, List[float]] = {} 

705 for row in db.execute(rt_query).yield_per(settings.yield_batch_size): 

706 entity_id = row[0] 

707 interaction_type = row[1] if is_a2a else None 

708 key = (entity_id, interaction_type) if is_a2a else entity_id 

709 rt = row.response_time if not is_a2a else row[2] 

710 

711 if key not in response_times_by_entity: 

712 response_times_by_entity[key] = [] 

713 if rt is not None: 

714 response_times_by_entity[key].append(rt) 

715 

716 # Build aggregation results with percentiles 

717 aggregations = [] 

718 for key, agg in agg_results.items(): 

719 entity_id = agg["entity_id"] 

720 interaction_type = agg["interaction_type"] 

721 

722 # Get entity name 

723 entity_name = entity_names.get(entity_id, "unknown") 

724 

725 # Get response times for percentile calculation 

726 response_times = response_times_by_entity.get(key, []) 

727 

728 # Calculate percentiles (response_times are already sorted from ORDER BY) 

729 if response_times: 

730 p50_rt = self._percentile(response_times, 50) 

731 p95_rt = self._percentile(response_times, 95) 

732 p99_rt = self._percentile(response_times, 99) 

733 else: 

734 p50_rt = p95_rt = p99_rt = None 

735 

736 aggregations.append( 

737 HourlyAggregation( 

738 entity_id=entity_id, 

739 entity_name=entity_name, 

740 hour_start=hour_start, 

741 total_count=agg["total_count"], 

742 success_count=agg["success_count"], 

743 failure_count=agg["total_count"] - agg["success_count"], 

744 min_response_time=agg["min_rt"], 

745 max_response_time=agg["max_rt"], 

746 avg_response_time=agg["avg_rt"], 

747 p50_response_time=p50_rt, 

748 p95_response_time=p95_rt, 

749 p99_response_time=p99_rt, 

750 interaction_type=interaction_type, 

751 ) 

752 ) 

753 return aggregations 

754 except Exception: 

755 logger.exception( 

756 "Failed to aggregate hourly metrics", 

757 extra={ 

758 "hour_start": hour_start, 

759 "hour_end": hour_end, 

760 "raw_model": raw_model.__name__, 

761 "entity_model": entity_model.__name__, 

762 "is_a2a": is_a2a, 

763 }, 

764 ) 

765 raise 

766 

767 def _percentile(self, sorted_data: List[float], percentile: int) -> float: 

768 """Calculate percentile from sorted data. 

769 

770 Args: 

771 sorted_data: Sorted list of values 

772 percentile: Percentile to calculate (0-100) 

773 

774 Returns: 

775 float: The percentile value 

776 """ 

777 if not sorted_data: 

778 return 0.0 

779 

780 k = (len(sorted_data) - 1) * percentile / 100 

781 f = int(k) 

782 c = f + 1 if f + 1 < len(sorted_data) else f 

783 

784 if f == c: 

785 return sorted_data[f] 

786 return sorted_data[f] + (k - f) * (sorted_data[c] - sorted_data[f]) 
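
    # Worked example (illustrative): _percentile([10.0, 20.0, 30.0, 40.0], 95)
    # computes k = 3 * 0.95 = 2.85, f = 2, c = 3, and returns
    # 30.0 + 0.85 * (40.0 - 30.0) == 38.5 (linear interpolation between ranks).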

787 

788 def _upsert_rollup( 

789 self, 

790 db: Session, 

791 hourly_model: Type, 

792 entity_id_col: str, 

793 agg: HourlyAggregation, 

794 is_a2a: bool, 

795 ) -> Tuple[int, int]: 

796 """ 

797 Insert or update a single hourly rollup record using a DB-aware UPSERT. 

798 This function is concurrency-safe for PostgreSQL and SQLite. 

799 Falls back to Python SELECT+UPDATE/INSERT for unsupported DBs 

800 

801 This function is concurrency-safe and enforces uniqueness at the database level. 

802 

803 Args: 

804 db (Session): Active SQLAlchemy database session. 

805 hourly_model (Type): ORM model representing the hourly rollup table. 

806 entity_id_col (str): Name of the entity ID column (e.g. "tool_id", "agent_id"). 

807 agg (HourlyAggregation): Aggregated hourly metrics for a single entity. 

808 is_a2a (bool): Whether interaction_type should be included in the uniqueness key. 

809 

810 Returns: 

811 Tuple[int, int]: Best-effort (inserted_count, updated_count) values for logging only. 

812 

813 Raises: 

814 SQLAlchemyError: If the database UPSERT operation fails. 

815 """ 

816 try: 

817 # Resolve name column 

818 name_col_map = { 

819 "tool_id": "tool_name", 

820 "resource_id": "resource_name", 

821 "prompt_id": "prompt_name", 

822 "server_id": "server_name", 

823 } 

824 name_col = name_col_map.get(entity_id_col, "agent_name") 

825 

826 # Normalize hour_start to the top of the hour 

827 hour_start = agg.hour_start.replace(minute=0, second=0, microsecond=0) 

828 

829 values = { 

830 entity_id_col: agg.entity_id, 

831 name_col: agg.entity_name, 

832 "hour_start": hour_start, 

833 "total_count": agg.total_count, 

834 "success_count": agg.success_count, 

835 "failure_count": agg.failure_count, 

836 "min_response_time": agg.min_response_time, 

837 "max_response_time": agg.max_response_time, 

838 "avg_response_time": agg.avg_response_time, 

839 "p50_response_time": agg.p50_response_time, 

840 "p95_response_time": agg.p95_response_time, 

841 "p99_response_time": agg.p99_response_time, 

842 } 

843 

844 if is_a2a: 

845 values["interaction_type"] = agg.interaction_type 

846 

847 dialect = db.bind.dialect.name if db.bind else "unknown" 

848 conflict_cols = [ 

849 getattr(hourly_model, entity_id_col), 

850 hourly_model.hour_start, 

851 ] 

852 

853 if is_a2a: 

854 conflict_cols.append(hourly_model.interaction_type) 

855 

856 logger.debug( 

857 "Upserting hourly rollup", 

858 extra={ 

859 "dialect": dialect, 

860 "entity_id_col": entity_id_col, 

861 "entity_id": agg.entity_id, 

862 "hour_start": hour_start.isoformat(), 

863 "is_a2a": is_a2a, 

864 }, 

865 ) 

866 

867 if dialect == "postgresql": 

868 # ======================= 

869 # PostgreSQL 

870 # ======================= 
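
# Roughly the SQL emitted here (illustrative; the real column list depends on
# the hourly_model table, e.g. tool_metrics_hourly):
#
#     INSERT INTO tool_metrics_hourly (tool_id, tool_name, hour_start, total_count, ...)
#     VALUES (:tool_id, :tool_name, :hour_start, :total_count, ...)
#     ON CONFLICT (tool_id, hour_start)
#     DO UPDATE SET tool_name = EXCLUDED.tool_name,
#                   total_count = EXCLUDED.total_count, ...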

871 stmt = pg_insert(hourly_model).values(**values) 

872 update_cols = {k: stmt.excluded[k] for k in values if k not in (entity_id_col, "hour_start", "interaction_type")} 

873 stmt = stmt.on_conflict_do_update( 

874 index_elements=conflict_cols, 

875 set_=update_cols, 

876 ) 

877 

878 db.execute(stmt) 

879 return (0, 1) 

880 

881 if "sqlite" in dialect: 

882 # ======================= 

883 # SQLite 

884 # ======================= 

885 stmt = sqlite_insert(hourly_model).values(**values) 

886 

887 update_cols = {k: stmt.excluded[k] for k in values if k not in (entity_id_col, "hour_start", "interaction_type")} 

888 

889 stmt = stmt.on_conflict_do_update( 

890 index_elements=conflict_cols, 

891 set_=update_cols, 

892 ) 

893 

894 db.execute(stmt) 

895 return (0, 1) 

896 

897 logger.warning( 

898 "Dialect does not support native UPSERT. Using Python fallback with conflict handling.", 

899 extra={"dialect": dialect}, 

900 ) 

901 # Use savepoint to avoid rolling back the entire transaction on conflict 

902 savepoint = db.begin_nested() 

903 try: 

904 db.add(hourly_model(**values)) 

905 db.flush() # Force INSERT now 

906 savepoint.commit() 

907 return (1, 0) 

908 except IntegrityError: 

909 savepoint.rollback() # Only roll back the savepoint, not the whole transaction 

910 logger.info( 

911 "Insert conflict detected in fallback path. Retrying as update.", 

912 extra={ 

913 "entity_id_col": entity_id_col, 

914 "entity_id": agg.entity_id, 

915 "hour_start": hour_start.isoformat(), 

916 "is_a2a": is_a2a, 

917 }, 

918 ) 

919 

920 entity_id_attr = getattr(hourly_model, entity_id_col) 

921 

922 filters = [ 

923 entity_id_attr == agg.entity_id, 

924 hourly_model.hour_start == hour_start, 

925 ] 

926 

927 if is_a2a: 

928 filters.append(hourly_model.interaction_type == agg.interaction_type) 

929 

930 existing = db.execute(select(hourly_model).where(and_(*filters))).scalar_one() 

931 

932 for key, value in values.items(): 

933 if key not in (entity_id_col, "hour_start", "interaction_type"): 

934 setattr(existing, key, value) 

935 

936 return (0, 1) 

937 

938 except SQLAlchemyError: 

939 logger.exception( 

940 "Failed to upsert hourly rollup", 

941 extra={ 

942 "entity_id_col": entity_id_col, 

943 "entity_id": agg.entity_id, 

944 "hour_start": hour_start.isoformat(), 

945 "is_a2a": is_a2a, 

946 }, 

947 ) 

948 raise 

949 

950 def _delete_raw_metrics( 

951 self, 

952 db: Session, 

953 raw_model: Type, 

954 hour_start: datetime, 

955 hour_end: datetime, 

956 ) -> int: 

957 """Delete raw metrics for a given hour after rollup. 

958 

959 Args: 

960 db: Database session 

961 raw_model: SQLAlchemy model for raw metrics 

962 hour_start: Start of the hour 

963 hour_end: End of the hour 

964 

965 Returns: 

966 int: Number of records deleted 

967 """ 

968 result = db.execute( 

969 delete(raw_model).where( 

970 and_( 

971 raw_model.timestamp >= hour_start, 

972 raw_model.timestamp < hour_end, 

973 ) 

974 ) 

975 ) 

976 return result.rowcount 

977 

978 def get_stats(self) -> dict: 

979 """Get rollup service statistics. 

980 

981 Returns: 

982 dict: Rollup statistics 

983 """ 

984 return { 

985 "enabled": self.enabled, 

986 "rollup_interval_hours": self.rollup_interval_hours, 

987 "delete_raw_after_rollup": self.delete_raw_after_rollup, 

988 "delete_raw_after_rollup_hours": self.delete_raw_after_rollup_hours, 

989 "total_rollups": self._total_rollups, 

990 "rollup_runs": self._rollup_runs, 

991 "is_postgresql": self._is_postgresql, 

992 } 

993 

994 

995# Singleton instance 

996_metrics_rollup_service: Optional[MetricsRollupService] = None 

997 

998 

999def get_metrics_rollup_service() -> MetricsRollupService: 

1000 """Get or create the singleton MetricsRollupService instance. 

1001 

1002 Returns: 

1003 MetricsRollupService: The singleton rollup service instance 

1004 """ 

1005 global _metrics_rollup_service # pylint: disable=global-statement 

1006 if _metrics_rollup_service is None: 

1007 _metrics_rollup_service = MetricsRollupService() 

1008 return _metrics_rollup_service 

1009 

1010 

1011def get_metrics_rollup_service_if_initialized() -> Optional[MetricsRollupService]: 

1012 """Return the rollup service instance if it has been created. 

1013 

1014 Returns: 

1015 The rollup service instance or None if not initialized. 

1016 """ 

1017 return _metrics_rollup_service
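
# Illustrative wiring of the background task into application startup/shutdown
# (the surrounding lifespan hooks are assumed, not part of this module):
#
#     service = get_metrics_rollup_service()
#     await service.start()      # no-op if METRICS_ROLLUP_ENABLED is false
#     ...
#     await service.shutdown()   # cancels the rollup loop and logs run counts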