Coverage for mcpgateway/services/performance_service.py: 95%
344 statements
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/performance_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Performance Monitoring Service.
8This module provides comprehensive system and application performance monitoring
9for the MCP Gateway. It collects metrics from:
10- System resources (CPU, memory, disk, network) via psutil
11- Gunicorn workers and processes
12- Database connection pools
13- Redis cache (when available)
14- HTTP request statistics from Prometheus metrics
16The service supports both single-instance and distributed deployments,
17with optional Redis-based metric aggregation for multi-worker environments.
18"""
20# Standard
21from datetime import datetime, timedelta, timezone
22import logging
23import os
24import socket
25import threading
26import time
27from typing import Dict, List, Optional
29# Third-Party
30from sqlalchemy import delete, desc
31from sqlalchemy.orm import Session
33# First-Party
34from mcpgateway.config import settings
35from mcpgateway.db import PerformanceAggregate, PerformanceSnapshot
36from mcpgateway.schemas import (
37 CacheMetricsSchema,
38 DatabaseMetricsSchema,
39 GunicornMetricsSchema,
40 PerformanceAggregateRead,
41 PerformanceDashboard,
42 PerformanceHistoryResponse,
43 RequestMetricsSchema,
44 SystemMetricsSchema,
45 WorkerMetrics,
46)
47from mcpgateway.utils.redis_client import get_redis_client
49# Cache import (lazy to avoid circular dependencies)
50_ADMIN_STATS_CACHE = None
53def _get_admin_stats_cache():
54 """Get admin stats cache singleton lazily.
56 Returns:
57 AdminStatsCache instance.
58 """
59 global _ADMIN_STATS_CACHE # pylint: disable=global-statement
60 if _ADMIN_STATS_CACHE is None:
61 # First-Party
62 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
64 _ADMIN_STATS_CACHE = admin_stats_cache
65 return _ADMIN_STATS_CACHE
68# Optional psutil import
69try:
70 # Third-Party
71 import psutil
73 PSUTIL_AVAILABLE = True
74except ImportError:
75 psutil = None # type: ignore
76 PSUTIL_AVAILABLE = False
78# Optional redis import
79try:
80 # Third-Party
81 import redis.asyncio as aioredis
83 REDIS_AVAILABLE = True
84except ImportError:
85 aioredis = None # type: ignore
86 REDIS_AVAILABLE = False
88# Optional prometheus_client import
89try:
90 # Third-Party
91 from prometheus_client import REGISTRY
93 PROMETHEUS_AVAILABLE = True
94except ImportError:
95 REGISTRY = None # type: ignore
96 PROMETHEUS_AVAILABLE = False
98logger = logging.getLogger(__name__)
100# Track application start time
101APP_START_TIME = time.time()
102HOSTNAME = socket.gethostname()
104# Cache for net_connections (throttled to reduce CPU usage)
105_net_connections_cache: int = 0
106_net_connections_cache_time: float = 0.0
107_net_connections_lock = threading.Lock()
110class PerformanceService:
111 """
112 Service for collecting and managing performance metrics.
114 Provides methods for:
115 - Real-time metric collection from system and application
116 - Historical metric storage and retrieval
117 - Metric aggregation for trend analysis
118 - Worker process discovery and monitoring
119 """
121 def __init__(self, db: Optional[Session] = None):
122 """Initialize the performance service.
124 Args:
125 db: Optional SQLAlchemy database session.
126 """
127 self.db = db
128 self._request_count_cache: Dict[str, int] = {}
129 self._last_request_time = time.time()
131 def _get_net_connections_cached(self) -> int:
132 """Get network connections count with caching to reduce CPU usage.
134 Uses module-level cache with configurable TTL to throttle expensive
135 psutil.net_connections() calls. Thread-safe with double-check locking.
137 Returns:
138 int: Number of active network connections, or 0 if disabled/unavailable.
139 """
140 global _net_connections_cache, _net_connections_cache_time # pylint: disable=global-statement
142 # Check if net_connections tracking is disabled
143 if not settings.mcpgateway_performance_net_connections_enabled:
144 return 0
146 if not PSUTIL_AVAILABLE or psutil is None:
147 return 0
149 current_time = time.time()
150 cache_ttl = settings.mcpgateway_performance_net_connections_cache_ttl
152 # Return cached value if still valid (fast path, no lock needed)
153 if current_time - _net_connections_cache_time < cache_ttl:
154 return _net_connections_cache
156 # Use lock for cache refresh to prevent concurrent expensive calls
157 with _net_connections_lock:
158 # Double-check after acquiring lock (another thread may have refreshed)
159 # Re-read current time in case we waited on the lock
160 current_time = time.time()
161 if current_time - _net_connections_cache_time < cache_ttl:
162 return _net_connections_cache
164 # Refresh the cache
165 try:
166 _net_connections_cache = len(psutil.net_connections(kind="inet"))
167 except (psutil.AccessDenied, OSError) as e:
168 logger.debug("Could not get net_connections: %s", e)
169 # Keep stale cache value on error (don't update _net_connections_cache)
171 # Update cache time after the call to anchor TTL to actual refresh time
172 _net_connections_cache_time = time.time()
174 return _net_connections_cache
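The two settings referenced above control this throttling. The environment variable names below are an assumption (the usual pydantic-settings upper-casing of the attribute names) and are shown only to illustrate the knobs:

# Assumed mapping of the settings attributes to environment variables; verify against config.py.
# MCPGATEWAY_PERFORMANCE_NET_CONNECTIONS_ENABLED=false   # skip psutil.net_connections() entirely
# MCPGATEWAY_PERFORMANCE_NET_CONNECTIONS_CACHE_TTL=30    # refresh the cached count at most every 30 seconds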
176 def get_system_metrics(self) -> SystemMetricsSchema:
177 """Collect current system metrics using psutil.
179 Returns:
180 SystemMetricsSchema: Current system resource metrics.
181 """
182 if not PSUTIL_AVAILABLE or psutil is None:
183 # Return empty metrics if psutil not available
184 return SystemMetricsSchema(
185 cpu_percent=0.0,
186 cpu_count=os.cpu_count() or 1,
187 memory_total_mb=0,
188 memory_used_mb=0,
189 memory_available_mb=0,
190 memory_percent=0.0,
191 disk_total_gb=0.0,
192 disk_used_gb=0.0,
193 disk_percent=0.0,
194 )
196 # CPU metrics
197 cpu_percent = psutil.cpu_percent(interval=0.1)
198 cpu_count = psutil.cpu_count(logical=True) or 1
199 cpu_freq = psutil.cpu_freq()
200 cpu_freq_mhz = round(cpu_freq.current) if cpu_freq else None
202 # Load average (Unix only)
203 try:
204 load_1, load_5, load_15 = os.getloadavg()
205 except (AttributeError, OSError):
206 load_1, load_5, load_15 = None, None, None
208 # Memory metrics
209 vm = psutil.virtual_memory()
210 swap = psutil.swap_memory()
212 # Disk metrics
213 root = os.getenv("SystemDrive", "C:\\") if os.name == "nt" else "/"
214 disk = psutil.disk_usage(str(root))
216 # Network metrics
217 net_io = psutil.net_io_counters()
218 net_connections = self._get_net_connections_cached()
220 # Boot time
221 boot_time = datetime.fromtimestamp(psutil.boot_time(), tz=timezone.utc)
223 return SystemMetricsSchema(
224 cpu_percent=cpu_percent,
225 cpu_count=cpu_count,
226 cpu_freq_mhz=cpu_freq_mhz,
227 load_avg_1m=round(load_1, 2) if load_1 is not None else None,
228 load_avg_5m=round(load_5, 2) if load_5 is not None else None,
229 load_avg_15m=round(load_15, 2) if load_15 is not None else None,
230 memory_total_mb=round(vm.total / 1_048_576),
231 memory_used_mb=round(vm.used / 1_048_576),
232 memory_available_mb=round(vm.available / 1_048_576),
233 memory_percent=vm.percent,
234 swap_total_mb=round(swap.total / 1_048_576),
235 swap_used_mb=round(swap.used / 1_048_576),
236 disk_total_gb=round(disk.total / 1_073_741_824, 2),
237 disk_used_gb=round(disk.used / 1_073_741_824, 2),
238 disk_percent=disk.percent,
239 network_bytes_sent=net_io.bytes_sent,
240 network_bytes_recv=net_io.bytes_recv,
241 network_connections=net_connections,
242 boot_time=boot_time,
243 )
245 def get_worker_metrics(self) -> List[WorkerMetrics]:
246 """Discover and collect metrics from Gunicorn worker processes.
248 Returns:
249 List[WorkerMetrics]: Metrics for each worker process.
250 """
251 workers: List[WorkerMetrics] = []
253 if not PSUTIL_AVAILABLE or psutil is None:
254 return workers
256 current_pid = os.getpid()
257 current_proc = psutil.Process(current_pid)
259 # Try to find Gunicorn master by looking at parent
260 try:
261 parent = current_proc.parent()
262 if parent and "gunicorn" in parent.name().lower():
263 # We're in a Gunicorn worker, get all siblings
264 for child in parent.children(recursive=False):
265 workers.append(self._get_process_metrics(child))
266 else:
267 # Not in Gunicorn, just report current process
268 workers.append(self._get_process_metrics(current_proc))
269 except (psutil.NoSuchProcess, psutil.AccessDenied):
270 # Fallback to current process only
271 workers.append(self._get_process_metrics(current_proc))
273 return workers
275 def _get_process_metrics(self, proc: "psutil.Process") -> WorkerMetrics:
276 """Get metrics for a specific process.
278 Args:
279 proc: psutil Process object.
281 Returns:
282 WorkerMetrics: Metrics for the process.
283 """
284 try:
285 with proc.oneshot():
286 mem_info = proc.memory_info()
287 create_time = datetime.fromtimestamp(proc.create_time(), tz=timezone.utc)
288 uptime = int(time.time() - proc.create_time())
290 # Get file descriptors (Unix only)
291 try:
292 open_fds = proc.num_fds()
293 except (AttributeError, psutil.AccessDenied):
294 open_fds = None
296 # Get connections
297 try:
298 connection_fetcher = getattr(proc, "net_connections", None) or proc.connections
299 connections = len(connection_fetcher(kind="inet"))
300 except (psutil.AccessDenied, psutil.NoSuchProcess):
301 connections = 0
303 return WorkerMetrics(
304 pid=proc.pid,
305 cpu_percent=proc.cpu_percent(interval=0.1),
306 memory_rss_mb=round(mem_info.rss / 1_048_576, 2),
307 memory_vms_mb=round(mem_info.vms / 1_048_576, 2),
308 threads=proc.num_threads(),
309 connections=connections,
310 open_fds=open_fds,
311 status=proc.status(),
312 create_time=create_time,
313 uptime_seconds=uptime,
314 )
315 except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
316 logger.warning(f"Could not get metrics for process {proc.pid}: {e}")
317 return WorkerMetrics(
318 pid=proc.pid,
319 cpu_percent=0.0,
320 memory_rss_mb=0.0,
321 memory_vms_mb=0.0,
322 threads=0,
323 status="unknown",
324 )
326 def get_gunicorn_metrics(self) -> GunicornMetricsSchema:
327 """Collect Gunicorn-specific metrics.
329 Returns:
330 GunicornMetricsSchema: Gunicorn server metrics.
331 """
332 if not PSUTIL_AVAILABLE or psutil is None:
333 return GunicornMetricsSchema()
335 current_pid = os.getpid()
336 current_proc = psutil.Process(current_pid)
338 try:
339 parent = current_proc.parent()
340 if parent and "gunicorn" in parent.name().lower():
341 # We're in a Gunicorn worker
342 children = parent.children(recursive=False)
343 workers_total = len(children)
345 # Count active workers (those with connections or recent CPU activity)
346 workers_active = 0
347 for child in children:
348 try:
349 if child.cpu_percent(interval=0) > 0:
350 workers_active += 1
351 except (psutil.NoSuchProcess, psutil.AccessDenied):
352 pass
354 return GunicornMetricsSchema(
355 master_pid=parent.pid,
356 workers_total=workers_total,
357 workers_active=workers_active,
358 workers_idle=workers_total - workers_active,
359 max_requests=10000, # Default from gunicorn.config.py
360 )
361 except (psutil.NoSuchProcess, psutil.AccessDenied):
362 pass
364 # Not running under Gunicorn or can't determine
365 return GunicornMetricsSchema(
366 master_pid=None,
367 workers_total=1,
368 workers_active=1,
369 workers_idle=0,
370 )
372 def get_request_metrics(self) -> RequestMetricsSchema:
373 """Collect HTTP request metrics from Prometheus.
375 Returns:
376 RequestMetricsSchema: HTTP request performance metrics.
377 """
378 metrics = RequestMetricsSchema()
380 if not PROMETHEUS_AVAILABLE or REGISTRY is None:
381 return metrics
383 try:
384 # Try to get metrics from Prometheus registry
385 for metric in REGISTRY.collect():
386 if metric.name == "http_requests_total":
387 for sample in metric.samples:
388 if sample.name == "http_requests_total":  [388 ↛ 387: condition was always true]
389 # prometheus_fastapi_instrumentator uses 'status' label, not 'status_code'
390 status = sample.labels.get("status", sample.labels.get("status_code", ""))
391 value = int(sample.value)
392 metrics.requests_total += value
394 if status.startswith("1"):
395 metrics.requests_1xx += value
396 elif status.startswith("2"):
397 metrics.requests_2xx += value
398 elif status.startswith("3"):
399 metrics.requests_3xx += value
400 elif status.startswith("4"):
401 metrics.requests_4xx += value
402 elif status.startswith("5"):  [402 ↛ 387: condition was always true]
403 metrics.requests_5xx += value
405 elif metric.name == "http_request_duration_seconds":  [405 ↛ 385: condition was always true]
406 # Extract histogram data for percentiles
407 sum_val = 0.0
408 count_val = 0
409 for sample in metric.samples:
410 if sample.name.endswith("_sum"):
411 sum_val = sample.value
412 elif sample.name.endswith("_count"):  [412 ↛ 409: condition was always true]
413 count_val = int(sample.value)
415 if count_val > 0:  [415 ↛ 385: condition was always true]
416 metrics.response_time_avg_ms = round((sum_val / count_val) * 1000, 2)
418 # Calculate error rate
419 if metrics.requests_total > 0:
420 error_count = metrics.requests_4xx + metrics.requests_5xx
421 metrics.error_rate = round((error_count / metrics.requests_total) * 100, 2)
423 # Calculate requests per second
424 current_time = time.time()
425 elapsed = current_time - self._last_request_time
426 if elapsed > 0:  [426 ↛ 436: condition was always true]
427 prev_total = self._request_count_cache.get("total", 0)
428 if prev_total > 0:
429 metrics.requests_per_second = round((metrics.requests_total - prev_total) / elapsed, 2)
430 self._request_count_cache["total"] = metrics.requests_total
431 self._last_request_time = current_time
433 except Exception as e:
434 logger.warning(f"Error collecting Prometheus metrics: {e}")
436 return metrics
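A worked example of the two derived figures, using illustrative numbers rather than a real scrape:

# Illustrative: 1000 total requests, of which 40 were 4xx and 10 were 5xx; the previous
# scrape saw 880 total requests 60 seconds ago.
error_rate = round((40 + 10) / 1000 * 100, 2)        # 5.0 percent
requests_per_second = round((1000 - 880) / 60.0, 2)  # 2.0, i.e. the counter delta over elapsed time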
438 def get_database_metrics(self, _db: Optional[Session] = None) -> DatabaseMetricsSchema:
439 """Collect database connection pool metrics.
441 Args:
442 _db: Optional SQLAlchemy session (unused, engine imported directly).
444 Returns:
445 DatabaseMetricsSchema: Database connection pool metrics.
446 """
447 metrics = DatabaseMetricsSchema()
449 try:
450 # Import engine from db module (lazy import to avoid circular dependency)
451 # First-Party
452 from mcpgateway.db import engine # pylint: disable=import-outside-toplevel
454 pool = engine.pool
455 if pool:  [455 ↛ 463: condition was always true]
456 metrics.pool_size = pool.size()
457 metrics.connections_in_use = pool.checkedout()
458 metrics.connections_available = pool.checkedin()
459 metrics.overflow = pool.overflow()
460 except Exception as e:
461 logger.warning(f"Error collecting database metrics: {e}")
463 return metrics
465 async def get_cache_metrics(self) -> CacheMetricsSchema:
466 """Collect Redis cache metrics.
468 Returns:
469 CacheMetricsSchema: Redis cache metrics.
470 """
471 metrics = CacheMetricsSchema()
473 if not REDIS_AVAILABLE or aioredis is None:
474 return metrics
476 if not settings.redis_url or settings.cache_type.lower() != "redis":
477 return metrics
479 try:
480 # Use shared Redis client from factory
481 client = await get_redis_client()
482 if not client:
483 return metrics
485 info = await client.info()
487 metrics.connected = True
488 metrics.version = info.get("redis_version")
489 metrics.used_memory_mb = round(info.get("used_memory", 0) / 1_048_576, 2)
490 metrics.connected_clients = info.get("connected_clients", 0)
491 metrics.ops_per_second = info.get("instantaneous_ops_per_sec", 0)
493 # Cache hit rate
494 hits = info.get("keyspace_hits", 0)
495 misses = info.get("keyspace_misses", 0)
496 metrics.keyspace_hits = hits
497 metrics.keyspace_misses = misses
499 total = hits + misses
500 if total > 0:  [500 ↛ 508: condition was always true]
501 metrics.hit_rate = round((hits / total) * 100, 2)
503 # Don't close the shared client
504 except Exception as e:
505 logger.warning(f"Error collecting Redis metrics: {e}")
506 metrics.connected = False
508 return metrics
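The hit rate is derived directly from two INFO counters; a small sketch with made-up values (the field names are the ones Redis INFO actually reports):

# Made-up INFO subset and the derived figures used above.
info = {"redis_version": "7.2.4", "used_memory": 10_485_760, "keyspace_hits": 900, "keyspace_misses": 100}
used_memory_mb = round(info["used_memory"] / 1_048_576, 2)  # 10.0
hit_rate = round(info["keyspace_hits"] / (info["keyspace_hits"] + info["keyspace_misses"]) * 100, 2)  # 90.0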
510 async def get_dashboard(self) -> PerformanceDashboard:
511 """Collect all metrics for the performance dashboard.
513 Returns:
514 PerformanceDashboard: Complete dashboard data.
515 """
516 uptime = int(time.time() - APP_START_TIME)
518 # Collect all metrics
519 system = self.get_system_metrics()
520 requests = self.get_request_metrics()
521 database = self.get_database_metrics(self.db)
522 cache = await self.get_cache_metrics()
523 gunicorn = self.get_gunicorn_metrics()
524 workers = self.get_worker_metrics()
526 return PerformanceDashboard(
527 timestamp=datetime.now(timezone.utc),
528 uptime_seconds=uptime,
529 host=HOSTNAME,
530 system=system,
531 requests=requests,
532 database=database,
533 cache=cache,
534 gunicorn=gunicorn,
535 workers=workers,
536 cluster_hosts=[HOSTNAME],
537 is_distributed=settings.mcpgateway_performance_distributed,
538 )
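A sketch of how a route could expose this payload; the path and router wiring below are hypothetical, only the PerformanceService calls come from this module:

from fastapi import APIRouter

from mcpgateway.schemas import PerformanceDashboard
from mcpgateway.services.performance_service import get_performance_service

router = APIRouter()


@router.get("/admin/performance", response_model=PerformanceDashboard)  # hypothetical path
async def performance_dashboard() -> PerformanceDashboard:
    return await get_performance_service().get_dashboard()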
540 def save_snapshot(self, db: Session) -> Optional[PerformanceSnapshot]:
541 """Save current metrics as a snapshot.
543 Args:
544 db: SQLAlchemy database session.
546 Returns:
547 PerformanceSnapshot: The saved snapshot, or None on error.
548 """
549 try:
550 # Collect current metrics
551 system = self.get_system_metrics()
552 requests = self.get_request_metrics()
553 database = self.get_database_metrics(db)
554 gunicorn = self.get_gunicorn_metrics()
555 workers = self.get_worker_metrics()
557 # Serialize to JSON
558 metrics_json = {
559 "system": system.model_dump(),
560 "requests": requests.model_dump(),
561 "database": database.model_dump(),
562 "gunicorn": gunicorn.model_dump(),
563 "workers": [w.model_dump() for w in workers],
564 }
566 # Convert datetime to ISO format strings for JSON serialization
567 if metrics_json["system"].get("boot_time"):
568 metrics_json["system"]["boot_time"] = metrics_json["system"]["boot_time"].isoformat()
569 for worker in metrics_json["workers"]:
570 if worker.get("create_time"):  [570 ↛ 569: condition was always true]
571 worker["create_time"] = worker["create_time"].isoformat()
573 snapshot = PerformanceSnapshot(
574 host=HOSTNAME,
575 worker_id=str(os.getpid()),
576 metrics_json=metrics_json,
577 )
578 db.add(snapshot)
579 db.commit()
580 db.refresh(snapshot)
582 return snapshot
583 except Exception as e:
584 logger.error(f"Error saving performance snapshot: {e}")
585 db.rollback()
586 return None
588 def cleanup_old_snapshots(self, db: Session) -> int:
589 """Delete snapshots older than retention period.
591 Args:
592 db: SQLAlchemy database session.
594 Returns:
595 int: Number of deleted snapshots.
596 """
597 try:
598 cutoff = datetime.now(timezone.utc) - timedelta(hours=settings.mcpgateway_performance_retention_hours)
600 result = db.execute(delete(PerformanceSnapshot).where(PerformanceSnapshot.timestamp < cutoff))
601 deleted = result.rowcount
602 db.commit()
604 if deleted > 0:  [604 ↛ 607: condition was always true]
605 logger.info(f"Cleaned up {deleted} old performance snapshots")
607 return deleted
608 except Exception as e:
609 logger.error(f"Error cleaning up snapshots: {e}")
610 db.rollback()
611 return 0
613 async def get_history(
614 self,
615 db: Session,
616 period_type: str = "hourly",
617 start_time: Optional[datetime] = None,
618 end_time: Optional[datetime] = None,
619 host: Optional[str] = None,
620 limit: int = 168,
621 ) -> PerformanceHistoryResponse:
622 """Get historical performance aggregates.
624 Args:
625 db: SQLAlchemy database session.
626 period_type: Aggregation period (hourly, daily).
627 start_time: Start of time range.
628 end_time: End of time range.
629 host: Filter by host.
630 limit: Maximum results.
632 Returns:
633 PerformanceHistoryResponse: Historical aggregates.
634 """
635 # Build cache key from parameters
636 start_str = start_time.isoformat() if start_time else "none"
637 end_str = end_time.isoformat() if end_time else "none"
638 host_str = host or "all"
639 cache_key = f"{period_type}:{start_str}:{end_str}:{host_str}:{limit}"
641 # Check cache first
642 cache = _get_admin_stats_cache()
643 cached = await cache.get_performance_history(cache_key)
644 if cached is not None:
645 return PerformanceHistoryResponse.model_validate(cached)
647 query = db.query(PerformanceAggregate).filter(PerformanceAggregate.period_type == period_type)
649 if start_time:
650 query = query.filter(PerformanceAggregate.period_start >= start_time)
651 if end_time:
652 query = query.filter(PerformanceAggregate.period_end <= end_time)
653 if host:
654 query = query.filter(PerformanceAggregate.host == host)
656 total_count = query.count()
657 aggregates = query.order_by(desc(PerformanceAggregate.period_start)).limit(limit).all()
659 result = PerformanceHistoryResponse(
660 aggregates=[PerformanceAggregateRead.model_validate(a) for a in aggregates],
661 period_type=period_type,
662 total_count=total_count,
663 )
665 # Store in cache
666 await cache.set_performance_history(result.model_dump(), cache_key)
668 return result
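Called with an explicit window, for example the last 24 hourly buckets (argument values are illustrative; `db` is an open SQLAlchemy session):

from datetime import datetime, timedelta, timezone


async def last_day(db):
    service = get_performance_service()
    return await service.get_history(
        db,
        period_type="hourly",
        start_time=datetime.now(timezone.utc) - timedelta(hours=24),
        limit=24,
    )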
670 def create_hourly_aggregate(self, db: Session, hour_start: datetime) -> Optional[PerformanceAggregate]:
671 """Create an hourly aggregate from snapshots.
673 Args:
674 db: SQLAlchemy database session.
675 hour_start: Start of the hour to aggregate.
677 Returns:
678 PerformanceAggregate: The created aggregate, or None on error.
679 """
680 hour_end = hour_start + timedelta(hours=1)
682 try:
683 # Get snapshots for this hour
684 snapshots = db.query(PerformanceSnapshot).filter(PerformanceSnapshot.timestamp >= hour_start, PerformanceSnapshot.timestamp < hour_end).all()
686 if not snapshots:
687 return None
689 # Aggregate metrics
690 total_requests = 0
691 total_2xx = 0
692 total_4xx = 0
693 total_5xx = 0
694 response_times: List[float] = []
695 request_rates: List[float] = []
696 cpu_percents: List[float] = []
697 memory_percents: List[float] = []
699 for snapshot in snapshots:
700 metrics = snapshot.metrics_json
701 req = metrics.get("requests", {})
702 sys = metrics.get("system", {})
704 total_requests += req.get("requests_total", 0)
705 total_2xx += req.get("requests_2xx", 0)
706 total_4xx += req.get("requests_4xx", 0)
707 total_5xx += req.get("requests_5xx", 0)
709 if req.get("response_time_avg_ms"):  [709 ↛ 711: condition was always true]
710 response_times.append(req["response_time_avg_ms"])
711 if req.get("requests_per_second"):  [711 ↛ 713: condition was always true]
712 request_rates.append(req["requests_per_second"])
713 if sys.get("cpu_percent"):  [713 ↛ 715: condition was always true]
714 cpu_percents.append(sys["cpu_percent"])
715 if sys.get("memory_percent"):  [715 ↛ 699: condition was always true]
716 memory_percents.append(sys["memory_percent"])
718 # Calculate averages and peaks
719 avg_response_time = sum(response_times) / len(response_times) if response_times else 0.0
720 peak_rps = max(request_rates) if request_rates else 0.0
721 avg_cpu = sum(cpu_percents) / len(cpu_percents) if cpu_percents else 0.0
722 avg_memory = sum(memory_percents) / len(memory_percents) if memory_percents else 0.0
723 peak_cpu = max(cpu_percents) if cpu_percents else 0.0
724 peak_memory = max(memory_percents) if memory_percents else 0.0
726 # Create aggregate
727 aggregate = PerformanceAggregate(
728 period_start=hour_start,
729 period_end=hour_end,
730 period_type="hourly",
731 host=HOSTNAME,
732 requests_total=total_requests,
733 requests_2xx=total_2xx,
734 requests_4xx=total_4xx,
735 requests_5xx=total_5xx,
736 avg_response_time_ms=round(avg_response_time, 2),
737 p95_response_time_ms=0.0, # Would need more data for percentiles
738 peak_requests_per_second=round(peak_rps, 2),
739 avg_cpu_percent=round(avg_cpu, 2),
740 avg_memory_percent=round(avg_memory, 2),
741 peak_cpu_percent=round(peak_cpu, 2),
742 peak_memory_percent=round(peak_memory, 2),
743 )
745 db.add(aggregate)
746 db.commit()
747 db.refresh(aggregate)
749 return aggregate
750 except Exception as e:
751 logger.error(f"Error creating hourly aggregate: {e}")
752 db.rollback()
753 return None
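p95_response_time_ms is stored as 0.0 above because snapshots only keep per-interval averages. If raw per-request latencies were retained, a true p95 could be computed as sketched here (the `latencies_ms` list is hypothetical data this service does not collect today):

import statistics

latencies_ms = [12.0, 15.5, 18.2, 22.0, 480.0]  # hypothetical raw samples for the hour
# quantiles(..., n=100) returns the 1st..99th percentile cut points; index 94 is the 95th.
p95_response_time_ms = round(statistics.quantiles(latencies_ms, n=100)[94], 2)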
756# Singleton service instance
757_performance_service: Optional[PerformanceService] = None
760def get_performance_service(db: Optional[Session] = None) -> PerformanceService:
761 """Get or create the performance service singleton.
763 Args:
764 db: Optional database session.
766 Returns:
767 PerformanceService: The service instance.
768 """
769 global _performance_service # pylint: disable=global-statement
770 if _performance_service is None:
771 _performance_service = PerformanceService(db)
772 elif db is not None:
773 _performance_service.db = db
774 return _performance_service
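How the snapshot and cleanup hooks above might be driven periodically. The loop and session handling are a sketch (the `SessionLocal` factory name is an assumption), and an hourly job would additionally call create_hourly_aggregate for the just-completed hour:

import asyncio

from mcpgateway.db import SessionLocal  # assumed session factory name
from mcpgateway.services.performance_service import get_performance_service


async def collect_forever(interval_seconds: int = 60) -> None:
    service = get_performance_service()
    while True:
        with SessionLocal() as db:
            service.save_snapshot(db)          # persist a point-in-time snapshot
            service.cleanup_old_snapshots(db)  # enforce the retention window
        await asyncio.sleep(interval_seconds)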