Coverage for mcpgateway / services / log_aggregator.py: 94%
355 statements
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/log_aggregator.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Log Aggregation Service.
8This module provides aggregation of performance metrics from structured logs
9into time-windowed statistics for analysis and monitoring.
10"""
12# Standard
13from datetime import datetime, timedelta, timezone
14import logging
15import math
16import statistics
17from typing import Any, Dict, List, Optional, Tuple
19# Third-Party
20from sqlalchemy import and_, func, select, text
21from sqlalchemy.orm import Session
23# First-Party
24from mcpgateway.config import settings
25from mcpgateway.db import engine, PerformanceMetric, SessionLocal, StructuredLogEntry
27logger = logging.getLogger(__name__)
30def _is_postgresql() -> bool:
31 """Check if the database backend is PostgreSQL.
33 Returns:
34 True if using PostgreSQL, False otherwise.
35 """
36 return engine.dialect.name == "postgresql"
39class LogAggregator:
40 """Aggregates structured logs into performance metrics."""
42 def __init__(self):
43 """Initialize log aggregator."""
44 self.aggregation_window_minutes = getattr(settings, "metrics_aggregation_window_minutes", 5)
45 self.enabled = getattr(settings, "metrics_aggregation_enabled", True)
46 self._use_sql_percentiles = _is_postgresql()
48 def aggregate_performance_metrics(
49 self, component: Optional[str], operation_type: Optional[str], window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None
50 ) -> Optional[PerformanceMetric]:
51 """Aggregate performance metrics for a component and operation.
53 Args:
54 component: Component name
55 operation_type: Operation name
56 window_start: Start of aggregation window (defaults to N minutes ago)
57 window_end: End of aggregation window (defaults to now)
58 db: Optional database session
60 Returns:
61 Created PerformanceMetric or None if no data
62 """
63 if not self.enabled:
64 return None
65 if not component or not operation_type:
66 return None
68 window_start, window_end = self._resolve_window_bounds(window_start, window_end)
70 should_close = False
71 if db is None:  [71 ↛ 75: line 71 didn't jump to line 75 because the condition on line 71 was always true]
72 db = SessionLocal()
73 should_close = True
75 try:
76 # Use SQL-based aggregation for PostgreSQL, Python fallback for SQLite
77 if self._use_sql_percentiles:
78 stats = self._compute_stats_postgresql(db, component, operation_type, window_start, window_end)
79 else:
80 stats = self._compute_stats_python(db, component, operation_type, window_start, window_end)
82 if stats is None:
83 return None
85 count = stats["count"]
86 avg_duration = stats["avg_duration"]
87 min_duration = stats["min_duration"]
88 max_duration = stats["max_duration"]
89 p50 = stats["p50"]
90 p95 = stats["p95"]
91 p99 = stats["p99"]
92 error_count = stats["error_count"]
93 error_rate = error_count / count if count > 0 else 0.0
95 metric = self._upsert_metric(
96 component=component,
97 operation_type=operation_type,
98 window_start=window_start,
99 window_end=window_end,
100 request_count=count,
101 error_count=error_count,
102 error_rate=error_rate,
103 avg_duration_ms=avg_duration,
104 min_duration_ms=min_duration,
105 max_duration_ms=max_duration,
106 p50_duration_ms=p50,
107 p95_duration_ms=p95,
108 p99_duration_ms=p99,
109 metric_metadata={
110 "sample_size": count,
111 "generated_at": datetime.now(timezone.utc).isoformat(),
112 },
113 db=db,
114 )
116 logger.info(f"Aggregated performance metrics for {component}.{operation_type}: " f"{count} requests, {avg_duration:.2f}ms avg, {error_rate:.2%} error rate")
118 if should_close:  [118 ↛ 120: line 118 didn't jump to line 120 because the condition on line 118 was always true]
119 db.commit() # Commit transaction on success
120 return metric
122 except Exception as e:
123 logger.error(f"Failed to aggregate performance metrics: {e}")
124 if should_close and db:  [124 ↛ 126: line 124 didn't jump to line 126 because the condition on line 124 was always true]
125 db.rollback()
126 return None
128 finally:
129 if should_close:
130 db.close()
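A minimal usage sketch for the method above; the component and operation names and the explicit window bounds are illustrative assumptions, not values from this module.

# Illustrative only: aggregate the most recent completed window for one
# hypothetical component/operation pair.
from datetime import datetime, timedelta, timezone

aggregator = LogAggregator()
window_end = datetime.now(timezone.utc).replace(second=0, microsecond=0)
window_start = window_end - timedelta(minutes=aggregator.aggregation_window_minutes)
metric = aggregator.aggregate_performance_metrics(
    component="gateway",           # hypothetical component name
    operation_type="tool_invoke",  # hypothetical operation name
    window_start=window_start,
    window_end=window_end,
)
if metric:
    print(metric.request_count, metric.p95_duration_ms, metric.error_rate)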
132 def aggregate_all_components_batch(self, window_starts: List[datetime], window_minutes: int, db: Optional[Session] = None) -> List[PerformanceMetric]:
133 """Aggregate metrics for all components/operations for multiple windows in a single batch.
135 This reduces the number of database round-trips by fetching logs for the full
136 time span once per component/operation and partitioning them into windows in
137 Python, then upserting per-window metrics.
139 Args:
140 window_starts: List of window start datetimes (UTC)
141 window_minutes: Window size in minutes
142 db: Optional database session
144 Returns:
145 List of created/updated PerformanceMetric records
147 Raises:
148 Exception: If a database operation fails during aggregation.
149 """
150 if not self.enabled:
151 return []
153 if not window_starts:
154 return []
156 should_close = False
157 if db is None:
158 db = SessionLocal()
159 should_close = True
161 try:
162 window_delta = timedelta(minutes=window_minutes)
163 # Determine full range to query once
164 full_start = min(window_starts)
165 full_end = max(window_starts) + window_delta
167 # Validate window_starts is contiguous - warn if sparse (batch generates all windows in range)
168 expected_window_count = int((full_end - full_start).total_seconds() / (window_minutes * 60))
169 if len(window_starts) != expected_window_count:
170 logger.warning(
171 "Batch aggregation received %d windows but range spans %d; sparse windows will generate extra metrics",
172 len(window_starts),
173 expected_window_count,
174 )
176 # If PostgreSQL is available, use a single SQL rollup with generate_series and ordered-set aggregates
177 if _is_postgresql():
178 sql = text(
179 """
180 WITH windows AS (
181 SELECT generate_series(:full_start::timestamptz, (:full_end - (:window_minutes || ' minutes')::interval)::timestamptz, (:window_minutes || ' minutes')::interval) AS window_start
182 ), pairs AS (
183 SELECT DISTINCT component, operation_type FROM structured_log_entries
184 WHERE timestamp >= :full_start AND timestamp < :full_end
185 AND duration_ms IS NOT NULL
186 AND component IS NOT NULL AND component <> ''
187 AND operation_type IS NOT NULL AND operation_type <> ''
188 )
189 SELECT
190 w.window_start,
191 p.component,
192 p.operation_type,
193 COUNT(sle.duration_ms) AS cnt,
194 AVG(sle.duration_ms) AS avg_duration,
195 MIN(sle.duration_ms) AS min_duration,
196 MAX(sle.duration_ms) AS max_duration,
197 percentile_cont(0.50) WITHIN GROUP (ORDER BY sle.duration_ms) AS p50,
198 percentile_cont(0.95) WITHIN GROUP (ORDER BY sle.duration_ms) AS p95,
199 percentile_cont(0.99) WITHIN GROUP (ORDER BY sle.duration_ms) AS p99,
200 SUM(CASE WHEN upper(sle.level) IN ('ERROR','CRITICAL') OR sle.error_details IS NOT NULL THEN 1 ELSE 0 END) AS error_count
201 FROM windows w
202 CROSS JOIN pairs p
203 JOIN structured_log_entries sle
204 ON sle.timestamp >= w.window_start AND sle.timestamp < w.window_start + (:window_minutes || ' minutes')::interval
205 AND sle.component = p.component AND sle.operation_type = p.operation_type
206 AND sle.duration_ms IS NOT NULL
207 GROUP BY w.window_start, p.component, p.operation_type
208 HAVING COUNT(sle.duration_ms) > 0
209 ORDER BY w.window_start, p.component, p.operation_type
210 """
211 )
213 rows = db.execute(
214 sql,
215 {
216 "full_start": full_start,
217 "full_end": full_end,
218 "window_minutes": str(window_minutes),
219 },
220 ).fetchall()
222 created_metrics: List[PerformanceMetric] = []
223 for row in rows:
224 ws = row.window_start if row.window_start.tzinfo else row.window_start.replace(tzinfo=timezone.utc)
225 component = row.component
226 operation = row.operation_type
227 count = int(row.cnt)
228 avg_duration = float(row.avg_duration) if row.avg_duration is not None else 0.0
229 min_duration = float(row.min_duration) if row.min_duration is not None else 0.0
230 max_duration = float(row.max_duration) if row.max_duration is not None else 0.0
231 p50 = float(row.p50) if row.p50 is not None else 0.0
232 p95 = float(row.p95) if row.p95 is not None else 0.0
233 p99 = float(row.p99) if row.p99 is not None else 0.0
234 error_count = int(row.error_count) if row.error_count is not None else 0
236 metric = self._upsert_metric(
237 component=component,
238 operation_type=operation,
239 window_start=ws,
240 window_end=ws + window_delta,
241 request_count=count,
242 error_count=error_count,
243 error_rate=(error_count / count) if count else 0.0,
244 avg_duration_ms=avg_duration,
245 min_duration_ms=min_duration,
246 max_duration_ms=max_duration,
247 p50_duration_ms=p50,
248 p95_duration_ms=p95,
249 p99_duration_ms=p99,
250 metric_metadata={
251 "sample_size": count,
252 "generated_at": datetime.now(timezone.utc).isoformat(),
253 },
254 db=db,
255 )
256 if metric:  [256 ↛ 223: line 256 didn't jump to line 223 because the condition on line 256 was always true]
257 created_metrics.append(metric)
259 if should_close:
260 db.commit()
261 return created_metrics
263 # Fallback: in-Python bucketing (previous implementation)
264 # Warning: This path loads all entries into memory; for very large ranges this may spike memory usage
265 range_hours = (full_end - full_start).total_seconds() / 3600
266 if range_hours > 168: # > 1 week
267 logger.warning("Large aggregation range (%.1f hours) may cause high memory usage in non-PostgreSQL fallback", range_hours)
269 # Get unique component/operation pairs for the full range
270 pair_stmt = (
271 select(StructuredLogEntry.component, StructuredLogEntry.operation_type)
272 .where(
273 and_(
274 StructuredLogEntry.timestamp >= full_start,
275 StructuredLogEntry.timestamp < full_end,
276 StructuredLogEntry.duration_ms.isnot(None),
277 StructuredLogEntry.component.isnot(None),
278 StructuredLogEntry.component != "",
279 StructuredLogEntry.operation_type.isnot(None),
280 StructuredLogEntry.operation_type != "",
281 )
282 )
283 .distinct()
284 )
286 pairs = db.execute(pair_stmt).all()
288 created_metrics: List[PerformanceMetric] = []
290 # helper to align timestamp to window start
291 def _align_to_window_local(dt: datetime, minutes: int) -> datetime:
292 """Align a datetime to the start of its aggregation window.
294 Args:
295 dt: The datetime to align.
296 minutes: The window size in minutes.
298 Returns:
299 The datetime aligned to the start of the window.
300 """
301 ts = dt.astimezone(timezone.utc)
302 total_minutes = int(ts.timestamp() // 60)
303 aligned_minutes = (total_minutes // minutes) * minutes
304 return datetime.fromtimestamp(aligned_minutes * 60, tz=timezone.utc)
306 for component, operation in pairs:
307 if not component or not operation:
308 continue
310 # Fetch all relevant log entries for this component/operation in the full range
311 entries_stmt = select(StructuredLogEntry).where(
312 and_(
313 StructuredLogEntry.component == component,
314 StructuredLogEntry.operation_type == operation,
315 StructuredLogEntry.timestamp >= full_start,
316 StructuredLogEntry.timestamp < full_end,
317 StructuredLogEntry.duration_ms.isnot(None),
318 )
319 )
321 entries = db.execute(entries_stmt).scalars().all()
322 if not entries:
323 continue
325 # Bucket entries into windows
326 buckets: Dict[datetime, List[StructuredLogEntry]] = {}
327 for e in entries:
328 ts = e.timestamp if e.timestamp.tzinfo else e.timestamp.replace(tzinfo=timezone.utc)
329 bucket_start = _align_to_window_local(ts, window_minutes)
330 if bucket_start not in buckets:
331 buckets[bucket_start] = []
332 buckets[bucket_start].append(e)
334 # For each requested window, compute stats if we have data
335 for window_start in window_starts:
336 bucket_entries = buckets.get(window_start)
337 if not bucket_entries:
338 continue
340 durations = sorted([b.duration_ms for b in bucket_entries if b.duration_ms is not None])
341 if not durations:
342 continue
344 count = len(durations)
345 avg_duration = float(sum(durations) / count) if count else 0.0
346 min_duration = float(durations[0])
347 max_duration = float(durations[-1])
348 p50 = float(self._percentile(durations, 0.5))
349 p95 = float(self._percentile(durations, 0.95))
350 p99 = float(self._percentile(durations, 0.99))
351 error_count = self._calculate_error_count(bucket_entries)
353 try:
354 metric = self._upsert_metric(
355 component=component,
356 operation_type=operation,
357 window_start=window_start,
358 window_end=window_start + window_delta,
359 request_count=count,
360 error_count=error_count,
361 error_rate=(error_count / count) if count else 0.0,
362 avg_duration_ms=avg_duration,
363 min_duration_ms=min_duration,
364 max_duration_ms=max_duration,
365 p50_duration_ms=p50,
366 p95_duration_ms=p95,
367 p99_duration_ms=p99,
368 metric_metadata={
369 "sample_size": count,
370 "generated_at": datetime.now(timezone.utc).isoformat(),
371 },
372 db=db,
373 )
374 if metric:  [374 ↛ 335: line 374 didn't jump to line 335 because the condition on line 374 was always true]
375 created_metrics.append(metric)
376 except Exception:
377 logger.exception("Failed to upsert metric for %s.%s window %s", component, operation, window_start)
379 if should_close:
380 db.commit()
381 return created_metrics
382 except Exception:
383 if should_close:  [383 ↛ 385: line 383 didn't jump to line 385 because the condition on line 383 was always true]
384 db.rollback()
385 raise
386 finally:
387 if should_close:
388 db.close()
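A sketch of how a caller might build the contiguous window_starts list this batch method expects, assuming the default 5-minute window; the three-hour span and the reuse of the private _resolve_window_bounds helper are purely illustrative.

# Illustrative only: contiguous window starts covering the last three hours,
# aggregated in a single batched pass.
from datetime import timedelta

aggregator = LogAggregator()
window_minutes = aggregator.aggregation_window_minutes
_, latest_end = aggregator._resolve_window_bounds(None, None)
window_starts = []
current = latest_end - timedelta(hours=3)
while current < latest_end:
    window_starts.append(current)
    current += timedelta(minutes=window_minutes)
metrics = aggregator.aggregate_all_components_batch(window_starts, window_minutes)
print(f"upserted {len(metrics)} per-window metrics")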
390 def aggregate_all_components(self, window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None) -> List[PerformanceMetric]:
391 """Aggregate metrics for all components and operations.
393 Args:
394 window_start: Start of aggregation window
395 window_end: End of aggregation window
396 db: Optional database session
398 Returns:
399 List of created PerformanceMetric records
401 Raises:
402 Exception: If database operation fails
403 """
404 if not self.enabled:  [404 ↛ 405: line 404 didn't jump to line 405 because the condition on line 404 was never true]
405 return []
407 should_close = False
408 if db is None:  [408 ↛ 412: line 408 didn't jump to line 412 because the condition on line 408 was always true]
409 db = SessionLocal()
410 should_close = True
412 try:
413 window_start, window_end = self._resolve_window_bounds(window_start, window_end)
415 stmt = (
416 select(StructuredLogEntry.component, StructuredLogEntry.operation_type)
417 .where(
418 and_(
419 StructuredLogEntry.timestamp >= window_start,
420 StructuredLogEntry.timestamp < window_end,
421 StructuredLogEntry.duration_ms.isnot(None),
422 StructuredLogEntry.operation_type.isnot(None),
423 )
424 )
425 .distinct()
426 )
428 pairs = db.execute(stmt).all()
430 metrics = []
431 for component, operation in pairs:
432 if component and operation:
433 metric = self.aggregate_performance_metrics(component=component, operation_type=operation, window_start=window_start, window_end=window_end, db=db)
434 if metric:  [434 ↛ 431: line 434 didn't jump to line 431 because the condition on line 434 was always true]
435 metrics.append(metric)
437 if should_close:  [437 ↛ 439: line 437 didn't jump to line 439 because the condition on line 437 was always true]
438 db.commit() # Commit on success
439 return metrics
441 except Exception:
442 if should_close:  [442 ↛ 444: line 442 didn't jump to line 444 because the condition on line 442 was always true]
443 db.rollback()
444 raise
446 finally:
447 if should_close:
448 db.close()
450 def get_recent_metrics(self, component: Optional[str] = None, operation: Optional[str] = None, hours: int = 24, db: Optional[Session] = None) -> List[PerformanceMetric]:
451 """Get recent performance metrics.
453 Args:
454 component: Optional component filter
455 operation: Optional operation filter
456 hours: Hours of history to retrieve
457 db: Optional database session
459 Returns:
460 List of PerformanceMetric records
462 Raises:
463 Exception: If database operation fails
464 """
465 should_close = False
466 if db is None:  [466 ↛ 470: line 466 didn't jump to line 470 because the condition on line 466 was always true]
467 db = SessionLocal()
468 should_close = True
470 try:
471 since = datetime.now(timezone.utc) - timedelta(hours=hours)
473 stmt = select(PerformanceMetric).where(PerformanceMetric.window_start >= since)
475 if component:
476 stmt = stmt.where(PerformanceMetric.component == component)
477 if operation:
478 stmt = stmt.where(PerformanceMetric.operation_type == operation)
480 stmt = stmt.order_by(PerformanceMetric.window_start.desc())
482 result = db.execute(stmt).scalars().all()
483 if should_close:  [483 ↛ 485: line 483 didn't jump to line 485 because the condition on line 483 was always true]
484 db.commit() # Commit on success
485 return result
487 except Exception:
488 if should_close:  [488 ↛ 490: line 488 didn't jump to line 490 because the condition on line 488 was always true]
489 db.rollback()
490 raise
492 finally:
493 if should_close:
494 db.close()
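A usage sketch for the read path above; the component filter is a hypothetical name.

# Illustrative only: print a compact latency summary of the last 6 hours of
# aggregated metrics for one hypothetical component.
aggregator = LogAggregator()
for m in aggregator.get_recent_metrics(component="gateway", hours=6):
    print(f"{m.operation_type} {m.window_start:%H:%M} p95={m.p95_duration_ms:.1f}ms err={m.error_rate:.1%}")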
496 def get_degradation_alerts(self, threshold_multiplier: float = 1.5, hours: int = 24, db: Optional[Session] = None) -> List[Dict[str, Any]]:
497 """Identify performance degradations by comparing recent vs baseline.
499 Args:
500 threshold_multiplier: Alert if recent is X times slower than baseline
501 hours: Hours of recent data to check
502 db: Optional database session
504 Returns:
505 List of degradation alerts with details
507 Raises:
508 Exception: If database operation fails
509 """
510 should_close = False
511 if db is None:
512 db = SessionLocal()
513 should_close = True
515 try:
516 recent_cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
517 baseline_cutoff = recent_cutoff - timedelta(hours=hours * 2)
519 # Get unique component/operation pairs
520 stmt = select(PerformanceMetric.component, PerformanceMetric.operation_type).distinct()
522 pairs = db.execute(stmt).all()
524 alerts = []
525 for component, operation in pairs:
526 # Get recent metrics
527 recent_stmt = select(PerformanceMetric).where(
528 and_(PerformanceMetric.component == component, PerformanceMetric.operation_type == operation, PerformanceMetric.window_start >= recent_cutoff)
529 )
530 recent_metrics = db.execute(recent_stmt).scalars().all()
532 # Get baseline metrics
533 baseline_stmt = select(PerformanceMetric).where(
534 and_(
535 PerformanceMetric.component == component,
536 PerformanceMetric.operation_type == operation,
537 PerformanceMetric.window_start >= baseline_cutoff,
538 PerformanceMetric.window_start < recent_cutoff,
539 )
540 )
541 baseline_metrics = db.execute(baseline_stmt).scalars().all()
543 if not recent_metrics or not baseline_metrics:
544 continue
546 recent_avg = statistics.mean([m.avg_duration_ms for m in recent_metrics])
547 baseline_avg = statistics.mean([m.avg_duration_ms for m in baseline_metrics])
549 if recent_avg > baseline_avg * threshold_multiplier:  [549 ↛ 525: line 549 didn't jump to line 525 because the condition on line 549 was always true]
550 alerts.append(
551 {
552 "component": component,
553 "operation": operation,
554 "recent_avg_ms": recent_avg,
555 "baseline_avg_ms": baseline_avg,
556 "degradation_ratio": recent_avg / baseline_avg,
557 "recent_error_rate": statistics.mean([m.error_rate for m in recent_metrics]),
558 "baseline_error_rate": statistics.mean([m.error_rate for m in baseline_metrics]),
559 }
560 )
562 if should_close:
563 db.commit() # Commit on success
564 return alerts
566 except Exception:
567 if should_close:
568 db.rollback()
569 raise
571 finally:
572 if should_close:
573 db.close()
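A numeric illustration of the threshold check above: with the default multiplier of 1.5, a recent average of 150 ms against a 90 ms baseline (ratio about 1.67) raises an alert, while 120 ms (ratio about 1.33) does not.

# Illustrative only: the same comparison the service applies per
# component/operation pair, shown with made-up averages.
threshold_multiplier = 1.5
baseline_avg = 90.0
for recent_avg in (150.0, 120.0):
    degraded = recent_avg > baseline_avg * threshold_multiplier  # threshold is 135.0 ms
    print(recent_avg, round(recent_avg / baseline_avg, 2), degraded)
# 150.0 1.67 True
# 120.0 1.33 False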
575 def backfill(self, hours: float, db: Optional[Session] = None) -> int:
576 """Backfill metrics for a historical time range.
578 Args:
579 hours: Number of hours of history to aggregate (supports fractional hours)
580 db: Optional shared database session
582 Returns:
583 Count of performance metric windows processed
585 Raises:
586 Exception: If database operation fails
587 """
588 if not self.enabled or hours <= 0:
589 return 0
591 window_minutes = self.aggregation_window_minutes
592 window_delta = timedelta(minutes=window_minutes)
593 total_windows = max(1, math.ceil((hours * 60) / window_minutes))
595 should_close = False
596 if db is None:  [596 ↛ 600: line 596 didn't jump to line 600 because the condition on line 596 was always true]
597 db = SessionLocal()
598 should_close = True
600 try:
601 _, latest_end = self._resolve_window_bounds(None, None)
602 current_start = latest_end - (window_delta * total_windows)
603 processed = 0
605 while current_start < latest_end:
606 current_end = current_start + window_delta
607 created = self.aggregate_all_components(
608 window_start=current_start,
609 window_end=current_end,
610 db=db,
611 )
612 if created:  [612 ↛ 614: line 612 didn't jump to line 614 because the condition on line 612 was always true]
613 processed += 1
614 current_start = current_end
616 if should_close:  [616 ↛ 618: line 616 didn't jump to line 618 because the condition on line 616 was always true]
617 db.commit() # Commit on success
618 return processed
620 except Exception:
621 if should_close:
622 db.rollback()
623 raise
625 finally:
626 if should_close:
627 db.close()
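A quick check of the window arithmetic above, assuming the default 5-minute aggregation window: fractional hours still round up to at least one full window.

# Illustrative only: how many windows backfill() walks for a few inputs.
import math

window_minutes = 5
for hours in (2, 0.5, 0.05):
    print(hours, max(1, math.ceil((hours * 60) / window_minutes)))
# 2    -> 24 windows
# 0.5  -> 6 windows
# 0.05 -> 1 window (three minutes of history still covers one full window)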
629 @staticmethod
630 def _percentile(sorted_values: List[float], percentile: float) -> float:
631 """Calculate percentile from sorted values.
633 Args:
634 sorted_values: Sorted list of values
635 percentile: Percentile to calculate (0.0 to 1.0)
637 Returns:
638 float: Calculated percentile value
639 """
640 if not sorted_values:
641 return 0.0
643 if len(sorted_values) == 1:
644 return float(sorted_values[0])
646 k = (len(sorted_values) - 1) * percentile
647 f = math.floor(k)
648 c = math.ceil(k)
650 if f == c:
651 return float(sorted_values[int(k)])
653 d0 = sorted_values[f] * (c - k)
654 d1 = sorted_values[c] * (k - f)
655 return float(d0 + d1)
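A worked example of the linear-interpolation percentile above, on a small sorted sample: k = (5 - 1) * 0.95 = 3.8 interpolates between the fourth and fifth values, 40 * 0.2 + 50 * 0.8 = 48.

# Illustrative only: _percentile expects an already-sorted list.
import math

sample = [10, 20, 30, 40, 50]
assert LogAggregator._percentile(sample, 0.50) == 30.0  # k = 2.0 lands exactly on an index
assert math.isclose(LogAggregator._percentile(sample, 0.95), 48.0)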
657 @staticmethod
658 def _calculate_error_count(entries: List[StructuredLogEntry]) -> int:
659 """Calculate error occurrences for a batch of log entries.
661 Args:
662 entries: List of log entries to analyze
664 Returns:
665 int: Count of error entries
666 """
667 error_levels = {"ERROR", "CRITICAL"}
668 return sum(1 for entry in entries if (entry.level and entry.level.upper() in error_levels) or entry.error_details)
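A tiny illustration of the error-counting rule above, using stand-in objects rather than real ORM rows.

# Illustrative only: an entry counts as an error when its level is
# ERROR/CRITICAL (case-insensitive) or it carries error_details.
from types import SimpleNamespace

entries = [
    SimpleNamespace(level="info", error_details=None),
    SimpleNamespace(level="error", error_details=None),
    SimpleNamespace(level="INFO", error_details="timeout"),
]
assert LogAggregator._calculate_error_count(entries) == 2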
670 def _compute_stats_postgresql(
671 self,
672 db: Session,
673 component: str,
674 operation_type: str,
675 window_start: datetime,
676 window_end: datetime,
677 ) -> Optional[Dict[str, Any]]:
678 """Compute aggregation statistics using PostgreSQL SQL functions.
680 Uses PostgreSQL's percentile_cont for efficient in-database percentile
681 computation, avoiding loading all rows into Python memory.
683 Args:
684 db: Database session
685 component: Component name to filter by
686 operation_type: Operation type to filter by
687 window_start: Start of the aggregation window
688 window_end: End of the aggregation window
690 Returns:
691 Dictionary with count, avg_duration, min_duration, max_duration,
692 p50, p95, p99, and error_count, or None if no data.
693 """
694 # Build base filter conditions
695 base_conditions = and_(
696 StructuredLogEntry.component == component,
697 StructuredLogEntry.operation_type == operation_type,
698 StructuredLogEntry.timestamp >= window_start,
699 StructuredLogEntry.timestamp < window_end,
700 StructuredLogEntry.duration_ms.isnot(None),
701 )
703 # First, check whether any matching rows exist in this window
704 # (the error count is computed in a separate query below, since it depends on level/error_details rather than duration_ms)
705 count_stmt = select(func.count()).select_from(StructuredLogEntry).where(base_conditions) # pylint: disable=not-callable
706 count_result = db.execute(count_stmt).scalar()
708 if not count_result or count_result == 0:
709 return None
711 # PostgreSQL percentile_cont query using ordered-set aggregate functions
712 # This computes all statistics in a single query
713 stats_sql = text(
714 """
715 SELECT
716 COUNT(duration_ms) as cnt,
717 AVG(duration_ms) as avg_duration,
718 MIN(duration_ms) as min_duration,
719 MAX(duration_ms) as max_duration,
720 percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) as p50,
721 percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95,
722 percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) as p99
723 FROM structured_log_entries
724 WHERE component = :component
725 AND operation_type = :operation_type
726 AND timestamp >= :window_start
727 AND timestamp < :window_end
728 AND duration_ms IS NOT NULL
729 """
730 )
732 result = db.execute(
733 stats_sql,
734 {
735 "component": component,
736 "operation_type": operation_type,
737 "window_start": window_start,
738 "window_end": window_end,
739 },
740 ).fetchone()
742 if not result or result.cnt == 0:
743 return None
745 # Get error count separately (requires level/error_details examination)
746 error_stmt = (
747 select(func.count()) # pylint: disable=not-callable
748 .select_from(StructuredLogEntry)
749 .where(
750 and_(
751 base_conditions,
752 ((func.upper(StructuredLogEntry.level).in_(["ERROR", "CRITICAL"])) | (StructuredLogEntry.error_details.isnot(None))),
753 )
754 )
755 )
756 error_count = db.execute(error_stmt).scalar() or 0
758 return {
759 "count": result.cnt,
760 "avg_duration": float(result.avg_duration) if result.avg_duration else 0.0,
761 "min_duration": float(result.min_duration) if result.min_duration else 0.0,
762 "max_duration": float(result.max_duration) if result.max_duration else 0.0,
763 "p50": float(result.p50) if result.p50 else 0.0,
764 "p95": float(result.p95) if result.p95 else 0.0,
765 "p99": float(result.p99) if result.p99 else 0.0,
766 "error_count": error_count,
767 }
769 def _compute_stats_python(
770 self,
771 db: Session,
772 component: str,
773 operation_type: str,
774 window_start: datetime,
775 window_end: datetime,
776 ) -> Optional[Dict[str, Any]]:
777 """Compute aggregation statistics using Python (fallback for SQLite).
779 Loads duration values into memory and computes statistics in Python.
780 Used when database doesn't support native percentile functions.
782 Args:
783 db: Database session
784 component: Component name to filter by
785 operation_type: Operation type to filter by
786 window_start: Start of the aggregation window
787 window_end: End of the aggregation window
789 Returns:
790 Dictionary with count, avg_duration, min_duration, max_duration,
791 p50, p95, p99, and error_count, or None if no data.
792 """
793 # Query structured logs for this component/operation in time window
794 stmt = select(StructuredLogEntry).where(
795 and_(
796 StructuredLogEntry.component == component,
797 StructuredLogEntry.operation_type == operation_type,
798 StructuredLogEntry.timestamp >= window_start,
799 StructuredLogEntry.timestamp < window_end,
800 StructuredLogEntry.duration_ms.isnot(None),
801 )
802 )
804 results = db.execute(stmt).scalars().all()
806 if not results:
807 return None
809 # Extract durations
810 durations = sorted(r.duration_ms for r in results if r.duration_ms is not None)
812 if not durations:
813 return None
815 # Calculate statistics
816 count = len(durations)
817 avg_duration = statistics.fmean(durations) if hasattr(statistics, "fmean") else statistics.mean(durations)
818 min_duration = durations[0]
819 max_duration = durations[-1]
821 # Calculate percentiles
822 p50 = self._percentile(durations, 0.50)
823 p95 = self._percentile(durations, 0.95)
824 p99 = self._percentile(durations, 0.99)
826 # Count errors
827 error_count = self._calculate_error_count(results)
829 return {
830 "count": count,
831 "avg_duration": avg_duration,
832 "min_duration": min_duration,
833 "max_duration": max_duration,
834 "p50": p50,
835 "p95": p95,
836 "p99": p99,
837 "error_count": error_count,
838 }
840 def _resolve_window_bounds(
841 self,
842 window_start: Optional[datetime],
843 window_end: Optional[datetime],
844 ) -> Tuple[datetime, datetime]:
845 """Resolve and normalize aggregation window bounds.
847 Args:
848 window_start: Start of window or None to calculate
849 window_end: End of window or None for current time
851 Returns:
852 Tuple[datetime, datetime]: Resolved window start and end
853 """
854 window_delta = timedelta(minutes=self.aggregation_window_minutes)
856 if window_start is not None and window_end is not None:
857 resolved_start = window_start.astimezone(timezone.utc)
858 resolved_end = window_end.astimezone(timezone.utc)
859 if resolved_end <= resolved_start:
860 resolved_end = resolved_start + window_delta
861 return resolved_start, resolved_end
863 if window_end is None:
864 reference = datetime.now(timezone.utc)
865 else:
866 reference = window_end.astimezone(timezone.utc)
868 reference = reference.replace(second=0, microsecond=0)
869 minutes_offset = reference.minute % self.aggregation_window_minutes
870 if window_end is None and minutes_offset:
871 reference = reference - timedelta(minutes=minutes_offset)
873 resolved_end = reference
875 if window_start is None:
876 resolved_start = resolved_end - window_delta
877 else:
878 resolved_start = window_start.astimezone(timezone.utc)
880 if resolved_end <= resolved_start:
881 resolved_start = resolved_end - window_delta
883 return resolved_start, resolved_end
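A short illustration of the bound resolution above: explicit bounds are returned as given (normalized to UTC), a missing start defaults to one window before the end, and the end is only snapped down to a window boundary when it is not supplied.

# Illustrative only, assuming the configured aggregation window.
from datetime import datetime, timedelta, timezone

aggregator = LogAggregator()
start = datetime(2026, 2, 11, 12, 0, tzinfo=timezone.utc)
end = datetime(2026, 2, 11, 12, 5, tzinfo=timezone.utc)
assert aggregator._resolve_window_bounds(start, end) == (start, end)

only_end = datetime(2026, 2, 11, 12, 3, tzinfo=timezone.utc)
resolved_start, resolved_end = aggregator._resolve_window_bounds(None, only_end)
assert resolved_end == only_end  # an explicit end is kept as-is
assert resolved_start == resolved_end - timedelta(minutes=aggregator.aggregation_window_minutes)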
885 def _upsert_metric(
886 self,
887 component: str,
888 operation_type: str,
889 window_start: datetime,
890 window_end: datetime,
891 request_count: int,
892 error_count: int,
893 error_rate: float,
894 avg_duration_ms: float,
895 min_duration_ms: float,
896 max_duration_ms: float,
897 p50_duration_ms: float,
898 p95_duration_ms: float,
899 p99_duration_ms: float,
900 metric_metadata: Optional[Dict[str, Any]],
901 db: Session,
902 ) -> PerformanceMetric:
903 """Create or update a performance metric window.
905 Args:
906 component: Component name
907 operation_type: Operation type
908 window_start: Window start time
909 window_end: Window end time
910 request_count: Total request count
911 error_count: Total error count
912 error_rate: Error rate (0.0-1.0)
913 avg_duration_ms: Average duration in milliseconds
914 min_duration_ms: Minimum duration in milliseconds
915 max_duration_ms: Maximum duration in milliseconds
916 p50_duration_ms: 50th percentile duration
917 p95_duration_ms: 95th percentile duration
918 p99_duration_ms: 99th percentile duration
919 metric_metadata: Additional metadata
920 db: Database session
922 Returns:
923 PerformanceMetric: Created or updated metric
924 """
926 existing_stmt = select(PerformanceMetric).where(
927 and_(
928 PerformanceMetric.component == component,
929 PerformanceMetric.operation_type == operation_type,
930 PerformanceMetric.window_start == window_start,
931 PerformanceMetric.window_end == window_end,
932 )
933 )
935 existing_metrics = db.execute(existing_stmt).scalars().all()
936 metric = existing_metrics[0] if existing_metrics else None
938 if len(existing_metrics) > 1:
939 logger.warning(
940 "Found %s duplicate performance metric rows for %s.%s window %s-%s; pruning extras",
941 len(existing_metrics),
942 component,
943 operation_type,
944 window_start.isoformat(),
945 window_end.isoformat(),
946 )
947 for duplicate in existing_metrics[1:]:
948 db.delete(duplicate)
950 if metric is None:
951 metric = PerformanceMetric(
952 component=component,
953 operation_type=operation_type,
954 window_start=window_start,
955 window_end=window_end,
956 window_duration_seconds=int((window_end - window_start).total_seconds()),
957 )
958 db.add(metric)
960 metric.request_count = request_count
961 metric.error_count = error_count
962 metric.error_rate = error_rate
963 metric.avg_duration_ms = avg_duration_ms
964 metric.min_duration_ms = min_duration_ms
965 metric.max_duration_ms = max_duration_ms
966 metric.p50_duration_ms = p50_duration_ms
967 metric.p95_duration_ms = p95_duration_ms
968 metric.p99_duration_ms = p99_duration_ms
969 metric.metric_metadata = metric_metadata
971 db.commit()
972 db.refresh(metric)
973 return metric
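A small sketch of the upsert behaviour above: re-aggregating the same window updates the existing row for that (component, operation, window) rather than inserting a duplicate. The session handling and names here are illustrative assumptions.

# Illustrative only: two aggregation passes over the same window should land
# on the same metric window, not two rows.
from mcpgateway.db import SessionLocal

aggregator = LogAggregator()
with SessionLocal() as db:
    first = aggregator.aggregate_performance_metrics("gateway", "tool_invoke", db=db)
    second = aggregator.aggregate_performance_metrics("gateway", "tool_invoke", db=db)
    if first and second:
        assert (first.component, first.operation_type, first.window_start) == (
            second.component,
            second.operation_type,
            second.window_start,
        )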
976# Global log aggregator instance
977_log_aggregator: Optional[LogAggregator] = None
980def get_log_aggregator() -> LogAggregator:
981 """Get or create the global log aggregator instance.
983 Returns:
984 Global LogAggregator instance
985 """
986 global _log_aggregator # pylint: disable=global-statement
987 if _log_aggregator is None:
988 _log_aggregator = LogAggregator()
989 return _log_aggregator
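A usage sketch of the module-level singleton above; the single aggregation pass shown is illustrative of how a periodic task might drive it.

# Illustrative only: repeated calls return the same shared instance, which a
# scheduler can use to aggregate the most recent window.
aggregator = get_log_aggregator()
assert aggregator is get_log_aggregator()
metrics = aggregator.aggregate_all_components()
print(f"aggregated {len(metrics)} component/operation windows")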