Coverage for mcpgateway / services / log_aggregator.py: 97%
355 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/log_aggregator.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Log Aggregation Service.
8This module provides aggregation of performance metrics from structured logs
9into time-windowed statistics for analysis and monitoring.
10"""
12# Standard
13from datetime import datetime, timedelta, timezone
14import logging
15import math
16import statistics
17from typing import Any, Dict, List, Optional, Tuple
19# Third-Party
20from sqlalchemy import and_, func, select, text
21from sqlalchemy.orm import Session
23# First-Party
24from mcpgateway.config import settings
25from mcpgateway.db import engine, PerformanceMetric, SessionLocal, StructuredLogEntry
27logger = logging.getLogger(__name__)
def _is_postgresql() -> bool:
    """Report whether the configured SQLAlchemy engine targets PostgreSQL.

    Returns:
        True when the engine's dialect name is ``"postgresql"``, False otherwise.
    """
    dialect_name = engine.dialect.name
    return dialect_name == "postgresql"
39class LogAggregator:
40 """Aggregates structured logs into performance metrics."""
42 def __init__(self):
43 """Initialize log aggregator."""
44 self.aggregation_window_minutes = getattr(settings, "metrics_aggregation_window_minutes", 5)
45 self.enabled = getattr(settings, "metrics_aggregation_enabled", True)
46 self._use_sql_percentiles = _is_postgresql()
48 def aggregate_performance_metrics(
49 self, component: Optional[str], operation_type: Optional[str], window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None
50 ) -> Optional[PerformanceMetric]:
51 """Aggregate performance metrics for a component and operation.
53 Args:
54 component: Component name
55 operation_type: Operation name
56 window_start: Start of aggregation window (defaults to N minutes ago)
57 window_end: End of aggregation window (defaults to now)
58 db: Optional database session
60 Returns:
61 Created PerformanceMetric or None if no data
62 """
63 if not self.enabled:
64 return None
65 if not component or not operation_type:
66 return None
68 window_start, window_end = self._resolve_window_bounds(window_start, window_end)
70 should_close = False
71 if db is None:
72 db = SessionLocal()
73 should_close = True
75 try:
76 # Use SQL-based aggregation for PostgreSQL, Python fallback for SQLite
77 if self._use_sql_percentiles:
78 stats = self._compute_stats_postgresql(db, component, operation_type, window_start, window_end)
79 else:
80 stats = self._compute_stats_python(db, component, operation_type, window_start, window_end)
82 if stats is None:
83 return None
85 count = stats["count"]
86 avg_duration = stats["avg_duration"]
87 min_duration = stats["min_duration"]
88 max_duration = stats["max_duration"]
89 p50 = stats["p50"]
90 p95 = stats["p95"]
91 p99 = stats["p99"]
92 error_count = stats["error_count"]
93 error_rate = error_count / count if count > 0 else 0.0
95 metric = self._upsert_metric(
96 component=component,
97 operation_type=operation_type,
98 window_start=window_start,
99 window_end=window_end,
100 request_count=count,
101 error_count=error_count,
102 error_rate=error_rate,
103 avg_duration_ms=avg_duration,
104 min_duration_ms=min_duration,
105 max_duration_ms=max_duration,
106 p50_duration_ms=p50,
107 p95_duration_ms=p95,
108 p99_duration_ms=p99,
109 metric_metadata={
110 "sample_size": count,
111 "generated_at": datetime.now(timezone.utc).isoformat(),
112 },
113 db=db,
114 )
116 logger.info(f"Aggregated performance metrics for {component}.{operation_type}: " f"{count} requests, {avg_duration:.2f}ms avg, {error_rate:.2%} error rate")
118 if should_close:
119 db.commit() # Commit transaction on success
120 return metric
122 except Exception as e:
123 logger.error(f"Failed to aggregate performance metrics: {e}")
124 if should_close and db:
125 db.rollback()
126 return None
128 finally:
129 if should_close:
130 db.close()
132 def aggregate_all_components_batch(self, window_starts: List[datetime], window_minutes: int, db: Optional[Session] = None) -> List[PerformanceMetric]:
133 """Aggregate metrics for all components/operations for multiple windows in a single batch.
135 This reduces the number of database round-trips by fetching logs for the full
136 time span once per component/operation and partitioning them into windows in
137 Python, then upserting per-window metrics.
139 Args:
140 window_starts: List of window start datetimes (UTC)
141 window_minutes: Window size in minutes
142 db: Optional database session
144 Returns:
145 List of created/updated PerformanceMetric records
147 Raises:
148 Exception: If a database operation fails during aggregation.
149 """
150 if not self.enabled:
151 return []
153 if not window_starts:
154 return []
156 should_close = False
157 if db is None:
158 db = SessionLocal()
159 should_close = True
161 try:
162 window_delta = timedelta(minutes=window_minutes)
163 # Determine full range to query once
164 full_start = min(window_starts)
165 full_end = max(window_starts) + window_delta
167 # Validate window_starts is contiguous - warn if sparse (batch generates all windows in range)
168 expected_window_count = int((full_end - full_start).total_seconds() / (window_minutes * 60))
169 if len(window_starts) != expected_window_count:
170 logger.warning(
171 "Batch aggregation received %d windows but range spans %d; sparse windows will generate extra metrics",
172 len(window_starts),
173 expected_window_count,
174 )
176 # If PostgreSQL is available, use a single SQL rollup with generate_series and ordered-set aggregates
177 if _is_postgresql():
178 sql = text("""
179 WITH windows AS (
180 SELECT generate_series(:full_start::timestamptz, (:full_end - (:window_minutes || ' minutes')::interval)::timestamptz, (:window_minutes || ' minutes')::interval) AS window_start
181 ), pairs AS (
182 SELECT DISTINCT component, operation_type FROM structured_log_entries
183 WHERE timestamp >= :full_start AND timestamp < :full_end
184 AND duration_ms IS NOT NULL
185 AND component IS NOT NULL AND component <> ''
186 AND operation_type IS NOT NULL AND operation_type <> ''
187 )
188 SELECT
189 w.window_start,
190 p.component,
191 p.operation_type,
192 COUNT(sle.duration_ms) AS cnt,
193 AVG(sle.duration_ms) AS avg_duration,
194 MIN(sle.duration_ms) AS min_duration,
195 MAX(sle.duration_ms) AS max_duration,
196 percentile_cont(0.50) WITHIN GROUP (ORDER BY sle.duration_ms) AS p50,
197 percentile_cont(0.95) WITHIN GROUP (ORDER BY sle.duration_ms) AS p95,
198 percentile_cont(0.99) WITHIN GROUP (ORDER BY sle.duration_ms) AS p99,
199 SUM(CASE WHEN upper(sle.level) IN ('ERROR','CRITICAL') OR sle.error_details IS NOT NULL THEN 1 ELSE 0 END) AS error_count
200 FROM windows w
201 CROSS JOIN pairs p
202 JOIN structured_log_entries sle
203 ON sle.timestamp >= w.window_start AND sle.timestamp < w.window_start + (:window_minutes || ' minutes')::interval
204 AND sle.component = p.component AND sle.operation_type = p.operation_type
205 AND sle.duration_ms IS NOT NULL
206 GROUP BY w.window_start, p.component, p.operation_type
207 HAVING COUNT(sle.duration_ms) > 0
208 ORDER BY w.window_start, p.component, p.operation_type
209 """)
211 rows = db.execute(
212 sql,
213 {
214 "full_start": full_start,
215 "full_end": full_end,
216 "window_minutes": str(window_minutes),
217 },
218 ).fetchall()
220 created_metrics: List[PerformanceMetric] = []
221 for row in rows:
222 ws = row.window_start if row.window_start.tzinfo else row.window_start.replace(tzinfo=timezone.utc)
223 component = row.component
224 operation = row.operation_type
225 count = int(row.cnt)
226 avg_duration = float(row.avg_duration) if row.avg_duration is not None else 0.0
227 min_duration = float(row.min_duration) if row.min_duration is not None else 0.0
228 max_duration = float(row.max_duration) if row.max_duration is not None else 0.0
229 p50 = float(row.p50) if row.p50 is not None else 0.0
230 p95 = float(row.p95) if row.p95 is not None else 0.0
231 p99 = float(row.p99) if row.p99 is not None else 0.0
232 error_count = int(row.error_count) if row.error_count is not None else 0
234 metric = self._upsert_metric(
235 component=component,
236 operation_type=operation,
237 window_start=ws,
238 window_end=ws + window_delta,
239 request_count=count,
240 error_count=error_count,
241 error_rate=(error_count / count) if count else 0.0,
242 avg_duration_ms=avg_duration,
243 min_duration_ms=min_duration,
244 max_duration_ms=max_duration,
245 p50_duration_ms=p50,
246 p95_duration_ms=p95,
247 p99_duration_ms=p99,
248 metric_metadata={
249 "sample_size": count,
250 "generated_at": datetime.now(timezone.utc).isoformat(),
251 },
252 db=db,
253 )
254 if metric:
255 created_metrics.append(metric)
257 if should_close:
258 db.commit()
259 return created_metrics
261 # Fallback: in-Python bucketing (previous implementation)
262 # Warning: This path loads all entries into memory; for very large ranges this may spike memory usage
263 range_hours = (full_end - full_start).total_seconds() / 3600
264 if range_hours > 168: # > 1 week
265 logger.warning("Large aggregation range (%.1f hours) may cause high memory usage in non-PostgreSQL fallback", range_hours)
267 # Get unique component/operation pairs for the full range
268 pair_stmt = (
269 select(StructuredLogEntry.component, StructuredLogEntry.operation_type)
270 .where(
271 and_(
272 StructuredLogEntry.timestamp >= full_start,
273 StructuredLogEntry.timestamp < full_end,
274 StructuredLogEntry.duration_ms.isnot(None),
275 StructuredLogEntry.component.isnot(None),
276 StructuredLogEntry.component != "",
277 StructuredLogEntry.operation_type.isnot(None),
278 StructuredLogEntry.operation_type != "",
279 )
280 )
281 .distinct()
282 )
284 pairs = db.execute(pair_stmt).all()
286 created_metrics: List[PerformanceMetric] = []
288 # helper to align timestamp to window start
289 def _align_to_window_local(dt: datetime, minutes: int) -> datetime:
290 """Align a datetime to the start of its aggregation window.
292 Args:
293 dt: The datetime to align.
294 minutes: The window size in minutes.
296 Returns:
297 The datetime aligned to the start of the window.
298 """
299 ts = dt.astimezone(timezone.utc)
300 total_minutes = int(ts.timestamp() // 60)
301 aligned_minutes = (total_minutes // minutes) * minutes
302 return datetime.fromtimestamp(aligned_minutes * 60, tz=timezone.utc)
304 for component, operation in pairs:
305 if not component or not operation:
306 continue
308 # Fetch all relevant log entries for this component/operation in the full range
309 entries_stmt = select(StructuredLogEntry).where(
310 and_(
311 StructuredLogEntry.component == component,
312 StructuredLogEntry.operation_type == operation,
313 StructuredLogEntry.timestamp >= full_start,
314 StructuredLogEntry.timestamp < full_end,
315 StructuredLogEntry.duration_ms.isnot(None),
316 )
317 )
319 entries = db.execute(entries_stmt).scalars().all()
320 if not entries:
321 continue
323 # Bucket entries into windows
324 buckets: Dict[datetime, List[StructuredLogEntry]] = {}
325 for e in entries:
326 ts = e.timestamp if e.timestamp.tzinfo else e.timestamp.replace(tzinfo=timezone.utc)
327 bucket_start = _align_to_window_local(ts, window_minutes)
328 if bucket_start not in buckets:
329 buckets[bucket_start] = []
330 buckets[bucket_start].append(e)
332 # For each requested window, compute stats if we have data
333 for window_start in window_starts:
334 bucket_entries = buckets.get(window_start)
335 if not bucket_entries:
336 continue
338 durations = sorted([b.duration_ms for b in bucket_entries if b.duration_ms is not None])
339 if not durations:
340 continue
342 count = len(durations)
343 avg_duration = float(sum(durations) / count) if count else 0.0
344 min_duration = float(durations[0])
345 max_duration = float(durations[-1])
346 p50 = float(self._percentile(durations, 0.5))
347 p95 = float(self._percentile(durations, 0.95))
348 p99 = float(self._percentile(durations, 0.99))
349 error_count = self._calculate_error_count(bucket_entries)
351 try:
352 metric = self._upsert_metric(
353 component=component,
354 operation_type=operation,
355 window_start=window_start,
356 window_end=window_start + window_delta,
357 request_count=count,
358 error_count=error_count,
359 error_rate=(error_count / count) if count else 0.0,
360 avg_duration_ms=avg_duration,
361 min_duration_ms=min_duration,
362 max_duration_ms=max_duration,
363 p50_duration_ms=p50,
364 p95_duration_ms=p95,
365 p99_duration_ms=p99,
366 metric_metadata={
367 "sample_size": count,
368 "generated_at": datetime.now(timezone.utc).isoformat(),
369 },
370 db=db,
371 )
372 if metric:
373 created_metrics.append(metric)
374 except Exception:
375 logger.exception("Failed to upsert metric for %s.%s window %s", component, operation, window_start)
377 if should_close:
378 db.commit()
379 return created_metrics
380 except Exception:
381 if should_close:
382 db.rollback()
383 raise
384 finally:
385 if should_close:
386 db.close()
388 def aggregate_all_components(self, window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None) -> List[PerformanceMetric]:
389 """Aggregate metrics for all components and operations.
391 Args:
392 window_start: Start of aggregation window
393 window_end: End of aggregation window
394 db: Optional database session
396 Returns:
397 List of created PerformanceMetric records
399 Raises:
400 Exception: If database operation fails
401 """
402 if not self.enabled:
403 return []
405 should_close = False
406 if db is None:
407 db = SessionLocal()
408 should_close = True
410 try:
411 window_start, window_end = self._resolve_window_bounds(window_start, window_end)
413 stmt = (
414 select(StructuredLogEntry.component, StructuredLogEntry.operation_type)
415 .where(
416 and_(
417 StructuredLogEntry.timestamp >= window_start,
418 StructuredLogEntry.timestamp < window_end,
419 StructuredLogEntry.duration_ms.isnot(None),
420 StructuredLogEntry.operation_type.isnot(None),
421 )
422 )
423 .distinct()
424 )
426 pairs = db.execute(stmt).all()
428 metrics = []
429 for component, operation in pairs:
430 if component and operation:
431 metric = self.aggregate_performance_metrics(component=component, operation_type=operation, window_start=window_start, window_end=window_end, db=db)
432 if metric:
433 metrics.append(metric)
435 if should_close:
436 db.commit() # Commit on success
437 return metrics
439 except Exception:
440 if should_close:
441 db.rollback()
442 raise
444 finally:
445 if should_close:
446 db.close()
448 def get_recent_metrics(self, component: Optional[str] = None, operation: Optional[str] = None, hours: int = 24, db: Optional[Session] = None) -> List[PerformanceMetric]:
449 """Get recent performance metrics.
451 Args:
452 component: Optional component filter
453 operation: Optional operation filter
454 hours: Hours of history to retrieve
455 db: Optional database session
457 Returns:
458 List of PerformanceMetric records
460 Raises:
461 Exception: If database operation fails
462 """
463 should_close = False
464 if db is None:
465 db = SessionLocal()
466 should_close = True
468 try:
469 since = datetime.now(timezone.utc) - timedelta(hours=hours)
471 stmt = select(PerformanceMetric).where(PerformanceMetric.window_start >= since)
473 if component:
474 stmt = stmt.where(PerformanceMetric.component == component)
475 if operation:
476 stmt = stmt.where(PerformanceMetric.operation_type == operation)
478 stmt = stmt.order_by(PerformanceMetric.window_start.desc())
480 result = db.execute(stmt).scalars().all()
481 if should_close:
482 db.commit() # Commit on success
483 return result
485 except Exception:
486 if should_close:
487 db.rollback()
488 raise
490 finally:
491 if should_close:
492 db.close()
494 def get_degradation_alerts(self, threshold_multiplier: float = 1.5, hours: int = 24, db: Optional[Session] = None) -> List[Dict[str, Any]]:
495 """Identify performance degradations by comparing recent vs baseline.
497 Args:
498 threshold_multiplier: Alert if recent is X times slower than baseline
499 hours: Hours of recent data to check
500 db: Optional database session
502 Returns:
503 List of degradation alerts with details
505 Raises:
506 Exception: If database operation fails
507 """
508 should_close = False
509 if db is None:
510 db = SessionLocal()
511 should_close = True
513 try:
514 recent_cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
515 baseline_cutoff = recent_cutoff - timedelta(hours=hours * 2)
517 # Get unique component/operation pairs
518 stmt = select(PerformanceMetric.component, PerformanceMetric.operation_type).distinct()
520 pairs = db.execute(stmt).all()
522 alerts = []
523 for component, operation in pairs:
524 # Get recent metrics
525 recent_stmt = select(PerformanceMetric).where(
526 and_(PerformanceMetric.component == component, PerformanceMetric.operation_type == operation, PerformanceMetric.window_start >= recent_cutoff)
527 )
528 recent_metrics = db.execute(recent_stmt).scalars().all()
530 # Get baseline metrics
531 baseline_stmt = select(PerformanceMetric).where(
532 and_(
533 PerformanceMetric.component == component,
534 PerformanceMetric.operation_type == operation,
535 PerformanceMetric.window_start >= baseline_cutoff,
536 PerformanceMetric.window_start < recent_cutoff,
537 )
538 )
539 baseline_metrics = db.execute(baseline_stmt).scalars().all()
541 if not recent_metrics or not baseline_metrics:
542 continue
544 recent_avg = statistics.mean([m.avg_duration_ms for m in recent_metrics])
545 baseline_avg = statistics.mean([m.avg_duration_ms for m in baseline_metrics])
547 if recent_avg > baseline_avg * threshold_multiplier:
548 alerts.append(
549 {
550 "component": component,
551 "operation": operation,
552 "recent_avg_ms": recent_avg,
553 "baseline_avg_ms": baseline_avg,
554 "degradation_ratio": recent_avg / baseline_avg,
555 "recent_error_rate": statistics.mean([m.error_rate for m in recent_metrics]),
556 "baseline_error_rate": statistics.mean([m.error_rate for m in baseline_metrics]),
557 }
558 )
560 if should_close:
561 db.commit() # Commit on success
562 return alerts
564 except Exception:
565 if should_close:
566 db.rollback()
567 raise
569 finally:
570 if should_close:
571 db.close()
573 def backfill(self, hours: float, db: Optional[Session] = None) -> int:
574 """Backfill metrics for a historical time range.
576 Args:
577 hours: Number of hours of history to aggregate (supports fractional hours)
578 db: Optional shared database session
580 Returns:
581 Count of performance metric windows processed
583 Raises:
584 Exception: If database operation fails
585 """
586 if not self.enabled or hours <= 0:
587 return 0
589 window_minutes = self.aggregation_window_minutes
590 window_delta = timedelta(minutes=window_minutes)
591 total_windows = max(1, math.ceil((hours * 60) / window_minutes))
593 should_close = False
594 if db is None:
595 db = SessionLocal()
596 should_close = True
598 try:
599 _, latest_end = self._resolve_window_bounds(None, None)
600 current_start = latest_end - (window_delta * total_windows)
601 processed = 0
603 while current_start < latest_end:
604 current_end = current_start + window_delta
605 created = self.aggregate_all_components(
606 window_start=current_start,
607 window_end=current_end,
608 db=db,
609 )
610 if created:
611 processed += 1
612 current_start = current_end
614 if should_close:
615 db.commit() # Commit on success
616 return processed
618 except Exception:
619 if should_close:
620 db.rollback()
621 raise
623 finally:
624 if should_close:
625 db.close()
627 @staticmethod
628 def _percentile(sorted_values: List[float], percentile: float) -> float:
629 """Calculate percentile from sorted values.
631 Args:
632 sorted_values: Sorted list of values
633 percentile: Percentile to calculate (0.0 to 1.0)
635 Returns:
636 float: Calculated percentile value
637 """
638 if not sorted_values:
639 return 0.0
641 if len(sorted_values) == 1:
642 return float(sorted_values[0])
644 k = (len(sorted_values) - 1) * percentile
645 f = math.floor(k)
646 c = math.ceil(k)
648 if f == c:
649 return float(sorted_values[int(k)])
651 d0 = sorted_values[f] * (c - k)
652 d1 = sorted_values[c] * (k - f)
653 return float(d0 + d1)
655 @staticmethod
656 def _calculate_error_count(entries: List[StructuredLogEntry]) -> int:
657 """Calculate error occurrences for a batch of log entries.
659 Args:
660 entries: List of log entries to analyze
662 Returns:
663 int: Count of error entries
664 """
665 error_levels = {"ERROR", "CRITICAL"}
666 return sum(1 for entry in entries if (entry.level and entry.level.upper() in error_levels) or entry.error_details)
668 def _compute_stats_postgresql(
669 self,
670 db: Session,
671 component: str,
672 operation_type: str,
673 window_start: datetime,
674 window_end: datetime,
675 ) -> Optional[Dict[str, Any]]:
676 """Compute aggregation statistics using PostgreSQL SQL functions.
678 Uses PostgreSQL's percentile_cont for efficient in-database percentile
679 computation, avoiding loading all rows into Python memory.
681 Args:
682 db: Database session
683 component: Component name to filter by
684 operation_type: Operation type to filter by
685 window_start: Start of the aggregation window
686 window_end: End of the aggregation window
688 Returns:
689 Dictionary with count, avg_duration, min_duration, max_duration,
690 p50, p95, p99, and error_count, or None if no data.
691 """
692 # Build base filter conditions
693 base_conditions = and_(
694 StructuredLogEntry.component == component,
695 StructuredLogEntry.operation_type == operation_type,
696 StructuredLogEntry.timestamp >= window_start,
697 StructuredLogEntry.timestamp < window_end,
698 StructuredLogEntry.duration_ms.isnot(None),
699 )
701 # First, check if there are any rows and get error count
702 # (error count requires examining level/error_details which can't be done purely in SQL aggregate)
703 count_stmt = select(func.count()).select_from(StructuredLogEntry).where(base_conditions) # pylint: disable=not-callable
704 count_result = db.execute(count_stmt).scalar()
706 if not count_result or count_result == 0:
707 return None
709 # PostgreSQL percentile_cont query using ordered-set aggregate functions
710 # This computes all statistics in a single query
711 stats_sql = text("""
712 SELECT
713 COUNT(duration_ms) as cnt,
714 AVG(duration_ms) as avg_duration,
715 MIN(duration_ms) as min_duration,
716 MAX(duration_ms) as max_duration,
717 percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) as p50,
718 percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95,
719 percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) as p99
720 FROM structured_log_entries
721 WHERE component = :component
722 AND operation_type = :operation_type
723 AND timestamp >= :window_start
724 AND timestamp < :window_end
725 AND duration_ms IS NOT NULL
726 """)
728 result = db.execute(
729 stats_sql,
730 {
731 "component": component,
732 "operation_type": operation_type,
733 "window_start": window_start,
734 "window_end": window_end,
735 },
736 ).fetchone()
738 if not result or result.cnt == 0:
739 return None
741 # Get error count separately (requires level/error_details examination)
742 error_stmt = (
743 select(func.count()) # pylint: disable=not-callable
744 .select_from(StructuredLogEntry)
745 .where(
746 and_(
747 base_conditions,
748 ((func.upper(StructuredLogEntry.level).in_(["ERROR", "CRITICAL"])) | (StructuredLogEntry.error_details.isnot(None))),
749 )
750 )
751 )
752 error_count = db.execute(error_stmt).scalar() or 0
754 return {
755 "count": result.cnt,
756 "avg_duration": float(result.avg_duration) if result.avg_duration else 0.0,
757 "min_duration": float(result.min_duration) if result.min_duration else 0.0,
758 "max_duration": float(result.max_duration) if result.max_duration else 0.0,
759 "p50": float(result.p50) if result.p50 else 0.0,
760 "p95": float(result.p95) if result.p95 else 0.0,
761 "p99": float(result.p99) if result.p99 else 0.0,
762 "error_count": error_count,
763 }
765 def _compute_stats_python(
766 self,
767 db: Session,
768 component: str,
769 operation_type: str,
770 window_start: datetime,
771 window_end: datetime,
772 ) -> Optional[Dict[str, Any]]:
773 """Compute aggregation statistics using Python (fallback for SQLite).
775 Loads duration values into memory and computes statistics in Python.
776 Used when database doesn't support native percentile functions.
778 Args:
779 db: Database session
780 component: Component name to filter by
781 operation_type: Operation type to filter by
782 window_start: Start of the aggregation window
783 window_end: End of the aggregation window
785 Returns:
786 Dictionary with count, avg_duration, min_duration, max_duration,
787 p50, p95, p99, and error_count, or None if no data.
788 """
789 # Query structured logs for this component/operation in time window
790 stmt = select(StructuredLogEntry).where(
791 and_(
792 StructuredLogEntry.component == component,
793 StructuredLogEntry.operation_type == operation_type,
794 StructuredLogEntry.timestamp >= window_start,
795 StructuredLogEntry.timestamp < window_end,
796 StructuredLogEntry.duration_ms.isnot(None),
797 )
798 )
800 results = db.execute(stmt).scalars().all()
802 if not results:
803 return None
805 # Extract durations
806 durations = sorted(r.duration_ms for r in results if r.duration_ms is not None)
808 if not durations:
809 return None
811 # Calculate statistics
812 count = len(durations)
813 avg_duration = statistics.fmean(durations) if hasattr(statistics, "fmean") else statistics.mean(durations)
814 min_duration = durations[0]
815 max_duration = durations[-1]
817 # Calculate percentiles
818 p50 = self._percentile(durations, 0.50)
819 p95 = self._percentile(durations, 0.95)
820 p99 = self._percentile(durations, 0.99)
822 # Count errors
823 error_count = self._calculate_error_count(results)
825 return {
826 "count": count,
827 "avg_duration": avg_duration,
828 "min_duration": min_duration,
829 "max_duration": max_duration,
830 "p50": p50,
831 "p95": p95,
832 "p99": p99,
833 "error_count": error_count,
834 }
836 def _resolve_window_bounds(
837 self,
838 window_start: Optional[datetime],
839 window_end: Optional[datetime],
840 ) -> Tuple[datetime, datetime]:
841 """Resolve and normalize aggregation window bounds.
843 Args:
844 window_start: Start of window or None to calculate
845 window_end: End of window or None for current time
847 Returns:
848 Tuple[datetime, datetime]: Resolved window start and end
849 """
850 window_delta = timedelta(minutes=self.aggregation_window_minutes)
852 if window_start is not None and window_end is not None:
853 resolved_start = window_start.astimezone(timezone.utc)
854 resolved_end = window_end.astimezone(timezone.utc)
855 if resolved_end <= resolved_start:
856 resolved_end = resolved_start + window_delta
857 return resolved_start, resolved_end
859 if window_end is None:
860 reference = datetime.now(timezone.utc)
861 else:
862 reference = window_end.astimezone(timezone.utc)
864 reference = reference.replace(second=0, microsecond=0)
865 minutes_offset = reference.minute % self.aggregation_window_minutes
866 if window_end is None and minutes_offset:
867 reference = reference - timedelta(minutes=minutes_offset)
869 resolved_end = reference
871 if window_start is None:
872 resolved_start = resolved_end - window_delta
873 else:
874 resolved_start = window_start.astimezone(timezone.utc)
876 if resolved_end <= resolved_start:
877 resolved_start = resolved_end - window_delta
879 return resolved_start, resolved_end
881 def _upsert_metric(
882 self,
883 component: str,
884 operation_type: str,
885 window_start: datetime,
886 window_end: datetime,
887 request_count: int,
888 error_count: int,
889 error_rate: float,
890 avg_duration_ms: float,
891 min_duration_ms: float,
892 max_duration_ms: float,
893 p50_duration_ms: float,
894 p95_duration_ms: float,
895 p99_duration_ms: float,
896 metric_metadata: Optional[Dict[str, Any]],
897 db: Session,
898 ) -> PerformanceMetric:
899 """Create or update a performance metric window.
901 Args:
902 component: Component name
903 operation_type: Operation type
904 window_start: Window start time
905 window_end: Window end time
906 request_count: Total request count
907 error_count: Total error count
908 error_rate: Error rate (0.0-1.0)
909 avg_duration_ms: Average duration in milliseconds
910 min_duration_ms: Minimum duration in milliseconds
911 max_duration_ms: Maximum duration in milliseconds
912 p50_duration_ms: 50th percentile duration
913 p95_duration_ms: 95th percentile duration
914 p99_duration_ms: 99th percentile duration
915 metric_metadata: Additional metadata
916 db: Database session
918 Returns:
919 PerformanceMetric: Created or updated metric
920 """
922 existing_stmt = select(PerformanceMetric).where(
923 and_(
924 PerformanceMetric.component == component,
925 PerformanceMetric.operation_type == operation_type,
926 PerformanceMetric.window_start == window_start,
927 PerformanceMetric.window_end == window_end,
928 )
929 )
931 existing_metrics = db.execute(existing_stmt).scalars().all()
932 metric = existing_metrics[0] if existing_metrics else None
934 if len(existing_metrics) > 1:
935 logger.warning(
936 "Found %s duplicate performance metric rows for %s.%s window %s-%s; pruning extras",
937 len(existing_metrics),
938 component,
939 operation_type,
940 window_start.isoformat(),
941 window_end.isoformat(),
942 )
943 for duplicate in existing_metrics[1:]:
944 db.delete(duplicate)
946 if metric is None:
947 metric = PerformanceMetric(
948 component=component,
949 operation_type=operation_type,
950 window_start=window_start,
951 window_end=window_end,
952 window_duration_seconds=int((window_end - window_start).total_seconds()),
953 )
954 db.add(metric)
956 metric.request_count = request_count
957 metric.error_count = error_count
958 metric.error_rate = error_rate
959 metric.avg_duration_ms = avg_duration_ms
960 metric.min_duration_ms = min_duration_ms
961 metric.max_duration_ms = max_duration_ms
962 metric.p50_duration_ms = p50_duration_ms
963 metric.p95_duration_ms = p95_duration_ms
964 metric.p99_duration_ms = p99_duration_ms
965 metric.metric_metadata = metric_metadata
967 db.commit()
968 db.refresh(metric)
969 return metric
# Global log aggregator instance
_log_aggregator: Optional[LogAggregator] = None


def get_log_aggregator() -> LogAggregator:
    """Return the shared LogAggregator, creating it on first use.

    Returns:
        Global LogAggregator instance
    """
    global _log_aggregator  # pylint: disable=global-statement
    if _log_aggregator is not None:
        return _log_aggregator
    _log_aggregator = LogAggregator()
    return _log_aggregator