Coverage for mcpgateway/services/metrics_query_service.py: 99%

172 statements  

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

# -*- coding: utf-8 -*-
"""Metrics Query Service for combined raw + rollup queries.

This service provides unified metrics queries that combine recent raw metrics
with historical hourly rollups for complete historical coverage.

Copyright 2025
SPDX-License-Identifier: Apache-2.0
"""

# Standard
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import logging
from typing import Any, Dict, List, Optional, Type

# Third-Party
from sqlalchemy import and_, case, func, literal, select, union_all
from sqlalchemy.orm import Session

# First-Party
from mcpgateway.config import settings
from mcpgateway.db import (
    A2AAgentMetric,
    A2AAgentMetricsHourly,
    PromptMetric,
    PromptMetricsHourly,
    ResourceMetric,
    ResourceMetricsHourly,
    ServerMetric,
    ServerMetricsHourly,
    ToolMetric,
    ToolMetricsHourly,
)

logger = logging.getLogger(__name__)


@dataclass
class AggregatedMetrics:
    """Aggregated metrics result combining raw and rollup data."""

    total_executions: int
    successful_executions: int
    failed_executions: int
    failure_rate: float
    min_response_time: Optional[float]
    max_response_time: Optional[float]
    avg_response_time: Optional[float]
    last_execution_time: Optional[datetime]
    # Source breakdown for debugging
    raw_count: int = 0
    rollup_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format for API response.

        Returns:
            Dict[str, Any]: Dictionary representation of the metrics.
        """
        return {
            "total_executions": self.total_executions,
            "successful_executions": self.successful_executions,
            "failed_executions": self.failed_executions,
            "failure_rate": self.failure_rate,
            "min_response_time": self.min_response_time,
            "max_response_time": self.max_response_time,
            "avg_response_time": self.avg_response_time,
            "last_execution_time": self.last_execution_time,
        }
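
# Illustrative sketch (not part of the module): the field values below are
# invented, purely to show the dataclass shape and its to_dict() payload.
#
#     m = AggregatedMetrics(
#         total_executions=10, successful_executions=9, failed_executions=1,
#         failure_rate=0.1, min_response_time=0.01, max_response_time=0.9,
#         avg_response_time=0.2, last_execution_time=None,
#     )
#     m.to_dict()  # note: raw_count/rollup_count are debug-only, not serialized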


@dataclass
class TopPerformerResult:
    """Result object for top performer queries, compatible with build_top_performers."""

    id: str
    name: str
    execution_count: int
    avg_response_time: Optional[float]
    success_rate: Optional[float]
    last_execution: Optional[datetime]


# Mapping of metric types to their raw and hourly models
# Format: (RawModel, HourlyModel, entity_id_column, preserved_name_column)
METRIC_MODELS = {
    "tool": (ToolMetric, ToolMetricsHourly, "tool_id", "tool_name"),
    "resource": (ResourceMetric, ResourceMetricsHourly, "resource_id", "resource_name"),
    "prompt": (PromptMetric, PromptMetricsHourly, "prompt_id", "prompt_name"),
    "server": (ServerMetric, ServerMetricsHourly, "server_id", "server_name"),
    "a2a_agent": (A2AAgentMetric, A2AAgentMetricsHourly, "a2a_agent_id", "agent_name"),
}
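
# Lookup sketch (illustrative): each entry unpacks into the raw model, the
# hourly rollup model, the entity FK column name, and the preserved-name
# column used to identify deleted entities.
#
#     raw_model, hourly_model, id_col, name_col = METRIC_MODELS["tool"]
#     # -> (ToolMetric, ToolMetricsHourly, "tool_id", "tool_name")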


def get_current_hour_start() -> datetime:
    """Get the start of the current hour (UTC).

    Returns:
        datetime: Start of current hour, aligned to hour boundary.
    """
    now = datetime.now(timezone.utc)
    return now.replace(minute=0, second=0, microsecond=0)
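
# Behaviour sketch (illustrative timestamp): replace() truncates to the hour,
# zeroing the smaller units while keeping the timezone.
#
#     # now = 2026-02-11 07:10:33.123456+00:00
#     # get_current_hour_start() -> 2026-02-11 07:00:00+00:00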


def _merge_min(a: Optional[float], b: Optional[float]) -> Optional[float]:
    """Merge two optional minimum values, returning the smaller one.

    Args:
        a: First optional float value.
        b: Second optional float value.

    Returns:
        The smaller of the two values, or the non-None value if one is None.
    """
    if a is not None and b is not None:
        return min(a, b)
    return a if a is not None else b


def _merge_max(a: Optional[float], b: Optional[float]) -> Optional[float]:
    """Merge two optional maximum values, returning the larger one.

    Args:
        a: First optional float value.
        b: Second optional float value.

    Returns:
        The larger of the two values, or the non-None value if one is None.
    """
    if a is not None and b is not None:
        return max(a, b)
    return a if a is not None else b


def _merge_weighted_avg(avg1: Optional[float], count1: int, avg2: Optional[float], count2: int) -> Optional[float]:
    """Merge two weighted averages.

    Args:
        avg1: First average value (or None).
        count1: Count for first average.
        avg2: Second average value (or None).
        count2: Count for second average.

    Returns:
        Weighted average of the two, or None if both are None.
    """
    total = count1 + count2
    if total == 0:
        return None
    if count1 > 0 and count2 > 0 and avg1 is not None and avg2 is not None:
        return (avg1 * count1 + avg2 * count2) / total
    if avg1 is not None and count1 > 0:
        return avg1
    if avg2 is not None and count2 > 0:
        return avg2
    return None
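
# Worked example (invented numbers): merging an average of 0.2s over 100 calls
# with an average of 0.5s over 50 calls weights each average by its count.
#
#     _merge_weighted_avg(0.2, 100, 0.5, 50)
#     # -> (0.2 * 100 + 0.5 * 50) / 150 = 45.0 / 150 = 0.3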


def _merge_last_time(a: Optional[datetime], b: Optional[datetime]) -> Optional[datetime]:
    """Merge two optional timestamps, returning the most recent one.

    Args:
        a: First optional datetime value.
        b: Second optional datetime value.

    Returns:
        The more recent of the two timestamps, or the non-None value if one is None.
    """
    if a is not None and b is not None:
        return max(a, b)
    return a if a is not None else b


def get_retention_cutoff() -> datetime:
    """Get the cutoff datetime for raw metrics retention, aligned to hour boundary.

    This considers both the configured retention period AND the delete_raw_after_rollup
    setting to ensure we query rollups for any period where raw data may have been deleted.

    The cutoff is aligned to the start of the hour to prevent double-counting:
    - Raw data uses: timestamp >= cutoff (data from cutoff hour onward)
    - Rollups use: hour_start < cutoff (rollups before cutoff hour)

    Returns:
        datetime: The cutoff point (hour-aligned) - data older than this comes from rollups.
    """
    retention_days = getattr(settings, "metrics_retention_days", 7)
    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)

    # If raw data is deleted after rollup, use the more recent cutoff
    # to ensure rollups cover any deleted raw data
    delete_raw_enabled = getattr(settings, "metrics_delete_raw_after_rollup", False)
    if delete_raw_enabled:
        delete_raw_hours = getattr(settings, "metrics_delete_raw_after_rollup_hours", 1)
        delete_cutoff = now - timedelta(hours=delete_raw_hours)
        cutoff = max(cutoff, delete_cutoff)

    # Align to hour boundary (round down) to prevent double-counting at the boundary
    # Raw query uses >= cutoff, rollup query uses < cutoff, so no overlap
    return cutoff.replace(minute=0, second=0, microsecond=0)
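
# Boundary sketch (illustrative settings and clock): with a 7-day retention and
# delete-after-rollup enabled at 1 hour, the later (delete) cutoff wins.
#
#     # now = 2026-02-11 07:10 UTC
#     # retention cutoff: 2026-02-04 07:10; delete cutoff: 2026-02-11 06:10
#     # max of the two, hour-aligned -> 2026-02-11 06:00 UTC
#     # rollups then serve hour_start < 06:00; raw serves timestamp >= 06:00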


def get_current_hour_aggregation(
    db: Session,
    metric_type: str,
    entity_id: Optional[str] = None,
) -> Optional[AggregatedMetrics]:
    """Aggregate raw metrics for the current incomplete hour only.

    This function queries raw metrics from the start of the current hour
    to now, providing real-time visibility into metrics that haven't been
    rolled up yet.

    Args:
        db: Database session.
        metric_type: Type of metric ('tool', 'resource', 'prompt', 'server', 'a2a_agent').
        entity_id: Optional entity ID to filter by.

    Returns:
        AggregatedMetrics for the current hour, or None if no data exists.

    Raises:
        ValueError: If metric_type is not recognized.
    """
    if metric_type not in METRIC_MODELS:
        raise ValueError(f"Unknown metric type: {metric_type}")

    raw_model, _, id_col, _ = METRIC_MODELS[metric_type]
    current_hour_start = get_current_hour_start()

    # Query current hour raw metrics
    filters = [raw_model.timestamp >= current_hour_start]
    if entity_id is not None:
        filters.append(getattr(raw_model, id_col) == entity_id)

    # pylint: disable=not-callable
    result = db.execute(
        select(
            func.count(raw_model.id).label("total"),
            func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("successful"),
            func.sum(case((raw_model.is_success.is_(False), 1), else_=0)).label("failed"),
            func.min(raw_model.response_time).label("min_rt"),
            func.max(raw_model.response_time).label("max_rt"),
            func.avg(raw_model.response_time).label("avg_rt"),
            func.max(raw_model.timestamp).label("last_time"),
        ).where(and_(*filters))
    ).one()

    total = result.total or 0
    if total == 0:
        return None

    successful = result.successful or 0
    failed = result.failed or 0

    return AggregatedMetrics(
        total_executions=total,
        successful_executions=successful,
        failed_executions=failed,
        failure_rate=failed / total if total > 0 else 0.0,
        min_response_time=result.min_rt,
        max_response_time=result.max_rt,
        avg_response_time=result.avg_rt,
        last_execution_time=result.last_time,
        raw_count=total,
        rollup_count=0,
    )
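
# Usage sketch (illustrative; assumes an open SQLAlchemy Session `db` and a
# hypothetical tool id):
#
#     current = get_current_hour_aggregation(db, "tool", entity_id="tool-123")
#     if current is not None:  # None means no executions this hour
#         logger.debug("current-hour failure rate: %.2f", current.failure_rate)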


def aggregate_metrics_combined(
    db: Session,
    metric_type: str,
    entity_id: Optional[str] = None,
) -> AggregatedMetrics:
    """Aggregate metrics combining three data sources for complete coverage.

    This function queries:
    1. Hourly rollup table (for data older than retention cutoff)
    2. Raw metrics table (for completed hours within retention period)
    3. Current hour raw metrics (for the incomplete current hour)

    This three-source approach ensures metrics are available immediately during
    benchmarks, even before the hourly rollup job has processed the current hour.

    Args:
        db: Database session
        metric_type: Type of metric ('tool', 'resource', 'prompt', 'server', 'a2a_agent')
        entity_id: Optional entity ID to filter by (e.g., specific tool_id)

    Returns:
        AggregatedMetrics: Combined metrics from all three sources

    Raises:
        ValueError: If metric_type is not recognized.
    """
    if metric_type not in METRIC_MODELS:
        raise ValueError(f"Unknown metric type: {metric_type}")

    raw_model, hourly_model, id_col, _ = METRIC_MODELS[metric_type]
    cutoff = get_retention_cutoff()
    current_hour_start = get_current_hour_start()

    # Query 1: Historical rollup data (older than retention cutoff)
    rollup_filters = [hourly_model.hour_start < cutoff]
    if entity_id is not None:
        rollup_filters.append(getattr(hourly_model, id_col) == entity_id)

    # pylint: disable=not-callable
    rollup_result = db.execute(
        select(
            func.sum(hourly_model.total_count).label("total"),
            func.sum(hourly_model.success_count).label("successful"),
            func.sum(hourly_model.failure_count).label("failed"),
            func.min(hourly_model.min_response_time).label("min_rt"),
            func.max(hourly_model.max_response_time).label("max_rt"),
            # Weighted average: sum(avg * count) / sum(count)
            (func.sum(hourly_model.avg_response_time * hourly_model.total_count) / func.nullif(func.sum(hourly_model.total_count), 0)).label("avg_rt"),
            func.max(hourly_model.hour_start).label("last_time"),
        ).where(and_(*rollup_filters))
    ).one()

    rollup_total = rollup_result.total or 0
    rollup_successful = rollup_result.successful or 0
    rollup_failed = rollup_result.failed or 0
    rollup_min_rt = rollup_result.min_rt
    rollup_max_rt = rollup_result.max_rt
    rollup_avg_rt = rollup_result.avg_rt
    rollup_last_time = rollup_result.last_time

    # Query 2: Raw metrics for completed hours (cutoff <= timestamp < current_hour_start)
    # This covers the gap between rollup data and the current incomplete hour
    raw_filters = [
        raw_model.timestamp >= cutoff,
        raw_model.timestamp < current_hour_start,
    ]
    if entity_id is not None:
        raw_filters.append(getattr(raw_model, id_col) == entity_id)

    raw_result = db.execute(
        select(
            func.count(raw_model.id).label("total"),
            func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("successful"),
            func.sum(case((raw_model.is_success.is_(False), 1), else_=0)).label("failed"),
            func.min(raw_model.response_time).label("min_rt"),
            func.max(raw_model.response_time).label("max_rt"),
            func.avg(raw_model.response_time).label("avg_rt"),
            func.max(raw_model.timestamp).label("last_time"),
        ).where(and_(*raw_filters))
    ).one()

    raw_total = raw_result.total or 0
    raw_successful = raw_result.successful or 0
    raw_failed = raw_result.failed or 0
    raw_min_rt = raw_result.min_rt
    raw_max_rt = raw_result.max_rt
    raw_avg_rt = raw_result.avg_rt
    raw_last_time = raw_result.last_time

    # Query 3: Current hour raw metrics (timestamp >= current_hour_start)
    # This provides immediate visibility into metrics that haven't been rolled up yet
    current_filters = [raw_model.timestamp >= current_hour_start]
    if entity_id is not None:
        current_filters.append(getattr(raw_model, id_col) == entity_id)

    current_result = db.execute(
        select(
            func.count(raw_model.id).label("total"),
            func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("successful"),
            func.sum(case((raw_model.is_success.is_(False), 1), else_=0)).label("failed"),
            func.min(raw_model.response_time).label("min_rt"),
            func.max(raw_model.response_time).label("max_rt"),
            func.avg(raw_model.response_time).label("avg_rt"),
            func.max(raw_model.timestamp).label("last_time"),
        ).where(and_(*current_filters))
    ).one()

    current_total = current_result.total or 0
    current_successful = current_result.successful or 0
    current_failed = current_result.failed or 0
    current_min_rt = current_result.min_rt
    current_max_rt = current_result.max_rt
    current_avg_rt = current_result.avg_rt
    current_last_time = current_result.last_time

    # Merge all three sources
    total = rollup_total + raw_total + current_total
    successful = rollup_successful + raw_successful + current_successful
    failed = rollup_failed + raw_failed + current_failed
    failure_rate = failed / total if total > 0 else 0.0

    # Min/max across all sources
    min_rt = _merge_min(_merge_min(rollup_min_rt, raw_min_rt), current_min_rt)
    max_rt = _merge_max(_merge_max(rollup_max_rt, raw_max_rt), current_max_rt)

    # Weighted average across all sources
    avg_rt = _merge_weighted_avg(
        _merge_weighted_avg(rollup_avg_rt, rollup_total, raw_avg_rt, raw_total),
        rollup_total + raw_total,
        current_avg_rt,
        current_total,
    )

    # Last execution time (most recent from any source)
    last_time = _merge_last_time(_merge_last_time(rollup_last_time, raw_last_time), current_last_time)

    return AggregatedMetrics(
        total_executions=total,
        successful_executions=successful,
        failed_executions=failed,
        failure_rate=failure_rate,
        min_response_time=min_rt,
        max_response_time=max_rt,
        avg_response_time=avg_rt,
        last_execution_time=last_time,
        raw_count=raw_total + current_total,
        rollup_count=rollup_total,
    )
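
# Usage sketch (illustrative): a single call spans all three sources; the
# raw_count/rollup_count fields expose the per-source breakdown for debugging.
#
#     combined = aggregate_metrics_combined(db, "tool")
#     combined.to_dict()  # API-shaped payload
#     combined.raw_count, combined.rollup_count  # source breakdown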


def get_top_entities_combined(
    db: Session,
    metric_type: str,
    entity_model: Type,
    limit: int = 10,
    order_by: str = "execution_count",
    name_column: str = "name",
    include_deleted: bool = False,
) -> List[Dict[str, Any]]:
    """Get top entities by metric counts, combining three data sources.

    This function queries:
    1. Hourly rollup table (for data older than retention cutoff)
    2. Raw metrics table (for completed hours within retention period)
    3. Current hour raw metrics (for the incomplete current hour)

    This three-source approach ensures top performers are available immediately
    during benchmarks, even before the hourly rollup job has processed the current hour.

    Args:
        db: Database session
        metric_type: Type of metric ('tool', 'resource', 'prompt', 'server', 'a2a_agent')
        entity_model: SQLAlchemy model for the entity (Tool, Resource, etc.)
        limit: Maximum number of results
        order_by: Field to order by ('execution_count', 'avg_response_time', 'failure_rate')
        name_column: Name of the column to use as entity name (default: 'name')
        include_deleted: Whether to include deleted entities from rollups

    Returns:
        List of entity metrics dictionaries

    Raises:
        ValueError: If metric_type is not recognized.
    """
    if metric_type not in METRIC_MODELS:
        raise ValueError(f"Unknown metric type: {metric_type}")

    raw_model, hourly_model, id_col, preserved_name_col = METRIC_MODELS[metric_type]
    cutoff = get_retention_cutoff()
    current_hour_start = get_current_hour_start()

    # Get all entity IDs with their combined metrics from three sources
    # This query includes both existing entities and deleted entities (via rollup name preservation)

    # Subquery 1: Rollup metrics aggregated by entity (data older than cutoff)
    # Group by BOTH entity_id AND preserved_name to keep deleted entities separate
    # (when entity is deleted, entity_id becomes NULL, but preserved_name keeps them distinct)
    # pylint: disable=not-callable
    rollup_subq = (
        select(
            getattr(hourly_model, id_col).label("entity_id"),
            getattr(hourly_model, preserved_name_col).label("preserved_name"),
            func.sum(hourly_model.total_count).label("total"),
            func.sum(hourly_model.success_count).label("successful"),
            func.sum(hourly_model.failure_count).label("failed"),
            # Weighted average for rollups: sum(avg * count) / sum(count)
            (func.sum(hourly_model.avg_response_time * hourly_model.total_count) / func.nullif(func.sum(hourly_model.total_count), 0)).label("avg_rt"),
            func.max(hourly_model.hour_start).label("last_time"),
        )
        .where(hourly_model.hour_start < cutoff)
        .group_by(getattr(hourly_model, id_col), getattr(hourly_model, preserved_name_col))
        .subquery()
    )

    # Subquery 2: Raw metrics for completed hours (cutoff <= timestamp < current_hour_start)
    raw_subq = (
        select(
            getattr(raw_model, id_col).label("entity_id"),
            func.count(raw_model.id).label("total"),
            func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("successful"),
            func.sum(case((raw_model.is_success.is_(False), 1), else_=0)).label("failed"),
            func.avg(raw_model.response_time).label("avg_rt"),
            func.max(raw_model.timestamp).label("last_time"),
        )
        .where(and_(raw_model.timestamp >= cutoff, raw_model.timestamp < current_hour_start))
        .group_by(getattr(raw_model, id_col))
        .subquery()
    )

    # Subquery 3: Current hour raw metrics (timestamp >= current_hour_start)
    current_subq = (
        select(
            getattr(raw_model, id_col).label("entity_id"),
            func.count(raw_model.id).label("total"),
            func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("successful"),
            func.sum(case((raw_model.is_success.is_(False), 1), else_=0)).label("failed"),
            func.avg(raw_model.response_time).label("avg_rt"),
            func.max(raw_model.timestamp).label("last_time"),
        )
        .where(raw_model.timestamp >= current_hour_start)
        .group_by(getattr(raw_model, id_col))
        .subquery()
    )

    # Get the name column from entity model
    entity_name_col = getattr(entity_model, name_column)

    # Compute combined totals from all three sources
    total_count_expr = func.coalesce(rollup_subq.c.total, 0) + func.coalesce(raw_subq.c.total, 0) + func.coalesce(current_subq.c.total, 0)
    successful_expr = func.coalesce(rollup_subq.c.successful, 0) + func.coalesce(raw_subq.c.successful, 0) + func.coalesce(current_subq.c.successful, 0)
    failed_expr = func.coalesce(rollup_subq.c.failed, 0) + func.coalesce(raw_subq.c.failed, 0) + func.coalesce(current_subq.c.failed, 0)

    # Weighted average across all three sources
    # Formula: (avg1 * count1 + avg2 * count2 + avg3 * count3) / (count1 + count2 + count3)
    weighted_avg_expr = (
        func.coalesce(rollup_subq.c.avg_rt * func.coalesce(rollup_subq.c.total, 0), 0)
        + func.coalesce(raw_subq.c.avg_rt * func.coalesce(raw_subq.c.total, 0), 0)
        + func.coalesce(current_subq.c.avg_rt * func.coalesce(current_subq.c.total, 0), 0)
    ) / func.nullif(total_count_expr, 0)

    # Last execution time (most recent from any source) using GREATEST-like logic
    # SQLAlchemy doesn't have a portable GREATEST, so we use COALESCE with preference order
    # pylint: disable-next=assignment-from-no-return
    last_time_expr = func.coalesce(current_subq.c.last_time, raw_subq.c.last_time, rollup_subq.c.last_time)

    # Query: Existing entities with combined metrics from all three sources
    existing_entities_query = (
        select(
            entity_model.id.label("id"),
            func.coalesce(entity_name_col, rollup_subq.c.preserved_name).label("name"),
            total_count_expr.label("execution_count"),
            successful_expr.label("successful"),
            failed_expr.label("failed"),
            weighted_avg_expr.label("avg_response_time"),
            last_time_expr.label("last_execution"),
            literal(False).label("is_deleted"),
        )
        .outerjoin(rollup_subq, entity_model.id == rollup_subq.c.entity_id)
        .outerjoin(raw_subq, entity_model.id == raw_subq.c.entity_id)
        .outerjoin(current_subq, entity_model.id == current_subq.c.entity_id)
        .where(
            # Only include entities that have metrics in any source
            (rollup_subq.c.total.isnot(None))
            | (raw_subq.c.total.isnot(None))
            | (current_subq.c.total.isnot(None))
        )
    )

    if include_deleted:
        # Query for deleted entities (exist in rollup but not in entity table)
        # Handle NULL properly: entity_id IS NULL (deleted via SET NULL) OR entity_id not in existing entities
        existing_ids_subq = select(entity_model.id).subquery()
        deleted_entities_query = select(
            rollup_subq.c.entity_id.label("id"),
            rollup_subq.c.preserved_name.label("name"),
            rollup_subq.c.total.label("execution_count"),
            rollup_subq.c.successful.label("successful"),
            rollup_subq.c.failed.label("failed"),
            rollup_subq.c.avg_rt.label("avg_response_time"),
            rollup_subq.c.last_time.label("last_execution"),
            literal(True).label("is_deleted"),
        ).where(
            # Include entities with NULL id (deleted via SET NULL) OR entities not in entity table
            (rollup_subq.c.entity_id.is_(None))
            | (rollup_subq.c.entity_id.notin_(existing_ids_subq))
        )

        # Combine existing and deleted entities
        combined_query = union_all(existing_entities_query, deleted_entities_query).subquery()
    else:
        combined_query = existing_entities_query.subquery()

    # Apply ordering and limit to the combined results
    if order_by == "avg_response_time":
        final_query = select(combined_query).order_by(combined_query.c.avg_response_time.desc().nullslast())
    elif order_by == "failure_rate":
        # Order by failure rate (failed / total)
        final_query = select(combined_query).order_by((combined_query.c.failed * 1.0 / func.nullif(combined_query.c.execution_count, 0)).desc().nullslast())
    else:  # default: execution_count
        final_query = select(combined_query).order_by(combined_query.c.execution_count.desc())

    final_query = final_query.limit(limit)

    results = []
    for row in db.execute(final_query).fetchall():
        total = row.execution_count or 0
        successful = row.successful or 0
        failed = row.failed or 0
        success_rate = (successful / total * 100) if total > 0 else None
        result_dict = {
            "id": row.id,
            "name": row.name,
            "execution_count": total,
            "successful_executions": successful,
            "failed_executions": failed,
            "failure_rate": failed / total if total > 0 else 0.0,
            "success_rate": success_rate,
            "avg_response_time": row.avg_response_time,
            "last_execution": row.last_execution,
        }
        # Mark deleted entities so UI can optionally style them differently
        if row.is_deleted:
            result_dict["is_deleted"] = True
        results.append(result_dict)

    return results
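
# Usage sketch (illustrative; `DbTool` is a stand-in for whatever entity model
# the caller passes, not a name defined in this module):
#
#     rows = get_top_entities_combined(db, "tool", DbTool, limit=5, order_by="failure_rate")
#     # -> list of dicts with id, name, execution_count, failure_rate, ...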


def get_top_performers_combined(
    db: Session,
    metric_type: str,
    entity_model: Type,
    limit: int = 10,
    name_column: str = "name",
    include_deleted: bool = False,
) -> List[TopPerformerResult]:
    """Get top performers combining raw and rollup data.

    This function wraps get_top_entities_combined and returns TopPerformerResult
    objects that are compatible with build_top_performers().

    Args:
        db: Database session
        metric_type: Type of metric ('tool', 'resource', 'prompt', 'server', 'a2a_agent')
        entity_model: SQLAlchemy model for the entity (Tool, Resource, etc.)
        limit: Maximum number of results
        name_column: Name of the column to use as entity name (default: 'name')
        include_deleted: Whether to include deleted entities from rollups

    Returns:
        List[TopPerformerResult]: List of top performer results
    """
    raw_results = get_top_entities_combined(
        db=db,
        metric_type=metric_type,
        entity_model=entity_model,
        limit=limit,
        order_by="execution_count",
        name_column=name_column,
        include_deleted=include_deleted,
    )

    return [
        TopPerformerResult(
            id=r["id"],
            name=r["name"],
            execution_count=r["execution_count"],
            avg_response_time=r["avg_response_time"],
            success_rate=r["success_rate"],
            last_execution=r["last_execution"],
        )
        for r in raw_results
    ]
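
# Usage sketch (illustrative; `DbTool` again stands in for the caller's entity
# model):
#
#     performers = get_top_performers_combined(db, "tool", DbTool, limit=10)
#     # Each item is a TopPerformerResult ready for build_top_performers().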