Coverage for mcpgateway / services / log_aggregator.py: 97%

355 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/log_aggregator.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Log Aggregation Service. 

7 

8This module provides aggregation of performance metrics from structured logs 

9into time-windowed statistics for analysis and monitoring. 

10""" 

11 

12# Standard 

13from datetime import datetime, timedelta, timezone 

14import logging 

15import math 

16import statistics 

17from typing import Any, Dict, List, Optional, Tuple 

18 

19# Third-Party 

20from sqlalchemy import and_, func, select, text 

21from sqlalchemy.orm import Session 

22 

23# First-Party 

24from mcpgateway.config import settings 

25from mcpgateway.db import engine, PerformanceMetric, SessionLocal, StructuredLogEntry 

26 

27logger = logging.getLogger(__name__) 

28 

29 

def _is_postgresql() -> bool:
    """Report whether the module-level SQLAlchemy engine targets PostgreSQL.

    Returns:
        True when the engine's dialect name is ``"postgresql"``, False otherwise.
    """
    dialect_name = engine.dialect.name
    return dialect_name == "postgresql"

37 

38 

39class LogAggregator: 

40 """Aggregates structured logs into performance metrics.""" 

41 

def __init__(self):
    """Configure the aggregator from application settings.

    Settings that are absent fall back to a 5-minute window with
    aggregation enabled.
    """
    window_minutes = getattr(settings, "metrics_aggregation_window_minutes", 5)
    is_enabled = getattr(settings, "metrics_aggregation_enabled", True)

    self.aggregation_window_minutes = window_minutes
    self.enabled = is_enabled
    # Decided once at construction: PostgreSQL computes percentiles in SQL.
    self._use_sql_percentiles = _is_postgresql()

47 

def aggregate_performance_metrics(
    self, component: Optional[str], operation_type: Optional[str], window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None
) -> Optional[PerformanceMetric]:
    """Build or refresh the metric row for one component/operation window.

    Args:
        component: Component name; a falsy value short-circuits to None
        operation_type: Operation name; a falsy value short-circuits to None
        window_start: Start of the window (resolved by ``_resolve_window_bounds`` when omitted)
        window_end: End of the window (resolved by ``_resolve_window_bounds`` when omitted)
        db: Optional database session; a private one is opened and closed when omitted

    Returns:
        The upserted PerformanceMetric, or None when aggregation is disabled,
        an identifier is missing, no matching rows exist, or an error occurred
        (errors are logged, not raised)
    """
    if not self.enabled or not (component and operation_type):
        return None

    window_start, window_end = self._resolve_window_bounds(window_start, window_end)

    # Only a session opened here is committed, rolled back and closed here.
    owns_session = db is None
    if owns_session:
        db = SessionLocal()

    try:
        # SQL-side percentiles on PostgreSQL, in-Python computation elsewhere.
        if self._use_sql_percentiles:
            compute_stats = self._compute_stats_postgresql
        else:
            compute_stats = self._compute_stats_python

        stats = compute_stats(db, component, operation_type, window_start, window_end)
        if stats is None:
            return None

        count = stats["count"]
        avg_duration = stats["avg_duration"]
        error_count = stats["error_count"]
        if count > 0:
            error_rate = error_count / count
        else:
            error_rate = 0.0

        metric = self._upsert_metric(
            component=component,
            operation_type=operation_type,
            window_start=window_start,
            window_end=window_end,
            request_count=count,
            error_count=error_count,
            error_rate=error_rate,
            avg_duration_ms=avg_duration,
            min_duration_ms=stats["min_duration"],
            max_duration_ms=stats["max_duration"],
            p50_duration_ms=stats["p50"],
            p95_duration_ms=stats["p95"],
            p99_duration_ms=stats["p99"],
            metric_metadata={
                "sample_size": count,
                "generated_at": datetime.now(timezone.utc).isoformat(),
            },
            db=db,
        )

        logger.info(f"Aggregated performance metrics for {component}.{operation_type}: " f"{count} requests, {avg_duration:.2f}ms avg, {error_rate:.2%} error rate")

        if owns_session:
            db.commit()  # Commit transaction on success
        return metric

    except Exception as e:
        logger.error(f"Failed to aggregate performance metrics: {e}")
        if owns_session and db:
            db.rollback()
        return None

    finally:
        if owns_session:
            db.close()

131 

def aggregate_all_components_batch(self, window_starts: List[datetime], window_minutes: int, db: Optional[Session] = None) -> List[PerformanceMetric]:
    """Aggregate metrics for every component/operation pair across several windows.

    On PostgreSQL a single SQL rollup (``generate_series`` plus ordered-set
    aggregates) yields one row per window/pair. On other backends the log
    entries for the full span are fetched once per pair and bucketed into
    windows in Python. Each result is written through ``_upsert_metric``.

    Args:
        window_starts: List of window start datetimes (UTC)
        window_minutes: Window size in minutes
        db: Optional database session; a private one is opened and closed when omitted

    Returns:
        List of created/updated PerformanceMetric records (empty when
        aggregation is disabled or ``window_starts`` is empty)

    Raises:
        Exception: If a database operation fails during aggregation.
    """
    if not self.enabled:
        return []

    if not window_starts:
        return []

    should_close = False
    if db is None:
        db = SessionLocal()
        should_close = True

    try:
        window_delta = timedelta(minutes=window_minutes)
        # Determine full range to query once
        full_start = min(window_starts)
        full_end = max(window_starts) + window_delta

        # Validate window_starts is contiguous - warn if sparse (batch generates all windows in range)
        expected_window_count = int((full_end - full_start).total_seconds() / (window_minutes * 60))
        if len(window_starts) != expected_window_count:
            logger.warning(
                "Batch aggregation received %d windows but range spans %d; sparse windows will generate extra metrics",
                len(window_starts),
                expected_window_count,
            )

        created_metrics: List[PerformanceMetric] = []

        # If PostgreSQL is available, use a single SQL rollup with generate_series and ordered-set aggregates
        if _is_postgresql():
            # CAST(:full_start AS timestamptz) is used rather than ``:full_start::timestamptz``:
            # a ``::`` cast placed directly after a bind name is not reliably recognised
            # by SQLAlchemy's text() bind-parameter parsing, while CAST is equivalent SQL.
            sql = text("""
                WITH windows AS (
                    SELECT generate_series(CAST(:full_start AS timestamptz), (:full_end - (:window_minutes || ' minutes')::interval)::timestamptz, (:window_minutes || ' minutes')::interval) AS window_start
                ), pairs AS (
                    SELECT DISTINCT component, operation_type FROM structured_log_entries
                    WHERE timestamp >= :full_start AND timestamp < :full_end
                    AND duration_ms IS NOT NULL
                    AND component IS NOT NULL AND component <> ''
                    AND operation_type IS NOT NULL AND operation_type <> ''
                )
                SELECT
                    w.window_start,
                    p.component,
                    p.operation_type,
                    COUNT(sle.duration_ms) AS cnt,
                    AVG(sle.duration_ms) AS avg_duration,
                    MIN(sle.duration_ms) AS min_duration,
                    MAX(sle.duration_ms) AS max_duration,
                    percentile_cont(0.50) WITHIN GROUP (ORDER BY sle.duration_ms) AS p50,
                    percentile_cont(0.95) WITHIN GROUP (ORDER BY sle.duration_ms) AS p95,
                    percentile_cont(0.99) WITHIN GROUP (ORDER BY sle.duration_ms) AS p99,
                    SUM(CASE WHEN upper(sle.level) IN ('ERROR','CRITICAL') OR sle.error_details IS NOT NULL THEN 1 ELSE 0 END) AS error_count
                FROM windows w
                CROSS JOIN pairs p
                JOIN structured_log_entries sle
                    ON sle.timestamp >= w.window_start AND sle.timestamp < w.window_start + (:window_minutes || ' minutes')::interval
                    AND sle.component = p.component AND sle.operation_type = p.operation_type
                    AND sle.duration_ms IS NOT NULL
                GROUP BY w.window_start, p.component, p.operation_type
                HAVING COUNT(sle.duration_ms) > 0
                ORDER BY w.window_start, p.component, p.operation_type
            """)

            rows = db.execute(
                sql,
                {
                    "full_start": full_start,
                    "full_end": full_end,
                    "window_minutes": str(window_minutes),
                },
            ).fetchall()

            for row in rows:
                # Treat a timezone-less window start coming back from the driver as UTC.
                ws = row.window_start if row.window_start.tzinfo else row.window_start.replace(tzinfo=timezone.utc)
                count = int(row.cnt)
                error_count = int(row.error_count) if row.error_count is not None else 0

                metric = self._upsert_metric(
                    component=row.component,
                    operation_type=row.operation_type,
                    window_start=ws,
                    window_end=ws + window_delta,
                    request_count=count,
                    error_count=error_count,
                    error_rate=(error_count / count) if count else 0.0,
                    avg_duration_ms=float(row.avg_duration) if row.avg_duration is not None else 0.0,
                    min_duration_ms=float(row.min_duration) if row.min_duration is not None else 0.0,
                    max_duration_ms=float(row.max_duration) if row.max_duration is not None else 0.0,
                    p50_duration_ms=float(row.p50) if row.p50 is not None else 0.0,
                    p95_duration_ms=float(row.p95) if row.p95 is not None else 0.0,
                    p99_duration_ms=float(row.p99) if row.p99 is not None else 0.0,
                    metric_metadata={
                        "sample_size": count,
                        "generated_at": datetime.now(timezone.utc).isoformat(),
                    },
                    db=db,
                )
                if metric:
                    created_metrics.append(metric)

            if should_close:
                db.commit()
            return created_metrics

        # Fallback: in-Python bucketing (previous implementation)
        # Warning: This path loads all entries into memory; for very large ranges this may spike memory usage
        range_hours = (full_end - full_start).total_seconds() / 3600
        if range_hours > 168:  # > 1 week
            logger.warning("Large aggregation range (%.1f hours) may cause high memory usage in non-PostgreSQL fallback", range_hours)

        # Get unique component/operation pairs for the full range
        pair_stmt = (
            select(StructuredLogEntry.component, StructuredLogEntry.operation_type)
            .where(
                and_(
                    StructuredLogEntry.timestamp >= full_start,
                    StructuredLogEntry.timestamp < full_end,
                    StructuredLogEntry.duration_ms.isnot(None),
                    StructuredLogEntry.component.isnot(None),
                    StructuredLogEntry.component != "",
                    StructuredLogEntry.operation_type.isnot(None),
                    StructuredLogEntry.operation_type != "",
                )
            )
            .distinct()
        )

        pairs = db.execute(pair_stmt).all()

        def _align_to_window_local(dt: datetime, minutes: int) -> datetime:
            """Align a datetime to the start of its aggregation window.

            Alignment is computed on whole minutes since the Unix epoch.

            Args:
                dt: The datetime to align.
                minutes: The window size in minutes.

            Returns:
                The datetime aligned to the start of the window.
            """
            ts = dt.astimezone(timezone.utc)
            total_minutes = int(ts.timestamp() // 60)
            aligned_minutes = (total_minutes // minutes) * minutes
            return datetime.fromtimestamp(aligned_minutes * 60, tz=timezone.utc)

        for component, operation in pairs:
            if not component or not operation:
                continue

            # Fetch all relevant log entries for this component/operation in the full range
            entries_stmt = select(StructuredLogEntry).where(
                and_(
                    StructuredLogEntry.component == component,
                    StructuredLogEntry.operation_type == operation,
                    StructuredLogEntry.timestamp >= full_start,
                    StructuredLogEntry.timestamp < full_end,
                    StructuredLogEntry.duration_ms.isnot(None),
                )
            )

            entries = db.execute(entries_stmt).scalars().all()
            if not entries:
                continue

            # Bucket entries into windows; timezone-less timestamps are taken as UTC.
            buckets: Dict[datetime, List[StructuredLogEntry]] = {}
            for e in entries:
                ts = e.timestamp if e.timestamp.tzinfo else e.timestamp.replace(tzinfo=timezone.utc)
                buckets.setdefault(_align_to_window_local(ts, window_minutes), []).append(e)

            # For each requested window, compute stats if we have data.
            # NOTE: buckets are keyed on epoch-aligned boundaries, so a requested
            # window start only finds data when it lies on such a boundary.
            for window_start in window_starts:
                bucket_entries = buckets.get(window_start)
                if not bucket_entries:
                    continue

                durations = sorted(b.duration_ms for b in bucket_entries if b.duration_ms is not None)
                if not durations:
                    continue

                count = len(durations)
                error_count = self._calculate_error_count(bucket_entries)

                # Best effort per window: a failed upsert is logged and the batch continues.
                try:
                    metric = self._upsert_metric(
                        component=component,
                        operation_type=operation,
                        window_start=window_start,
                        window_end=window_start + window_delta,
                        request_count=count,
                        error_count=error_count,
                        error_rate=(error_count / count) if count else 0.0,
                        avg_duration_ms=float(sum(durations) / count) if count else 0.0,
                        min_duration_ms=float(durations[0]),
                        max_duration_ms=float(durations[-1]),
                        p50_duration_ms=float(self._percentile(durations, 0.5)),
                        p95_duration_ms=float(self._percentile(durations, 0.95)),
                        p99_duration_ms=float(self._percentile(durations, 0.99)),
                        metric_metadata={
                            "sample_size": count,
                            "generated_at": datetime.now(timezone.utc).isoformat(),
                        },
                        db=db,
                    )
                    if metric:
                        created_metrics.append(metric)
                except Exception:
                    logger.exception("Failed to upsert metric for %s.%s window %s", component, operation, window_start)

        if should_close:
            db.commit()
        return created_metrics
    except Exception:
        if should_close:
            db.rollback()
        raise
    finally:
        if should_close:
            db.close()

387 

def aggregate_all_components(self, window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None) -> List[PerformanceMetric]:
    """Aggregate one window for every component/operation pair seen in it.

    Args:
        window_start: Start of aggregation window
        window_end: End of aggregation window
        db: Optional database session; a private one is opened and closed when omitted

    Returns:
        List of PerformanceMetric records produced for the window (empty when
        aggregation is disabled)

    Raises:
        Exception: If database operation fails
    """
    if not self.enabled:
        return []

    owns_session = db is None
    if owns_session:
        db = SessionLocal()

    try:
        window_start, window_end = self._resolve_window_bounds(window_start, window_end)

        # Distinct (component, operation_type) pairs with timed entries in the window.
        in_window = and_(
            StructuredLogEntry.timestamp >= window_start,
            StructuredLogEntry.timestamp < window_end,
            StructuredLogEntry.duration_ms.isnot(None),
            StructuredLogEntry.operation_type.isnot(None),
        )
        pair_query = select(StructuredLogEntry.component, StructuredLogEntry.operation_type).where(in_window).distinct()
        discovered_pairs = db.execute(pair_query).all()

        collected = []
        for component, operation in discovered_pairs:
            # Pairs with a missing or empty identifier are ignored.
            if not (component and operation):
                continue
            metric = self.aggregate_performance_metrics(component=component, operation_type=operation, window_start=window_start, window_end=window_end, db=db)
            if metric:
                collected.append(metric)

        if owns_session:
            db.commit()  # Commit on success
        return collected

    except Exception:
        if owns_session:
            db.rollback()
        raise

    finally:
        if owns_session:
            db.close()

447 

def get_recent_metrics(self, component: Optional[str] = None, operation: Optional[str] = None, hours: int = 24, db: Optional[Session] = None) -> List[PerformanceMetric]:
    """Fetch stored metrics whose window started within the last ``hours`` hours.

    Args:
        component: Optional component filter
        operation: Optional operation filter
        hours: Hours of history to retrieve
        db: Optional database session; a private one is opened and closed when omitted

    Returns:
        List of PerformanceMetric records, newest window first

    Raises:
        Exception: If database operation fails
    """
    owns_session = db is None
    if owns_session:
        db = SessionLocal()

    try:
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)

        # Collect the filters first, then build the statement in one go.
        filters = [PerformanceMetric.window_start >= cutoff]
        if component:
            filters.append(PerformanceMetric.component == component)
        if operation:
            filters.append(PerformanceMetric.operation_type == operation)

        query = select(PerformanceMetric).where(*filters).order_by(PerformanceMetric.window_start.desc())
        records = db.execute(query).scalars().all()

        if owns_session:
            db.commit()  # Commit on success
        return records

    except Exception:
        if owns_session:
            db.rollback()
        raise

    finally:
        if owns_session:
            db.close()

493 

def get_degradation_alerts(self, threshold_multiplier: float = 1.5, hours: int = 24, db: Optional[Session] = None) -> List[Dict[str, Any]]:
    """Identify performance degradations by comparing recent vs baseline.

    "Recent" covers windows starting within the last ``hours`` hours; the
    baseline covers the ``hours * 2`` hours immediately before that.

    Args:
        threshold_multiplier: Alert if recent is X times slower than baseline
        hours: Hours of recent data to check
        db: Optional database session; a private one is opened and closed when omitted

    Returns:
        List of degradation alerts with details. Pairs lacking recent or
        baseline data, or whose baseline average is not positive (the ratio
        would be undefined), produce no alert.

    Raises:
        Exception: If database operation fails
    """
    should_close = False
    if db is None:
        db = SessionLocal()
        should_close = True

    try:
        recent_cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        baseline_cutoff = recent_cutoff - timedelta(hours=hours * 2)

        # Get unique component/operation pairs
        stmt = select(PerformanceMetric.component, PerformanceMetric.operation_type).distinct()

        pairs = db.execute(stmt).all()

        alerts = []
        for component, operation in pairs:
            # Get recent metrics
            recent_stmt = select(PerformanceMetric).where(
                and_(PerformanceMetric.component == component, PerformanceMetric.operation_type == operation, PerformanceMetric.window_start >= recent_cutoff)
            )
            recent_metrics = db.execute(recent_stmt).scalars().all()

            # Get baseline metrics
            baseline_stmt = select(PerformanceMetric).where(
                and_(
                    PerformanceMetric.component == component,
                    PerformanceMetric.operation_type == operation,
                    PerformanceMetric.window_start >= baseline_cutoff,
                    PerformanceMetric.window_start < recent_cutoff,
                )
            )
            baseline_metrics = db.execute(baseline_stmt).scalars().all()

            if not recent_metrics or not baseline_metrics:
                continue

            recent_avg = statistics.mean([m.avg_duration_ms for m in recent_metrics])
            baseline_avg = statistics.mean([m.avg_duration_ms for m in baseline_metrics])

            # A zero baseline would make the degradation ratio a division by
            # zero; skip the pair instead of failing the whole report.
            if baseline_avg <= 0:
                logger.debug("Skipping degradation check for %s.%s: baseline average is %s", component, operation, baseline_avg)
                continue

            if recent_avg > baseline_avg * threshold_multiplier:
                alerts.append(
                    {
                        "component": component,
                        "operation": operation,
                        "recent_avg_ms": recent_avg,
                        "baseline_avg_ms": baseline_avg,
                        "degradation_ratio": recent_avg / baseline_avg,
                        "recent_error_rate": statistics.mean([m.error_rate for m in recent_metrics]),
                        "baseline_error_rate": statistics.mean([m.error_rate for m in baseline_metrics]),
                    }
                )

        if should_close:
            db.commit()  # Commit on success
        return alerts

    except Exception:
        if should_close:
            db.rollback()
        raise

    finally:
        if should_close:
            db.close()

572 

def backfill(self, hours: float, db: Optional[Session] = None) -> int:
    """Re-aggregate consecutive historical windows ending at the latest resolved window end.

    Args:
        hours: Number of hours of history to aggregate (supports fractional hours)
        db: Optional shared database session; a private one is opened and closed when omitted

    Returns:
        Number of windows for which at least one metric was produced
        (0 when aggregation is disabled or ``hours`` is not positive)

    Raises:
        Exception: If database operation fails
    """
    if hours <= 0 or not self.enabled:
        return 0

    step = timedelta(minutes=self.aggregation_window_minutes)
    # Always cover at least one window, rounding partial windows up.
    window_total = max(1, math.ceil((hours * 60) / self.aggregation_window_minutes))

    owns_session = db is None
    if owns_session:
        db = SessionLocal()

    try:
        _, range_end = self._resolve_window_bounds(None, None)
        cursor = range_end - (step * window_total)
        windows_with_data = 0

        while cursor < range_end:
            next_cursor = cursor + step
            produced = self.aggregate_all_components(
                window_start=cursor,
                window_end=next_cursor,
                db=db,
            )
            if produced:
                windows_with_data += 1
            cursor = next_cursor

        if owns_session:
            db.commit()  # Commit on success
        return windows_with_data

    except Exception:
        if owns_session:
            db.rollback()
        raise

    finally:
        if owns_session:
            db.close()

626 

@staticmethod
def _percentile(sorted_values: List[float], percentile: float) -> float:
    """Return a percentile of pre-sorted values using linear interpolation.

    Args:
        sorted_values: Values in ascending order
        percentile: Fraction in the range 0.0 to 1.0

    Returns:
        float: Interpolated percentile; 0.0 for an empty input
    """
    if not sorted_values:
        return 0.0

    size = len(sorted_values)
    if size == 1:
        return float(sorted_values[0])

    # Fractional rank of the requested percentile within the index range.
    rank = (size - 1) * percentile
    lower, upper = math.floor(rank), math.ceil(rank)

    if lower == upper:
        # The rank lands exactly on an element; no interpolation needed.
        return float(sorted_values[int(rank)])

    low_part = sorted_values[lower] * (upper - rank)
    high_part = sorted_values[upper] * (rank - lower)
    return float(low_part + high_part)

654 

@staticmethod
def _calculate_error_count(entries: List[StructuredLogEntry]) -> int:
    """Count the entries that represent an error.

    An entry counts when its level is ERROR or CRITICAL (case-insensitive)
    or when its ``error_details`` value is truthy.

    Args:
        entries: List of log entries to analyze

    Returns:
        int: Count of error entries
    """
    error_levels = {"ERROR", "CRITICAL"}
    total = 0
    for entry in entries:
        level = entry.level
        has_error_level = bool(level) and level.upper() in error_levels
        if has_error_level or entry.error_details:
            total += 1
    return total

667 

def _compute_stats_postgresql(
    self,
    db: Session,
    component: str,
    operation_type: str,
    window_start: datetime,
    window_end: datetime,
) -> Optional[Dict[str, Any]]:
    """Compute aggregation statistics using PostgreSQL SQL functions.

    Uses PostgreSQL's percentile_cont for in-database percentile
    computation, avoiding loading all rows into Python memory. Count,
    duration statistics, percentiles and the error count are all gathered
    in one query, so a single round-trip is issued per call.

    Args:
        db: Database session
        component: Component name to filter by
        operation_type: Operation type to filter by
        window_start: Start of the aggregation window
        window_end: End of the aggregation window

    Returns:
        Dictionary with count, avg_duration, min_duration, max_duration,
        p50, p95, p99, and error_count, or None if no data.
    """
    # The error predicate (level ERROR/CRITICAL, case-insensitive, or a
    # non-NULL error_details) is evaluated as a conditional SUM alongside
    # the ordered-set aggregates.
    stats_sql = text("""
        SELECT
            COUNT(duration_ms) AS cnt,
            AVG(duration_ms) AS avg_duration,
            MIN(duration_ms) AS min_duration,
            MAX(duration_ms) AS max_duration,
            percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) AS p50,
            percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) AS p95,
            percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) AS p99,
            SUM(CASE WHEN upper(level) IN ('ERROR','CRITICAL') OR error_details IS NOT NULL THEN 1 ELSE 0 END) AS error_count
        FROM structured_log_entries
        WHERE component = :component
          AND operation_type = :operation_type
          AND timestamp >= :window_start
          AND timestamp < :window_end
          AND duration_ms IS NOT NULL
    """)

    result = db.execute(
        stats_sql,
        {
            "component": component,
            "operation_type": operation_type,
            "window_start": window_start,
            "window_end": window_end,
        },
    ).fetchone()

    # An aggregate without GROUP BY still returns one row when nothing matches; cnt is then 0.
    if not result or not result.cnt:
        return None

    def _as_float(value: Any) -> float:
        """Convert a numeric SQL result to float, mapping NULL to 0.0."""
        return float(value) if value is not None else 0.0

    return {
        "count": int(result.cnt),
        "avg_duration": _as_float(result.avg_duration),
        "min_duration": _as_float(result.min_duration),
        "max_duration": _as_float(result.max_duration),
        "p50": _as_float(result.p50),
        "p95": _as_float(result.p95),
        "p99": _as_float(result.p99),
        # SUM over matched rows; guard against NULL defensively.
        "error_count": int(result.error_count) if result.error_count is not None else 0,
    }

764 

def _compute_stats_python(
    self,
    db: Session,
    component: str,
    operation_type: str,
    window_start: datetime,
    window_end: datetime,
) -> Optional[Dict[str, Any]]:
    """Compute aggregation statistics in Python (fallback for SQLite).

    Loads the matching log entries into memory and derives the statistics
    there. Used when percentiles are not computed in SQL.

    Args:
        db: Database session
        component: Component name to filter by
        operation_type: Operation type to filter by
        window_start: Start of the aggregation window
        window_end: End of the aggregation window

    Returns:
        Dictionary with count, avg_duration, min_duration, max_duration,
        p50, p95, p99, and error_count, or None if no data.
    """
    # Timed entries for this component/operation inside [window_start, window_end).
    window_filter = and_(
        StructuredLogEntry.component == component,
        StructuredLogEntry.operation_type == operation_type,
        StructuredLogEntry.timestamp >= window_start,
        StructuredLogEntry.timestamp < window_end,
        StructuredLogEntry.duration_ms.isnot(None),
    )
    rows = db.execute(select(StructuredLogEntry).where(window_filter)).scalars().all()
    if not rows:
        return None

    durations = [row.duration_ms for row in rows if row.duration_ms is not None]
    durations.sort()
    if not durations:
        return None

    # Prefer the faster float mean when the interpreter provides it.
    if hasattr(statistics, "fmean"):
        mean_of = statistics.fmean
    else:
        mean_of = statistics.mean

    return {
        "count": len(durations),
        "avg_duration": mean_of(durations),
        "min_duration": durations[0],
        "max_duration": durations[-1],
        "p50": self._percentile(durations, 0.50),
        "p95": self._percentile(durations, 0.95),
        "p99": self._percentile(durations, 0.99),
        "error_count": self._calculate_error_count(rows),
    }

835 

def _resolve_window_bounds(
    self,
    window_start: Optional[datetime],
    window_end: Optional[datetime],
) -> Tuple[datetime, datetime]:
    """Resolve and normalize aggregation window bounds to UTC.

    Timezone-less datetimes are interpreted as UTC, matching how this
    module treats timezone-less log timestamps, rather than as host-local
    time.

    Args:
        window_start: Start of window or None to calculate
        window_end: End of window or None for current time

    Returns:
        Tuple[datetime, datetime]: Resolved window start and end, both
        timezone-aware UTC, with start strictly before end
    """

    def _as_utc(value: datetime) -> datetime:
        """Return ``value`` as an aware UTC datetime, treating naive input as UTC."""
        if value.utcoffset() is None:
            # astimezone() would assume the host's local zone for naive values.
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)

    window_delta = timedelta(minutes=self.aggregation_window_minutes)

    if window_start is not None and window_end is not None:
        resolved_start = _as_utc(window_start)
        resolved_end = _as_utc(window_end)
        # An empty or inverted range is widened to one full window from the start.
        if resolved_end <= resolved_start:
            resolved_end = resolved_start + window_delta
        return resolved_start, resolved_end

    if window_end is None:
        reference = datetime.now(timezone.utc)
    else:
        reference = _as_utc(window_end)

    reference = reference.replace(second=0, microsecond=0)
    minutes_offset = reference.minute % self.aggregation_window_minutes
    # Only the implicit "now" is snapped back to a window boundary; an
    # explicit end is merely truncated to the minute.
    if window_end is None and minutes_offset:
        reference = reference - timedelta(minutes=minutes_offset)

    resolved_end = reference

    if window_start is None:
        resolved_start = resolved_end - window_delta
    else:
        resolved_start = _as_utc(window_start)

    if resolved_end <= resolved_start:
        resolved_start = resolved_end - window_delta

    return resolved_start, resolved_end

880 

def _upsert_metric(
    self,
    component: str,
    operation_type: str,
    window_start: datetime,
    window_end: datetime,
    request_count: int,
    error_count: int,
    error_rate: float,
    avg_duration_ms: float,
    min_duration_ms: float,
    max_duration_ms: float,
    p50_duration_ms: float,
    p95_duration_ms: float,
    p99_duration_ms: float,
    metric_metadata: Optional[Dict[str, Any]],
    db: Session,
) -> PerformanceMetric:
    """Insert or overwrite the metric row for one component/operation window.

    A row is matched on component, operation type and both window bounds.
    If several rows match, the first is kept and the rest are deleted. The
    session is committed and the row refreshed before returning.

    Args:
        component: Component name
        operation_type: Operation type
        window_start: Window start time
        window_end: Window end time
        request_count: Total request count
        error_count: Total error count
        error_rate: Error rate (0.0-1.0)
        avg_duration_ms: Average duration in milliseconds
        min_duration_ms: Minimum duration in milliseconds
        max_duration_ms: Maximum duration in milliseconds
        p50_duration_ms: 50th percentile duration
        p95_duration_ms: 95th percentile duration
        p99_duration_ms: 99th percentile duration
        metric_metadata: Additional metadata
        db: Database session

    Returns:
        PerformanceMetric: Created or updated metric
    """
    same_window = and_(
        PerformanceMetric.component == component,
        PerformanceMetric.operation_type == operation_type,
        PerformanceMetric.window_start == window_start,
        PerformanceMetric.window_end == window_end,
    )
    matches = db.execute(select(PerformanceMetric).where(same_window)).scalars().all()
    metric = matches[0] if matches else None

    match_total = len(matches)
    if match_total > 1:
        logger.warning(
            "Found %s duplicate performance metric rows for %s.%s window %s-%s; pruning extras",
            match_total,
            component,
            operation_type,
            window_start.isoformat(),
            window_end.isoformat(),
        )
        # Keep the first match, drop every later one.
        for surplus in matches[1:]:
            db.delete(surplus)

    if metric is None:
        span_seconds = int((window_end - window_start).total_seconds())
        metric = PerformanceMetric(
            component=component,
            operation_type=operation_type,
            window_start=window_start,
            window_end=window_end,
            window_duration_seconds=span_seconds,
        )
        db.add(metric)

    # Overwrite every measured field, whether the row is new or pre-existing.
    field_values = (
        ("request_count", request_count),
        ("error_count", error_count),
        ("error_rate", error_rate),
        ("avg_duration_ms", avg_duration_ms),
        ("min_duration_ms", min_duration_ms),
        ("max_duration_ms", max_duration_ms),
        ("p50_duration_ms", p50_duration_ms),
        ("p95_duration_ms", p95_duration_ms),
        ("p99_duration_ms", p99_duration_ms),
        ("metric_metadata", metric_metadata),
    )
    for field_name, field_value in field_values:
        setattr(metric, field_name, field_value)

    db.commit()
    db.refresh(metric)
    return metric

970 

971 

# Process-wide aggregator, created lazily on first request
_log_aggregator: Optional[LogAggregator] = None


def get_log_aggregator() -> LogAggregator:
    """Return the shared LogAggregator, constructing it on first use.

    Returns:
        Global LogAggregator instance
    """
    global _log_aggregator  # pylint: disable=global-statement
    if _log_aggregator is not None:
        return _log_aggregator
    _log_aggregator = LogAggregator()
    return _log_aggregator