Coverage for mcpgateway / services / log_aggregator.py: 94%

355 statements  

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1 # -*- coding: utf-8 -*-

2 """Location: ./mcpgateway/services/log_aggregator.py

3 Copyright 2025

4 SPDX-License-Identifier: Apache-2.0

5

6 Log Aggregation Service.

7

8 This module provides aggregation of performance metrics from structured logs

9 into time-windowed statistics for analysis and monitoring.

10 """

11 

12 # Standard

13 from datetime import datetime, timedelta, timezone

14 import logging

15 import math

16 import statistics

17 from typing import Any, Dict, List, Optional, Tuple

18

19 # Third-Party

20 from sqlalchemy import and_, func, select, text

21 from sqlalchemy.orm import Session

22

23 # First-Party

24 from mcpgateway.config import settings

25 from mcpgateway.db import engine, PerformanceMetric, SessionLocal, StructuredLogEntry

26

27 logger = logging.getLogger(__name__)

28 

29 

30 def _is_postgresql() -> bool:

31 """Check if the database backend is PostgreSQL. 

32 

33 Returns: 

34 True if using PostgreSQL, False otherwise. 

35 """ 

36 return engine.dialect.name == "postgresql" 

37 

38 

39 class LogAggregator:

40 """Aggregates structured logs into performance metrics.""" 

41 

42 def __init__(self): 

43 """Initialize log aggregator.""" 

44 self.aggregation_window_minutes = getattr(settings, "metrics_aggregation_window_minutes", 5) 

45 self.enabled = getattr(settings, "metrics_aggregation_enabled", True) 

46 self._use_sql_percentiles = _is_postgresql() 

47 

48 def aggregate_performance_metrics( 

49 self, component: Optional[str], operation_type: Optional[str], window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None 

50 ) -> Optional[PerformanceMetric]: 

51 """Aggregate performance metrics for a component and operation. 

52 

53 Args: 

54 component: Component name 

55 operation_type: Operation name 

56 window_start: Start of aggregation window (defaults to N minutes ago) 

57 window_end: End of aggregation window (defaults to now) 

58 db: Optional database session 

59 

60 Returns: 

61 Created PerformanceMetric or None if no data 

62 """ 

63 if not self.enabled: 

64 return None 

65 if not component or not operation_type: 

66 return None 

67 

68 window_start, window_end = self._resolve_window_bounds(window_start, window_end) 

69 

70 should_close = False 

71 if db is None:

72 db = SessionLocal() 

73 should_close = True 

74 

75 try: 

76 # Use SQL-based aggregation for PostgreSQL, Python fallback for SQLite 

77 if self._use_sql_percentiles: 

78 stats = self._compute_stats_postgresql(db, component, operation_type, window_start, window_end) 

79 else: 

80 stats = self._compute_stats_python(db, component, operation_type, window_start, window_end) 

81 

82 if stats is None: 

83 return None 

84 

85 count = stats["count"] 

86 avg_duration = stats["avg_duration"] 

87 min_duration = stats["min_duration"] 

88 max_duration = stats["max_duration"] 

89 p50 = stats["p50"] 

90 p95 = stats["p95"] 

91 p99 = stats["p99"] 

92 error_count = stats["error_count"] 

93 error_rate = error_count / count if count > 0 else 0.0 

94 

95 metric = self._upsert_metric( 

96 component=component, 

97 operation_type=operation_type, 

98 window_start=window_start, 

99 window_end=window_end, 

100 request_count=count, 

101 error_count=error_count, 

102 error_rate=error_rate, 

103 avg_duration_ms=avg_duration, 

104 min_duration_ms=min_duration, 

105 max_duration_ms=max_duration, 

106 p50_duration_ms=p50, 

107 p95_duration_ms=p95, 

108 p99_duration_ms=p99, 

109 metric_metadata={ 

110 "sample_size": count, 

111 "generated_at": datetime.now(timezone.utc).isoformat(), 

112 }, 

113 db=db, 

114 ) 

115 

116 logger.info(f"Aggregated performance metrics for {component}.{operation_type}: " f"{count} requests, {avg_duration:.2f}ms avg, {error_rate:.2%} error rate") 

117 

118 if should_close:

119 db.commit() # Commit transaction on success 

120 return metric 

121 

122 except Exception as e: 

123 logger.error(f"Failed to aggregate performance metrics: {e}") 

124 if should_close and db:

125 db.rollback() 

126 return None 

127 

128 finally: 

129 if should_close: 

130 db.close() 

131 
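# Illustrative usage sketch: aggregating a single component/operation pair for the last
# completed window. The component and operation names below are hypothetical, a reachable
# database with structured log entries is assumed, and every other name comes from the
# imports at the top of this module.

aggregator = LogAggregator()
window_end = datetime.now(timezone.utc).replace(second=0, microsecond=0)
window_start = window_end - timedelta(minutes=aggregator.aggregation_window_minutes)
metric = aggregator.aggregate_performance_metrics(
    component="tool_service",      # hypothetical component name
    operation_type="invoke_tool",  # hypothetical operation name
    window_start=window_start,
    window_end=window_end,
)
if metric:
    print(metric.request_count, metric.avg_duration_ms, metric.p95_duration_ms)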

132 def aggregate_all_components_batch(self, window_starts: List[datetime], window_minutes: int, db: Optional[Session] = None) -> List[PerformanceMetric]: 

133 """Aggregate metrics for all components/operations for multiple windows in a single batch. 

134 

135 This reduces the number of database round-trips by fetching logs for the full 

136 time span once per component/operation and partitioning them into windows in 

137 Python, then upserting per-window metrics. 

138 

139 Args: 

140 window_starts: List of window start datetimes (UTC) 

141 window_minutes: Window size in minutes 

142 db: Optional database session 

143 

144 Returns: 

145 List of created/updated PerformanceMetric records 

146 

147 Raises: 

148 Exception: If a database operation fails during aggregation. 

149 """ 

150 if not self.enabled: 

151 return [] 

152 

153 if not window_starts: 

154 return [] 

155 

156 should_close = False 

157 if db is None: 

158 db = SessionLocal() 

159 should_close = True 

160 

161 try: 

162 window_delta = timedelta(minutes=window_minutes) 

163 # Determine full range to query once 

164 full_start = min(window_starts) 

165 full_end = max(window_starts) + window_delta 

166 

167 # Check that window_starts is contiguous and warn if sparse (the batch rollup generates every window in the range)

168 expected_window_count = int((full_end - full_start).total_seconds() / (window_minutes * 60)) 

169 if len(window_starts) != expected_window_count: 

170 logger.warning( 

171 "Batch aggregation received %d window starts but the range spans %d windows; sparse input will generate extra metrics",

172 len(window_starts), 

173 expected_window_count, 

174 ) 

175 

176 # If PostgreSQL is available, use a single SQL rollup with generate_series and ordered-set aggregates 

177 if _is_postgresql(): 

178 sql = text( 

179 """ 

180 WITH windows AS ( 

181 SELECT generate_series(:full_start::timestamptz, (:full_end - (:window_minutes || ' minutes')::interval)::timestamptz, (:window_minutes || ' minutes')::interval) AS window_start 

182 ), pairs AS ( 

183 SELECT DISTINCT component, operation_type FROM structured_log_entries 

184 WHERE timestamp >= :full_start AND timestamp < :full_end 

185 AND duration_ms IS NOT NULL 

186 AND component IS NOT NULL AND component <> '' 

187 AND operation_type IS NOT NULL AND operation_type <> '' 

188 ) 

189 SELECT 

190 w.window_start, 

191 p.component, 

192 p.operation_type, 

193 COUNT(sle.duration_ms) AS cnt, 

194 AVG(sle.duration_ms) AS avg_duration, 

195 MIN(sle.duration_ms) AS min_duration, 

196 MAX(sle.duration_ms) AS max_duration, 

197 percentile_cont(0.50) WITHIN GROUP (ORDER BY sle.duration_ms) AS p50, 

198 percentile_cont(0.95) WITHIN GROUP (ORDER BY sle.duration_ms) AS p95, 

199 percentile_cont(0.99) WITHIN GROUP (ORDER BY sle.duration_ms) AS p99, 

200 SUM(CASE WHEN upper(sle.level) IN ('ERROR','CRITICAL') OR sle.error_details IS NOT NULL THEN 1 ELSE 0 END) AS error_count 

201 FROM windows w 

202 CROSS JOIN pairs p 

203 JOIN structured_log_entries sle 

204 ON sle.timestamp >= w.window_start AND sle.timestamp < w.window_start + (:window_minutes || ' minutes')::interval 

205 AND sle.component = p.component AND sle.operation_type = p.operation_type 

206 AND sle.duration_ms IS NOT NULL 

207 GROUP BY w.window_start, p.component, p.operation_type 

208 HAVING COUNT(sle.duration_ms) > 0 

209 ORDER BY w.window_start, p.component, p.operation_type 

210 """ 

211 ) 

212 

213 rows = db.execute( 

214 sql, 

215 { 

216 "full_start": full_start, 

217 "full_end": full_end, 

218 "window_minutes": str(window_minutes), 

219 }, 

220 ).fetchall() 

221 

222 created_metrics: List[PerformanceMetric] = [] 

223 for row in rows: 

224 ws = row.window_start if row.window_start.tzinfo else row.window_start.replace(tzinfo=timezone.utc) 

225 component = row.component 

226 operation = row.operation_type 

227 count = int(row.cnt) 

228 avg_duration = float(row.avg_duration) if row.avg_duration is not None else 0.0 

229 min_duration = float(row.min_duration) if row.min_duration is not None else 0.0 

230 max_duration = float(row.max_duration) if row.max_duration is not None else 0.0 

231 p50 = float(row.p50) if row.p50 is not None else 0.0 

232 p95 = float(row.p95) if row.p95 is not None else 0.0 

233 p99 = float(row.p99) if row.p99 is not None else 0.0 

234 error_count = int(row.error_count) if row.error_count is not None else 0 

235 

236 metric = self._upsert_metric( 

237 component=component, 

238 operation_type=operation, 

239 window_start=ws, 

240 window_end=ws + window_delta, 

241 request_count=count, 

242 error_count=error_count, 

243 error_rate=(error_count / count) if count else 0.0, 

244 avg_duration_ms=avg_duration, 

245 min_duration_ms=min_duration, 

246 max_duration_ms=max_duration, 

247 p50_duration_ms=p50, 

248 p95_duration_ms=p95, 

249 p99_duration_ms=p99, 

250 metric_metadata={ 

251 "sample_size": count, 

252 "generated_at": datetime.now(timezone.utc).isoformat(), 

253 }, 

254 db=db, 

255 ) 

256 if metric:

257 created_metrics.append(metric) 

258 

259 if should_close: 

260 db.commit() 

261 return created_metrics 

262 

263 # Fallback: in-Python bucketing (previous implementation) 

264 # Warning: This path loads all entries into memory; for very large ranges this may spike memory usage 

265 range_hours = (full_end - full_start).total_seconds() / 3600 

266 if range_hours > 168: # > 1 week 

267 logger.warning("Large aggregation range (%.1f hours) may cause high memory usage in non-PostgreSQL fallback", range_hours) 

268 

269 # Get unique component/operation pairs for the full range 

270 pair_stmt = ( 

271 select(StructuredLogEntry.component, StructuredLogEntry.operation_type) 

272 .where( 

273 and_( 

274 StructuredLogEntry.timestamp >= full_start, 

275 StructuredLogEntry.timestamp < full_end, 

276 StructuredLogEntry.duration_ms.isnot(None), 

277 StructuredLogEntry.component.isnot(None), 

278 StructuredLogEntry.component != "", 

279 StructuredLogEntry.operation_type.isnot(None), 

280 StructuredLogEntry.operation_type != "", 

281 ) 

282 ) 

283 .distinct() 

284 ) 

285 

286 pairs = db.execute(pair_stmt).all() 

287 

288 created_metrics: List[PerformanceMetric] = [] 

289 

290 # helper to align timestamp to window start 

291 def _align_to_window_local(dt: datetime, minutes: int) -> datetime: 

292 """Align a datetime to the start of its aggregation window. 

293 

294 Args: 

295 dt: The datetime to align. 

296 minutes: The window size in minutes. 

297 

298 Returns: 

299 The datetime aligned to the start of the window. 

300 """ 

301 ts = dt.astimezone(timezone.utc) 

302 total_minutes = int(ts.timestamp() // 60) 

303 aligned_minutes = (total_minutes // minutes) * minutes 

304 return datetime.fromtimestamp(aligned_minutes * 60, tz=timezone.utc) 

305 
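# Worked example for the alignment above: with 5-minute windows a timestamp of
# 12:07:30 UTC floors to the 12:05:00 UTC bucket, while 12:10:00 UTC is already
# aligned and is returned unchanged.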

306 for component, operation in pairs: 

307 if not component or not operation: 

308 continue 

309 

310 # Fetch all relevant log entries for this component/operation in the full range 

311 entries_stmt = select(StructuredLogEntry).where( 

312 and_( 

313 StructuredLogEntry.component == component, 

314 StructuredLogEntry.operation_type == operation, 

315 StructuredLogEntry.timestamp >= full_start, 

316 StructuredLogEntry.timestamp < full_end, 

317 StructuredLogEntry.duration_ms.isnot(None), 

318 ) 

319 ) 

320 

321 entries = db.execute(entries_stmt).scalars().all() 

322 if not entries: 

323 continue 

324 

325 # Bucket entries into windows 

326 buckets: Dict[datetime, List[StructuredLogEntry]] = {} 

327 for e in entries: 

328 ts = e.timestamp if e.timestamp.tzinfo else e.timestamp.replace(tzinfo=timezone.utc) 

329 bucket_start = _align_to_window_local(ts, window_minutes) 

330 if bucket_start not in buckets: 

331 buckets[bucket_start] = [] 

332 buckets[bucket_start].append(e) 

333 

334 # For each requested window, compute stats if we have data 

335 for window_start in window_starts: 

336 bucket_entries = buckets.get(window_start) 

337 if not bucket_entries: 

338 continue 

339 

340 durations = sorted([b.duration_ms for b in bucket_entries if b.duration_ms is not None]) 

341 if not durations: 

342 continue 

343 

344 count = len(durations) 

345 avg_duration = float(sum(durations) / count) if count else 0.0 

346 min_duration = float(durations[0]) 

347 max_duration = float(durations[-1]) 

348 p50 = float(self._percentile(durations, 0.5)) 

349 p95 = float(self._percentile(durations, 0.95)) 

350 p99 = float(self._percentile(durations, 0.99)) 

351 error_count = self._calculate_error_count(bucket_entries) 

352 

353 try: 

354 metric = self._upsert_metric( 

355 component=component, 

356 operation_type=operation, 

357 window_start=window_start, 

358 window_end=window_start + window_delta, 

359 request_count=count, 

360 error_count=error_count, 

361 error_rate=(error_count / count) if count else 0.0, 

362 avg_duration_ms=avg_duration, 

363 min_duration_ms=min_duration, 

364 max_duration_ms=max_duration, 

365 p50_duration_ms=p50, 

366 p95_duration_ms=p95, 

367 p99_duration_ms=p99, 

368 metric_metadata={ 

369 "sample_size": count, 

370 "generated_at": datetime.now(timezone.utc).isoformat(), 

371 }, 

372 db=db, 

373 ) 

374 if metric:

375 created_metrics.append(metric) 

376 except Exception: 

377 logger.exception("Failed to upsert metric for %s.%s window %s", component, operation, window_start) 

378 

379 if should_close: 

380 db.commit() 

381 return created_metrics 

382 except Exception: 

383 if should_close:

384 db.rollback() 

385 raise 

386 finally: 

387 if should_close: 

388 db.close() 

389 
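# Illustrative usage sketch: driving the batch roll-up with a contiguous list of window
# starts covering the last hour. Assumes the default 5-minute window and a reachable
# database; all names are from this module or its imports.

aggregator = LogAggregator()
window_minutes = aggregator.aggregation_window_minutes
now = datetime.now(timezone.utc).replace(second=0, microsecond=0)
now -= timedelta(minutes=now.minute % window_minutes)  # align down to a window boundary
window_starts = [
    now - timedelta(minutes=window_minutes * i)
    for i in range(12, 0, -1)  # twelve 5-minute windows = one hour, oldest first
]
metrics = aggregator.aggregate_all_components_batch(window_starts, window_minutes)
print(f"upserted {len(metrics)} per-window metrics")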

390 def aggregate_all_components(self, window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None) -> List[PerformanceMetric]: 

391 """Aggregate metrics for all components and operations. 

392 

393 Args: 

394 window_start: Start of aggregation window 

395 window_end: End of aggregation window 

396 db: Optional database session 

397 

398 Returns: 

399 List of created PerformanceMetric records 

400 

401 Raises: 

402 Exception: If database operation fails 

403 """ 

404 if not self.enabled:

405 return [] 

406 

407 should_close = False 

408 if db is None:

409 db = SessionLocal() 

410 should_close = True 

411 

412 try: 

413 window_start, window_end = self._resolve_window_bounds(window_start, window_end) 

414 

415 stmt = ( 

416 select(StructuredLogEntry.component, StructuredLogEntry.operation_type) 

417 .where( 

418 and_( 

419 StructuredLogEntry.timestamp >= window_start, 

420 StructuredLogEntry.timestamp < window_end, 

421 StructuredLogEntry.duration_ms.isnot(None), 

422 StructuredLogEntry.operation_type.isnot(None), 

423 ) 

424 ) 

425 .distinct() 

426 ) 

427 

428 pairs = db.execute(stmt).all() 

429 

430 metrics = [] 

431 for component, operation in pairs: 

432 if component and operation: 

433 metric = self.aggregate_performance_metrics(component=component, operation_type=operation, window_start=window_start, window_end=window_end, db=db) 

434 if metric:

435 metrics.append(metric) 

436 

437 if should_close:

438 db.commit() # Commit on success 

439 return metrics 

440 

441 except Exception: 

442 if should_close:

443 db.rollback() 

444 raise 

445 

446 finally: 

447 if should_close: 

448 db.close() 

449 
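# Illustrative sketch of a periodic roll-up job built on this method. The scheduling
# wrapper itself is hypothetical; only the aggregator call is part of this module, and
# the blocking database work is pushed to a thread so the event loop stays responsive.

import asyncio


async def roll_up_forever(interval_seconds: int = 300) -> None:
    """Aggregate the most recent window for every component/operation, forever."""
    aggregator = get_log_aggregator()
    while True:
        created = await asyncio.to_thread(aggregator.aggregate_all_components)
        print(f"aggregated {len(created)} component/operation pairs")
        await asyncio.sleep(interval_seconds)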

450 def get_recent_metrics(self, component: Optional[str] = None, operation: Optional[str] = None, hours: int = 24, db: Optional[Session] = None) -> List[PerformanceMetric]: 

451 """Get recent performance metrics. 

452 

453 Args: 

454 component: Optional component filter 

455 operation: Optional operation filter 

456 hours: Hours of history to retrieve 

457 db: Optional database session 

458 

459 Returns: 

460 List of PerformanceMetric records 

461 

462 Raises: 

463 Exception: If database operation fails 

464 """ 

465 should_close = False 

466 if db is None:

467 db = SessionLocal() 

468 should_close = True 

469 

470 try: 

471 since = datetime.now(timezone.utc) - timedelta(hours=hours) 

472 

473 stmt = select(PerformanceMetric).where(PerformanceMetric.window_start >= since) 

474 

475 if component: 

476 stmt = stmt.where(PerformanceMetric.component == component) 

477 if operation: 

478 stmt = stmt.where(PerformanceMetric.operation_type == operation) 

479 

480 stmt = stmt.order_by(PerformanceMetric.window_start.desc()) 

481 

482 result = db.execute(stmt).scalars().all() 

483 if should_close:

484 db.commit() # Commit on success 

485 return result 

486 

487 except Exception: 

488 if should_close:

489 db.rollback() 

490 raise 

491 

492 finally: 

493 if should_close: 

494 db.close() 

495 
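# Illustrative usage sketch: reading back six hours of stored metrics for one
# hypothetical component and reporting its worst p95 window.

aggregator = get_log_aggregator()
recent = aggregator.get_recent_metrics(component="tool_service", hours=6)
if recent:
    worst = max(recent, key=lambda m: m.p95_duration_ms or 0.0)
    print(worst.operation_type, worst.window_start, worst.p95_duration_ms)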

496 def get_degradation_alerts(self, threshold_multiplier: float = 1.5, hours: int = 24, db: Optional[Session] = None) -> List[Dict[str, Any]]: 

497 """Identify performance degradations by comparing recent vs baseline. 

498 

499 Args: 

500 threshold_multiplier: Alert if recent is X times slower than baseline 

501 hours: Hours of recent data to check 

502 db: Optional database session 

503 

504 Returns: 

505 List of degradation alerts with details 

506 

507 Raises: 

508 Exception: If database operation fails 

509 """ 

510 should_close = False 

511 if db is None: 

512 db = SessionLocal() 

513 should_close = True 

514 

515 try: 

516 recent_cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) 

517 baseline_cutoff = recent_cutoff - timedelta(hours=hours * 2) 

518 

519 # Get unique component/operation pairs 

520 stmt = select(PerformanceMetric.component, PerformanceMetric.operation_type).distinct() 

521 

522 pairs = db.execute(stmt).all() 

523 

524 alerts = [] 

525 for component, operation in pairs: 

526 # Get recent metrics 

527 recent_stmt = select(PerformanceMetric).where( 

528 and_(PerformanceMetric.component == component, PerformanceMetric.operation_type == operation, PerformanceMetric.window_start >= recent_cutoff) 

529 ) 

530 recent_metrics = db.execute(recent_stmt).scalars().all() 

531 

532 # Get baseline metrics 

533 baseline_stmt = select(PerformanceMetric).where( 

534 and_( 

535 PerformanceMetric.component == component, 

536 PerformanceMetric.operation_type == operation, 

537 PerformanceMetric.window_start >= baseline_cutoff, 

538 PerformanceMetric.window_start < recent_cutoff, 

539 ) 

540 ) 

541 baseline_metrics = db.execute(baseline_stmt).scalars().all() 

542 

543 if not recent_metrics or not baseline_metrics: 

544 continue 

545 

546 recent_avg = statistics.mean([m.avg_duration_ms for m in recent_metrics]) 

547 baseline_avg = statistics.mean([m.avg_duration_ms for m in baseline_metrics]) 

548 

549 if recent_avg > baseline_avg * threshold_multiplier:

550 alerts.append( 

551 { 

552 "component": component, 

553 "operation": operation, 

554 "recent_avg_ms": recent_avg, 

555 "baseline_avg_ms": baseline_avg, 

556 "degradation_ratio": recent_avg / baseline_avg, 

557 "recent_error_rate": statistics.mean([m.error_rate for m in recent_metrics]), 

558 "baseline_error_rate": statistics.mean([m.error_rate for m in baseline_metrics]), 

559 } 

560 ) 

561 

562 if should_close: 

563 db.commit() # Commit on success 

564 return alerts 

565 

566 except Exception: 

567 if should_close: 

568 db.rollback() 

569 raise 

570 

571 finally: 

572 if should_close: 

573 db.close() 

574 
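# Worked example: with threshold_multiplier=1.5, a baseline average of 120 ms and a
# recent average of 200 ms raise an alert because 200 > 120 * 1.5 = 180, and the
# reported degradation_ratio is 200 / 120, roughly 1.67. A minimal caller:

aggregator = get_log_aggregator()
for alert in aggregator.get_degradation_alerts(threshold_multiplier=1.5, hours=24):
    print(f"{alert['component']}.{alert['operation']} is "
          f"{alert['degradation_ratio']:.2f}x slower than its baseline")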

575 def backfill(self, hours: float, db: Optional[Session] = None) -> int: 

576 """Backfill metrics for a historical time range. 

577 

578 Args: 

579 hours: Number of hours of history to aggregate (supports fractional hours) 

580 db: Optional shared database session 

581 

582 Returns: 

583 Count of performance metric windows processed 

584 

585 Raises: 

586 Exception: If database operation fails 

587 """ 

588 if not self.enabled or hours <= 0: 

589 return 0 

590 

591 window_minutes = self.aggregation_window_minutes 

592 window_delta = timedelta(minutes=window_minutes) 

593 total_windows = max(1, math.ceil((hours * 60) / window_minutes)) 

594 

595 should_close = False 

596 if db is None:

597 db = SessionLocal() 

598 should_close = True 

599 

600 try: 

601 _, latest_end = self._resolve_window_bounds(None, None) 

602 current_start = latest_end - (window_delta * total_windows) 

603 processed = 0 

604 

605 while current_start < latest_end: 

606 current_end = current_start + window_delta 

607 created = self.aggregate_all_components( 

608 window_start=current_start, 

609 window_end=current_end, 

610 db=db, 

611 ) 

612 if created:

613 processed += 1 

614 current_start = current_end 

615 

616 if should_close:

617 db.commit() # Commit on success 

618 return processed 

619 

620 except Exception: 

621 if should_close: 

622 db.rollback() 

623 raise 

624 

625 finally: 

626 if should_close: 

627 db.close() 

628 
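# Worked example: backfill(hours=2) with the default 5-minute window walks
# ceil((2 * 60) / 5) = 24 historical windows, calling aggregate_all_components for each;
# a fractional value such as hours=0.5 still covers ceil(30 / 5) = 6 windows.

aggregator = get_log_aggregator()
processed = aggregator.backfill(hours=2)
print(f"processed {processed} historical windows")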

629 @staticmethod 

630 def _percentile(sorted_values: List[float], percentile: float) -> float: 

631 """Calculate percentile from sorted values. 

632 

633 Args: 

634 sorted_values: Sorted list of values 

635 percentile: Percentile to calculate (0.0 to 1.0) 

636 

637 Returns: 

638 float: Calculated percentile value 

639 """ 

640 if not sorted_values: 

641 return 0.0 

642 

643 if len(sorted_values) == 1: 

644 return float(sorted_values[0]) 

645 

646 k = (len(sorted_values) - 1) * percentile 

647 f = math.floor(k) 

648 c = math.ceil(k) 

649 

650 if f == c: 

651 return float(sorted_values[int(k)]) 

652 

653 d0 = sorted_values[f] * (c - k) 

654 d1 = sorted_values[c] * (k - f) 

655 return float(d0 + d1) 

656 
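# Worked example: the interpolation matches PostgreSQL's percentile_cont. For four
# sorted samples the p95 rank is k = (4 - 1) * 0.95 = 2.85, so the result is
# 0.15 * values[2] + 0.85 * values[3] = 38.5.

values = [10.0, 20.0, 30.0, 40.0]
assert LogAggregator._percentile(values, 0.50) == 25.0  # midpoint of 20 and 30
assert abs(LogAggregator._percentile(values, 0.95) - 38.5) < 1e-9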

657 @staticmethod 

658 def _calculate_error_count(entries: List[StructuredLogEntry]) -> int: 

659 """Calculate error occurrences for a batch of log entries. 

660 

661 Args: 

662 entries: List of log entries to analyze 

663 

664 Returns: 

665 int: Count of error entries 

666 """ 

667 error_levels = {"ERROR", "CRITICAL"} 

668 return sum(1 for entry in entries if (entry.level and entry.level.upper() in error_levels) or entry.error_details) 

669 
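# Illustrative sketch of the error rule: an entry counts when its level is ERROR or
# CRITICAL (case-insensitively) or when error_details is populated. Constructing bare
# ORM rows in memory like this is for demonstration only and assumes level and
# error_details are plain column attributes.

sample_entries = [
    StructuredLogEntry(level="error"),                                # counted: level
    StructuredLogEntry(level="INFO", error_details={"msg": "boom"}),  # counted: details
    StructuredLogEntry(level="INFO"),                                 # not counted
]
assert LogAggregator._calculate_error_count(sample_entries) == 2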

670 def _compute_stats_postgresql( 

671 self, 

672 db: Session, 

673 component: str, 

674 operation_type: str, 

675 window_start: datetime, 

676 window_end: datetime, 

677 ) -> Optional[Dict[str, Any]]: 

678 """Compute aggregation statistics using PostgreSQL SQL functions. 

679 

680 Uses PostgreSQL's percentile_cont for efficient in-database percentile 

681 computation, avoiding loading all rows into Python memory. 

682 

683 Args: 

684 db: Database session 

685 component: Component name to filter by 

686 operation_type: Operation type to filter by 

687 window_start: Start of the aggregation window 

688 window_end: End of the aggregation window 

689 

690 Returns: 

691 Dictionary with count, avg_duration, min_duration, max_duration, 

692 p50, p95, p99, and error_count, or None if no data. 

693 """ 

694 # Build base filter conditions 

695 base_conditions = and_( 

696 StructuredLogEntry.component == component, 

697 StructuredLogEntry.operation_type == operation_type, 

698 StructuredLogEntry.timestamp >= window_start, 

699 StructuredLogEntry.timestamp < window_end, 

700 StructuredLogEntry.duration_ms.isnot(None), 

701 ) 

702 

703 # First, check whether any matching rows exist before running the statistics query

704 # (the error count is computed below with a separate filtered count query)

705 count_stmt = select(func.count()).select_from(StructuredLogEntry).where(base_conditions) # pylint: disable=not-callable 

706 count_result = db.execute(count_stmt).scalar() 

707 

708 if not count_result or count_result == 0: 

709 return None 

710 

711 # PostgreSQL percentile_cont query using ordered-set aggregate functions 

712 # This computes all statistics in a single query 

713 stats_sql = text( 

714 """ 

715 SELECT 

716 COUNT(duration_ms) as cnt, 

717 AVG(duration_ms) as avg_duration, 

718 MIN(duration_ms) as min_duration, 

719 MAX(duration_ms) as max_duration, 

720 percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) as p50, 

721 percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95, 

722 percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) as p99 

723 FROM structured_log_entries 

724 WHERE component = :component 

725 AND operation_type = :operation_type 

726 AND timestamp >= :window_start 

727 AND timestamp < :window_end 

728 AND duration_ms IS NOT NULL 

729 """ 

730 ) 

731 

732 result = db.execute( 

733 stats_sql, 

734 { 

735 "component": component, 

736 "operation_type": operation_type, 

737 "window_start": window_start, 

738 "window_end": window_end, 

739 }, 

740 ).fetchone() 

741 

742 if not result or result.cnt == 0: 

743 return None 

744 

745 # Get error count separately (requires level/error_details examination) 

746 error_stmt = ( 

747 select(func.count()) # pylint: disable=not-callable 

748 .select_from(StructuredLogEntry) 

749 .where( 

750 and_( 

751 base_conditions, 

752 ((func.upper(StructuredLogEntry.level).in_(["ERROR", "CRITICAL"])) | (StructuredLogEntry.error_details.isnot(None))), 

753 ) 

754 ) 

755 ) 

756 error_count = db.execute(error_stmt).scalar() or 0 

757 

758 return { 

759 "count": result.cnt, 

760 "avg_duration": float(result.avg_duration) if result.avg_duration else 0.0, 

761 "min_duration": float(result.min_duration) if result.min_duration else 0.0, 

762 "max_duration": float(result.max_duration) if result.max_duration else 0.0, 

763 "p50": float(result.p50) if result.p50 else 0.0, 

764 "p95": float(result.p95) if result.p95 else 0.0, 

765 "p99": float(result.p99) if result.p99 else 0.0, 

766 "error_count": error_count, 

767 } 

768 
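# Note: percentile_cont uses the same linear interpolation between neighbouring samples
# as the _percentile helper used by the SQLite fallback, so both code paths report
# matching p50/p95/p99 values for identical inputs.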

769 def _compute_stats_python( 

770 self, 

771 db: Session, 

772 component: str, 

773 operation_type: str, 

774 window_start: datetime, 

775 window_end: datetime, 

776 ) -> Optional[Dict[str, Any]]: 

777 """Compute aggregation statistics using Python (fallback for SQLite). 

778 

779 Loads duration values into memory and computes statistics in Python. 

780 Used when database doesn't support native percentile functions. 

781 

782 Args: 

783 db: Database session 

784 component: Component name to filter by 

785 operation_type: Operation type to filter by 

786 window_start: Start of the aggregation window 

787 window_end: End of the aggregation window 

788 

789 Returns: 

790 Dictionary with count, avg_duration, min_duration, max_duration, 

791 p50, p95, p99, and error_count, or None if no data. 

792 """ 

793 # Query structured logs for this component/operation in time window 

794 stmt = select(StructuredLogEntry).where( 

795 and_( 

796 StructuredLogEntry.component == component, 

797 StructuredLogEntry.operation_type == operation_type, 

798 StructuredLogEntry.timestamp >= window_start, 

799 StructuredLogEntry.timestamp < window_end, 

800 StructuredLogEntry.duration_ms.isnot(None), 

801 ) 

802 ) 

803 

804 results = db.execute(stmt).scalars().all() 

805 

806 if not results: 

807 return None 

808 

809 # Extract durations 

810 durations = sorted(r.duration_ms for r in results if r.duration_ms is not None) 

811 

812 if not durations: 

813 return None 

814 

815 # Calculate statistics 

816 count = len(durations) 

817 avg_duration = statistics.fmean(durations) if hasattr(statistics, "fmean") else statistics.mean(durations) 

818 min_duration = durations[0] 

819 max_duration = durations[-1] 

820 

821 # Calculate percentiles 

822 p50 = self._percentile(durations, 0.50) 

823 p95 = self._percentile(durations, 0.95) 

824 p99 = self._percentile(durations, 0.99) 

825 

826 # Count errors 

827 error_count = self._calculate_error_count(results) 

828 

829 return { 

830 "count": count, 

831 "avg_duration": avg_duration, 

832 "min_duration": min_duration, 

833 "max_duration": max_duration, 

834 "p50": p50, 

835 "p95": p95, 

836 "p99": p99, 

837 "error_count": error_count, 

838 } 

839 

840 def _resolve_window_bounds( 

841 self, 

842 window_start: Optional[datetime], 

843 window_end: Optional[datetime], 

844 ) -> Tuple[datetime, datetime]: 

845 """Resolve and normalize aggregation window bounds. 

846 

847 Args: 

848 window_start: Start of window or None to calculate 

849 window_end: End of window or None for current time 

850 

851 Returns: 

852 Tuple[datetime, datetime]: Resolved window start and end 

853 """ 

854 window_delta = timedelta(minutes=self.aggregation_window_minutes) 

855 

856 if window_start is not None and window_end is not None: 

857 resolved_start = window_start.astimezone(timezone.utc) 

858 resolved_end = window_end.astimezone(timezone.utc) 

859 if resolved_end <= resolved_start: 

860 resolved_end = resolved_start + window_delta 

861 return resolved_start, resolved_end 

862 

863 if window_end is None: 

864 reference = datetime.now(timezone.utc) 

865 else: 

866 reference = window_end.astimezone(timezone.utc) 

867 

868 reference = reference.replace(second=0, microsecond=0) 

869 minutes_offset = reference.minute % self.aggregation_window_minutes 

870 if window_end is None and minutes_offset: 

871 reference = reference - timedelta(minutes=minutes_offset) 

872 

873 resolved_end = reference 

874 

875 if window_start is None: 

876 resolved_start = resolved_end - window_delta 

877 else: 

878 resolved_start = window_start.astimezone(timezone.utc) 

879 

880 if resolved_end <= resolved_start: 

881 resolved_start = resolved_end - window_delta 

882 

883 return resolved_start, resolved_end 

884 
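# Worked example: with the default 5-minute window and no bounds supplied, a call at
# 12:07:42 UTC resolves to the window 12:00:00-12:05:00 UTC (seconds are truncated,
# the end is floored to a window boundary, and the start is one window earlier).
# Explicit bounds skip the alignment and only enforce that end > start.

aggregator = get_log_aggregator()
start, end = aggregator._resolve_window_bounds(None, None)
assert end - start == timedelta(minutes=aggregator.aggregation_window_minutes)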

885 def _upsert_metric( 

886 self, 

887 component: str, 

888 operation_type: str, 

889 window_start: datetime, 

890 window_end: datetime, 

891 request_count: int, 

892 error_count: int, 

893 error_rate: float, 

894 avg_duration_ms: float, 

895 min_duration_ms: float, 

896 max_duration_ms: float, 

897 p50_duration_ms: float, 

898 p95_duration_ms: float, 

899 p99_duration_ms: float, 

900 metric_metadata: Optional[Dict[str, Any]], 

901 db: Session, 

902 ) -> PerformanceMetric: 

903 """Create or update a performance metric window. 

904 

905 Args: 

906 component: Component name 

907 operation_type: Operation type 

908 window_start: Window start time 

909 window_end: Window end time 

910 request_count: Total request count 

911 error_count: Total error count 

912 error_rate: Error rate (0.0-1.0) 

913 avg_duration_ms: Average duration in milliseconds 

914 min_duration_ms: Minimum duration in milliseconds 

915 max_duration_ms: Maximum duration in milliseconds 

916 p50_duration_ms: 50th percentile duration 

917 p95_duration_ms: 95th percentile duration 

918 p99_duration_ms: 99th percentile duration 

919 metric_metadata: Additional metadata 

920 db: Database session 

921 

922 Returns: 

923 PerformanceMetric: Created or updated metric 

924 """ 

925 

926 existing_stmt = select(PerformanceMetric).where( 

927 and_( 

928 PerformanceMetric.component == component, 

929 PerformanceMetric.operation_type == operation_type, 

930 PerformanceMetric.window_start == window_start, 

931 PerformanceMetric.window_end == window_end, 

932 ) 

933 ) 

934 

935 existing_metrics = db.execute(existing_stmt).scalars().all() 

936 metric = existing_metrics[0] if existing_metrics else None 

937 

938 if len(existing_metrics) > 1: 

939 logger.warning( 

940 "Found %s duplicate performance metric rows for %s.%s window %s-%s; pruning extras", 

941 len(existing_metrics), 

942 component, 

943 operation_type, 

944 window_start.isoformat(), 

945 window_end.isoformat(), 

946 ) 

947 for duplicate in existing_metrics[1:]: 

948 db.delete(duplicate) 

949 

950 if metric is None: 

951 metric = PerformanceMetric( 

952 component=component, 

953 operation_type=operation_type, 

954 window_start=window_start, 

955 window_end=window_end, 

956 window_duration_seconds=int((window_end - window_start).total_seconds()), 

957 ) 

958 db.add(metric) 

959 

960 metric.request_count = request_count 

961 metric.error_count = error_count 

962 metric.error_rate = error_rate 

963 metric.avg_duration_ms = avg_duration_ms 

964 metric.min_duration_ms = min_duration_ms 

965 metric.max_duration_ms = max_duration_ms 

966 metric.p50_duration_ms = p50_duration_ms 

967 metric.p95_duration_ms = p95_duration_ms 

968 metric.p99_duration_ms = p99_duration_ms 

969 metric.metric_metadata = metric_metadata 

970 

971 db.commit() 

972 db.refresh(metric) 

973 return metric 

974 

975 

976 # Global log aggregator instance

977 _log_aggregator: Optional[LogAggregator] = None

978

979

980 def get_log_aggregator() -> LogAggregator:

981 """Get or create the global log aggregator instance. 

982 

983 Returns: 

984 Global LogAggregator instance 

985 """ 

986 global _log_aggregator # pylint: disable=global-statement 

987 if _log_aggregator is None: 

988 _log_aggregator = LogAggregator() 

989 return _log_aggregator
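# Illustrative usage sketch: callers normally go through the module-level singleton
# rather than constructing LogAggregator directly, so every caller shares the same
# window configuration read from settings.

aggregator = get_log_aggregator()
assert aggregator is get_log_aggregator()  # the same instance on every call
if aggregator.enabled:
    aggregator.aggregate_all_components()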