Coverage for mcpgateway/services/performance_service.py: 95%

344 statements  

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/performance_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Performance Monitoring Service. 

7 

8This module provides comprehensive system and application performance monitoring 

9for the MCP Gateway. It collects metrics from: 

10- System resources (CPU, memory, disk, network) via psutil 

11- Gunicorn workers and processes 

12- Database connection pools 

13- Redis cache (when available) 

14- HTTP request statistics from Prometheus metrics 

15 

16The service supports both single-instance and distributed deployments, 

17with optional Redis-based metric aggregation for multi-worker environments. 

18""" 

19 

20# Standard 

21from datetime import datetime, timedelta, timezone 

22import logging 

23import os 

24import socket 

25import threading 

26import time 

27from typing import Dict, List, Optional 

28 

29# Third-Party 

30from sqlalchemy import delete, desc 

31from sqlalchemy.orm import Session 

32 

33# First-Party 

34from mcpgateway.config import settings 

35from mcpgateway.db import PerformanceAggregate, PerformanceSnapshot 

36from mcpgateway.schemas import ( 

37 CacheMetricsSchema, 

38 DatabaseMetricsSchema, 

39 GunicornMetricsSchema, 

40 PerformanceAggregateRead, 

41 PerformanceDashboard, 

42 PerformanceHistoryResponse, 

43 RequestMetricsSchema, 

44 SystemMetricsSchema, 

45 WorkerMetrics, 

46) 

47from mcpgateway.utils.redis_client import get_redis_client 

48 

49# Cache import (lazy to avoid circular dependencies) 

50_ADMIN_STATS_CACHE = None 

51 

52 

53def _get_admin_stats_cache(): 

54 """Get admin stats cache singleton lazily. 

55 

56 Returns: 

57 AdminStatsCache instance. 

58 """ 

59 global _ADMIN_STATS_CACHE # pylint: disable=global-statement 

60 if _ADMIN_STATS_CACHE is None: 

61 # First-Party 

62 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

63 

64 _ADMIN_STATS_CACHE = admin_stats_cache 

65 return _ADMIN_STATS_CACHE 

66 

67 

68# Optional psutil import 

69try: 

70 # Third-Party 

71 import psutil 

72 

73 PSUTIL_AVAILABLE = True 

74except ImportError: 

75 psutil = None # type: ignore 

76 PSUTIL_AVAILABLE = False 

77 

78# Optional redis import 

79try: 

80 # Third-Party 

81 import redis.asyncio as aioredis 

82 

83 REDIS_AVAILABLE = True 

84except ImportError: 

85 aioredis = None # type: ignore 

86 REDIS_AVAILABLE = False 

87 

88# Optional prometheus_client import 

89try: 

90 # Third-Party 

91 from prometheus_client import REGISTRY 

92 

93 PROMETHEUS_AVAILABLE = True 

94except ImportError: 

95 REGISTRY = None # type: ignore 

96 PROMETHEUS_AVAILABLE = False 

97 

98logger = logging.getLogger(__name__) 

99 

100# Track application start time 

101APP_START_TIME = time.time() 

102HOSTNAME = socket.gethostname() 

103 

104# Cache for net_connections (throttled to reduce CPU usage) 

105_net_connections_cache: int = 0 

106_net_connections_cache_time: float = 0.0 

107_net_connections_lock = threading.Lock() 

108 

109 

110class PerformanceService: 

111 """ 

112 Service for collecting and managing performance metrics. 

113 

114 Provides methods for: 

115 - Real-time metric collection from system and application 

116 - Historical metric storage and retrieval 

117 - Metric aggregation for trend analysis 

118 - Worker process discovery and monitoring 

119 """ 

120 

121 def __init__(self, db: Optional[Session] = None): 

122 """Initialize the performance service. 

123 

124 Args: 

125 db: Optional SQLAlchemy database session. 

126 """ 

127 self.db = db 

128 self._request_count_cache: Dict[str, int] = {} 

129 self._last_request_time = time.time() 

130 

131 def _get_net_connections_cached(self) -> int: 

132 """Get network connections count with caching to reduce CPU usage. 

133 

134 Uses module-level cache with configurable TTL to throttle expensive 

135 psutil.net_connections() calls. Thread-safe with double-check locking. 

136 

137 Returns: 

138 int: Number of active network connections, or 0 if disabled/unavailable. 

139 """ 

140 global _net_connections_cache, _net_connections_cache_time # pylint: disable=global-statement 

141 

142 # Check if net_connections tracking is disabled 

143 if not settings.mcpgateway_performance_net_connections_enabled: 

144 return 0 

145 

146 if not PSUTIL_AVAILABLE or psutil is None: 

147 return 0 

148 

149 current_time = time.time() 

150 cache_ttl = settings.mcpgateway_performance_net_connections_cache_ttl 

151 

152 # Return cached value if still valid (fast path, no lock needed) 

153 if current_time - _net_connections_cache_time < cache_ttl: 

154 return _net_connections_cache 

155 

156 # Use lock for cache refresh to prevent concurrent expensive calls 

157 with _net_connections_lock: 

158 # Double-check after acquiring lock (another thread may have refreshed) 

159 # Re-read current time in case we waited on the lock 

160 current_time = time.time() 

161 if current_time - _net_connections_cache_time < cache_ttl: 

162 return _net_connections_cache 

163 

164 # Refresh the cache 

165 try: 

166 _net_connections_cache = len(psutil.net_connections(kind="inet")) 

167 except (psutil.AccessDenied, OSError) as e: 

168 logger.debug("Could not get net_connections: %s", e) 

169 # Keep stale cache value on error (don't update _net_connections_cache) 

170 

171 # Update cache time after the call to anchor TTL to actual refresh time 

172 _net_connections_cache_time = time.time() 

173 

174 return _net_connections_cache 

175 
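The throttling above follows a double-checked locking pattern: an unlocked fast-path read, then a lock and a re-check before the expensive call. A minimal standalone sketch of the same pattern (not part of the module; the callable and TTL are placeholders):

# Sketch only: TTL-throttled wrapper around an expensive callable, mirroring
# the double-checked locking used in _get_net_connections_cached().
import threading
import time
from typing import Callable

class ThrottledValue:
    def __init__(self, fetch: Callable[[], int], ttl: float):
        self._fetch = fetch        # expensive callable, e.g. a psutil query
        self._ttl = ttl
        self._value = 0
        self._stamp = 0.0
        self._lock = threading.Lock()

    def get(self) -> int:
        if time.time() - self._stamp < self._ttl:      # fast path, no lock
            return self._value
        with self._lock:                               # serialize refreshes
            if time.time() - self._stamp < self._ttl:  # double-check after waiting
                return self._value
            self._value = self._fetch()
            self._stamp = time.time()
            return self._value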

176 def get_system_metrics(self) -> SystemMetricsSchema: 

177 """Collect current system metrics using psutil. 

178 

179 Returns: 

180 SystemMetricsSchema: Current system resource metrics. 

181 """ 

182 if not PSUTIL_AVAILABLE or psutil is None: 

183 # Return empty metrics if psutil not available 

184 return SystemMetricsSchema( 

185 cpu_percent=0.0, 

186 cpu_count=os.cpu_count() or 1, 

187 memory_total_mb=0, 

188 memory_used_mb=0, 

189 memory_available_mb=0, 

190 memory_percent=0.0, 

191 disk_total_gb=0.0, 

192 disk_used_gb=0.0, 

193 disk_percent=0.0, 

194 ) 

195 

196 # CPU metrics 

197 cpu_percent = psutil.cpu_percent(interval=0.1) 

198 cpu_count = psutil.cpu_count(logical=True) or 1 

199 cpu_freq = psutil.cpu_freq() 

200 cpu_freq_mhz = round(cpu_freq.current) if cpu_freq else None 

201 

202 # Load average (Unix only) 

203 try: 

204 load_1, load_5, load_15 = os.getloadavg() 

205 except (AttributeError, OSError): 

206 load_1, load_5, load_15 = None, None, None 

207 

208 # Memory metrics 

209 vm = psutil.virtual_memory() 

210 swap = psutil.swap_memory() 

211 

212 # Disk metrics 

213 root = os.getenv("SystemDrive", "C:\\") if os.name == "nt" else "/" 

214 disk = psutil.disk_usage(str(root)) 

215 

216 # Network metrics 

217 net_io = psutil.net_io_counters() 

218 net_connections = self._get_net_connections_cached() 

219 

220 # Boot time 

221 boot_time = datetime.fromtimestamp(psutil.boot_time(), tz=timezone.utc) 

222 

223 return SystemMetricsSchema( 

224 cpu_percent=cpu_percent, 

225 cpu_count=cpu_count, 

226 cpu_freq_mhz=cpu_freq_mhz, 

227 load_avg_1m=round(load_1, 2) if load_1 is not None else None, 

228 load_avg_5m=round(load_5, 2) if load_5 is not None else None, 

229 load_avg_15m=round(load_15, 2) if load_15 is not None else None, 

230 memory_total_mb=round(vm.total / 1_048_576), 

231 memory_used_mb=round(vm.used / 1_048_576), 

232 memory_available_mb=round(vm.available / 1_048_576), 

233 memory_percent=vm.percent, 

234 swap_total_mb=round(swap.total / 1_048_576), 

235 swap_used_mb=round(swap.used / 1_048_576), 

236 disk_total_gb=round(disk.total / 1_073_741_824, 2), 

237 disk_used_gb=round(disk.used / 1_073_741_824, 2), 

238 disk_percent=disk.percent, 

239 network_bytes_sent=net_io.bytes_sent, 

240 network_bytes_recv=net_io.bytes_recv, 

241 network_connections=net_connections, 

242 boot_time=boot_time, 

243 ) 

244 
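A short usage sketch for the collector above (assuming the package is importable; if psutil is missing, the method returns zeroed metrics):

from mcpgateway.services.performance_service import PerformanceService

svc = PerformanceService()
m = svc.get_system_metrics()
print(f"cpu={m.cpu_percent}%  mem={m.memory_percent}%  disk={m.disk_percent}%")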

245 def get_worker_metrics(self) -> List[WorkerMetrics]: 

246 """Discover and collect metrics from Gunicorn worker processes. 

247 

248 Returns: 

249 List[WorkerMetrics]: Metrics for each worker process. 

250 """ 

251 workers: List[WorkerMetrics] = [] 

252 

253 if not PSUTIL_AVAILABLE or psutil is None: 

254 return workers 

255 

256 current_pid = os.getpid() 

257 current_proc = psutil.Process(current_pid) 

258 

259 # Try to find Gunicorn master by looking at parent 

260 try: 

261 parent = current_proc.parent() 

262 if parent and "gunicorn" in parent.name().lower(): 

263 # We're in a Gunicorn worker, get all siblings 

264 for child in parent.children(recursive=False): 

265 workers.append(self._get_process_metrics(child)) 

266 else: 

267 # Not in Gunicorn, just report current process 

268 workers.append(self._get_process_metrics(current_proc)) 

269 except (psutil.NoSuchProcess, psutil.AccessDenied): 

270 # Fallback to current process only 

271 workers.append(self._get_process_metrics(current_proc)) 

272 

273 return workers 

274 

275 def _get_process_metrics(self, proc: "psutil.Process") -> WorkerMetrics: 

276 """Get metrics for a specific process. 

277 

278 Args: 

279 proc: psutil Process object. 

280 

281 Returns: 

282 WorkerMetrics: Metrics for the process. 

283 """ 

284 try: 

285 with proc.oneshot(): 

286 mem_info = proc.memory_info() 

287 create_time = datetime.fromtimestamp(proc.create_time(), tz=timezone.utc) 

288 uptime = int(time.time() - proc.create_time()) 

289 

290 # Get file descriptors (Unix only) 

291 try: 

292 open_fds = proc.num_fds() 

293 except (AttributeError, psutil.AccessDenied): 

294 open_fds = None 

295 

296 # Get connections 

297 try: 

298 connection_fetcher = getattr(proc, "net_connections", None) or proc.connections 

299 connections = len(connection_fetcher(kind="inet")) 

300 except (psutil.AccessDenied, psutil.NoSuchProcess): 

301 connections = 0 

302 

303 return WorkerMetrics( 

304 pid=proc.pid, 

305 cpu_percent=proc.cpu_percent(interval=0.1), 

306 memory_rss_mb=round(mem_info.rss / 1_048_576, 2), 

307 memory_vms_mb=round(mem_info.vms / 1_048_576, 2), 

308 threads=proc.num_threads(), 

309 connections=connections, 

310 open_fds=open_fds, 

311 status=proc.status(), 

312 create_time=create_time, 

313 uptime_seconds=uptime, 

314 ) 

315 except (psutil.NoSuchProcess, psutil.AccessDenied) as e: 

316 logger.warning(f"Could not get metrics for process {proc.pid}: {e}") 

317 return WorkerMetrics( 

318 pid=proc.pid, 

319 cpu_percent=0.0, 

320 memory_rss_mb=0.0, 

321 memory_vms_mb=0.0, 

322 threads=0, 

323 status="unknown", 

324 ) 

325 
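The oneshot() context above batches the underlying process reads; a standalone sketch against the current process (requires psutil):

import os
import time
import psutil

proc = psutil.Process(os.getpid())
with proc.oneshot():                 # batch multiple reads of process info
    rss_mb = round(proc.memory_info().rss / 1_048_576, 2)
    threads = proc.num_threads()
    uptime = int(time.time() - proc.create_time())
print(rss_mb, threads, uptime)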

326 def get_gunicorn_metrics(self) -> GunicornMetricsSchema: 

327 """Collect Gunicorn-specific metrics. 

328 

329 Returns: 

330 GunicornMetricsSchema: Gunicorn server metrics. 

331 """ 

332 if not PSUTIL_AVAILABLE or psutil is None: 

333 return GunicornMetricsSchema() 

334 

335 current_pid = os.getpid() 

336 current_proc = psutil.Process(current_pid) 

337 

338 try: 

339 parent = current_proc.parent() 

340 if parent and "gunicorn" in parent.name().lower(): 

341 # We're in a Gunicorn worker 

342 children = parent.children(recursive=False) 

343 workers_total = len(children) 

344 

345 # Count active workers (those with connections or recent CPU activity) 

346 workers_active = 0 

347 for child in children: 

348 try: 

349 if child.cpu_percent(interval=0) > 0: 

350 workers_active += 1 

351 except (psutil.NoSuchProcess, psutil.AccessDenied): 

352 pass 

353 

354 return GunicornMetricsSchema( 

355 master_pid=parent.pid, 

356 workers_total=workers_total, 

357 workers_active=workers_active, 

358 workers_idle=workers_total - workers_active, 

359 max_requests=10000, # Default from gunicorn.config.py 

360 ) 

361 except (psutil.NoSuchProcess, psutil.AccessDenied): 

362 pass 

363 

364 # Not running under Gunicorn or can't determine 

365 return GunicornMetricsSchema( 

366 master_pid=None, 

367 workers_total=1, 

368 workers_active=1, 

369 workers_idle=0, 

370 ) 

371 

372 def get_request_metrics(self) -> RequestMetricsSchema: 

373 """Collect HTTP request metrics from Prometheus. 

374 

375 Returns: 

376 RequestMetricsSchema: HTTP request performance metrics. 

377 """ 

378 metrics = RequestMetricsSchema() 

379 

380 if not PROMETHEUS_AVAILABLE or REGISTRY is None: 

381 return metrics 

382 

383 try: 

384 # Try to get metrics from Prometheus registry 

385 for metric in REGISTRY.collect(): 

386 if metric.name == "http_requests_total": 

387 for sample in metric.samples: 

388 if sample.name == "http_requests_total":  [388 ↛ 387: line 388 didn't jump to line 387 because the condition on line 388 was always true]

389 # prometheus_fastapi_instrumentator uses 'status' label, not 'status_code' 

390 status = sample.labels.get("status", sample.labels.get("status_code", "")) 

391 value = int(sample.value) 

392 metrics.requests_total += value 

393 

394 if status.startswith("1"): 

395 metrics.requests_1xx += value 

396 elif status.startswith("2"): 

397 metrics.requests_2xx += value 

398 elif status.startswith("3"): 

399 metrics.requests_3xx += value 

400 elif status.startswith("4"): 

401 metrics.requests_4xx += value 

402 elif status.startswith("5"):  [402 ↛ 387: line 402 didn't jump to line 387 because the condition on line 402 was always true]

403 metrics.requests_5xx += value 

404 

405 elif metric.name == "http_request_duration_seconds":  [405 ↛ 385: line 405 didn't jump to line 385 because the condition on line 405 was always true]

406 # Extract histogram data for percentiles 

407 sum_val = 0.0 

408 count_val = 0 

409 for sample in metric.samples: 

410 if sample.name.endswith("_sum"): 

411 sum_val = sample.value 

412 elif sample.name.endswith("_count"):  [412 ↛ 409: line 412 didn't jump to line 409 because the condition on line 412 was always true]

413 count_val = int(sample.value) 

414 

415 if count_val > 0:  [415 ↛ 385: line 415 didn't jump to line 385 because the condition on line 415 was always true]

416 metrics.response_time_avg_ms = round((sum_val / count_val) * 1000, 2) 

417 

418 # Calculate error rate 

419 if metrics.requests_total > 0: 

420 error_count = metrics.requests_4xx + metrics.requests_5xx 

421 metrics.error_rate = round((error_count / metrics.requests_total) * 100, 2) 

422 

423 # Calculate requests per second 

424 current_time = time.time() 

425 elapsed = current_time - self._last_request_time 

426 if elapsed > 0:  [426 ↛ 436: line 426 didn't jump to line 436 because the condition on line 426 was always true]

427 prev_total = self._request_count_cache.get("total", 0) 

428 if prev_total > 0: 

429 metrics.requests_per_second = round((metrics.requests_total - prev_total) / elapsed, 2) 

430 self._request_count_cache["total"] = metrics.requests_total 

431 self._last_request_time = current_time 

432 

433 except Exception as e: 

434 logger.warning(f"Error collecting Prometheus metrics: {e}") 

435 

436 return metrics 

437 
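The http_requests_total and http_request_duration_seconds families read above are typically exported by prometheus_fastapi_instrumentator. A wiring sketch under that assumption (the FastAPI app setup is not part of this module):

from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator

app = FastAPI()
# Instruments request handlers and serves /metrics, producing
# http_requests_total{status=...} and http_request_duration_seconds samples.
Instrumentator().instrument(app).expose(app)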

438 def get_database_metrics(self, _db: Optional[Session] = None) -> DatabaseMetricsSchema: 

439 """Collect database connection pool metrics. 

440 

441 Args: 

442 _db: Optional SQLAlchemy session (unused, engine imported directly). 

443 

444 Returns: 

445 DatabaseMetricsSchema: Database connection pool metrics. 

446 """ 

447 metrics = DatabaseMetricsSchema() 

448 

449 try: 

450 # Import engine from db module (lazy import to avoid circular dependency) 

451 # First-Party 

452 from mcpgateway.db import engine # pylint: disable=import-outside-toplevel 

453 

454 pool = engine.pool 

455 if pool:  [455 ↛ 463: line 455 didn't jump to line 463 because the condition on line 455 was always true]

456 metrics.pool_size = pool.size() 

457 metrics.connections_in_use = pool.checkedout() 

458 metrics.connections_available = pool.checkedin() 

459 metrics.overflow = pool.overflow() 

460 except Exception as e: 

461 logger.warning(f"Error collecting database metrics: {e}") 

462 

463 return metrics 

464 
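The pool counters come straight from the SQLAlchemy engine, so the same values can be read directly, for example:

from mcpgateway.db import engine

pool = engine.pool
# size / checked-in / checked-out / overflow, as reported by the connection pool
print(pool.size(), pool.checkedin(), pool.checkedout(), pool.overflow())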

465 async def get_cache_metrics(self) -> CacheMetricsSchema: 

466 """Collect Redis cache metrics. 

467 

468 Returns: 

469 CacheMetricsSchema: Redis cache metrics. 

470 """ 

471 metrics = CacheMetricsSchema() 

472 

473 if not REDIS_AVAILABLE or aioredis is None: 

474 return metrics 

475 

476 if not settings.redis_url or settings.cache_type.lower() != "redis": 

477 return metrics 

478 

479 try: 

480 # Use shared Redis client from factory 

481 client = await get_redis_client() 

482 if not client: 

483 return metrics 

484 

485 info = await client.info() 

486 

487 metrics.connected = True 

488 metrics.version = info.get("redis_version") 

489 metrics.used_memory_mb = round(info.get("used_memory", 0) / 1_048_576, 2) 

490 metrics.connected_clients = info.get("connected_clients", 0) 

491 metrics.ops_per_second = info.get("instantaneous_ops_per_sec", 0) 

492 

493 # Cache hit rate 

494 hits = info.get("keyspace_hits", 0) 

495 misses = info.get("keyspace_misses", 0) 

496 metrics.keyspace_hits = hits 

497 metrics.keyspace_misses = misses 

498 

499 total = hits + misses 

500 if total > 0:  [500 ↛ 508: line 500 didn't jump to line 508 because the condition on line 500 was always true]

501 metrics.hit_rate = round((hits / total) * 100, 2) 

502 

503 # Don't close the shared client 

504 except Exception as e: 

505 logger.warning(f"Error collecting Redis metrics: {e}") 

506 metrics.connected = False 

507 

508 return metrics 

509 
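A standalone sketch of the same INFO-based hit-rate calculation, assuming a reachable Redis (the URL below is illustrative):

import asyncio
import redis.asyncio as aioredis

async def show_hit_rate(url: str = "redis://localhost:6379/0") -> None:
    client = aioredis.from_url(url)
    info = await client.info()
    hits = info.get("keyspace_hits", 0)
    misses = info.get("keyspace_misses", 0)
    total = hits + misses
    print(round(hits / total * 100, 2) if total else 0.0)
    await client.close()

asyncio.run(show_hit_rate())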

510 async def get_dashboard(self) -> PerformanceDashboard: 

511 """Collect all metrics for the performance dashboard. 

512 

513 Returns: 

514 PerformanceDashboard: Complete dashboard data. 

515 """ 

516 uptime = int(time.time() - APP_START_TIME) 

517 

518 # Collect all metrics 

519 system = self.get_system_metrics() 

520 requests = self.get_request_metrics() 

521 database = self.get_database_metrics(self.db) 

522 cache = await self.get_cache_metrics() 

523 gunicorn = self.get_gunicorn_metrics() 

524 workers = self.get_worker_metrics() 

525 

526 return PerformanceDashboard( 

527 timestamp=datetime.now(timezone.utc), 

528 uptime_seconds=uptime, 

529 host=HOSTNAME, 

530 system=system, 

531 requests=requests, 

532 database=database, 

533 cache=cache, 

534 gunicorn=gunicorn, 

535 workers=workers, 

536 cluster_hosts=[HOSTNAME], 

537 is_distributed=settings.mcpgateway_performance_distributed, 

538 ) 

539 
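Putting the collectors together, a caller awaits the full dashboard; a minimal sketch (the database session is optional here):

import asyncio
from mcpgateway.services.performance_service import get_performance_service

async def main() -> None:
    dashboard = await get_performance_service().get_dashboard()
    print(dashboard.host, dashboard.uptime_seconds, dashboard.system.cpu_percent)

asyncio.run(main())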

540 def save_snapshot(self, db: Session) -> Optional[PerformanceSnapshot]: 

541 """Save current metrics as a snapshot. 

542 

543 Args: 

544 db: SQLAlchemy database session. 

545 

546 Returns: 

547 PerformanceSnapshot: The saved snapshot, or None on error. 

548 """ 

549 try: 

550 # Collect current metrics 

551 system = self.get_system_metrics() 

552 requests = self.get_request_metrics() 

553 database = self.get_database_metrics(db) 

554 gunicorn = self.get_gunicorn_metrics() 

555 workers = self.get_worker_metrics() 

556 

557 # Serialize to JSON 

558 metrics_json = { 

559 "system": system.model_dump(), 

560 "requests": requests.model_dump(), 

561 "database": database.model_dump(), 

562 "gunicorn": gunicorn.model_dump(), 

563 "workers": [w.model_dump() for w in workers], 

564 } 

565 

566 # Convert datetime to ISO format strings for JSON serialization 

567 if metrics_json["system"].get("boot_time"): 

568 metrics_json["system"]["boot_time"] = metrics_json["system"]["boot_time"].isoformat() 

569 for worker in metrics_json["workers"]: 

570 if worker.get("create_time"):  [570 ↛ 569: line 570 didn't jump to line 569 because the condition on line 570 was always true]

571 worker["create_time"] = worker["create_time"].isoformat() 

572 

573 snapshot = PerformanceSnapshot( 

574 host=HOSTNAME, 

575 worker_id=str(os.getpid()), 

576 metrics_json=metrics_json, 

577 ) 

578 db.add(snapshot) 

579 db.commit() 

580 db.refresh(snapshot) 

581 

582 return snapshot 

583 except Exception as e: 

584 logger.error(f"Error saving performance snapshot: {e}") 

585 db.rollback() 

586 return None 

587 

588 def cleanup_old_snapshots(self, db: Session) -> int: 

589 """Delete snapshots older than retention period. 

590 

591 Args: 

592 db: SQLAlchemy database session. 

593 

594 Returns: 

595 int: Number of deleted snapshots. 

596 """ 

597 try: 

598 cutoff = datetime.now(timezone.utc) - timedelta(hours=settings.mcpgateway_performance_retention_hours) 

599 

600 result = db.execute(delete(PerformanceSnapshot).where(PerformanceSnapshot.timestamp < cutoff)) 

601 deleted = result.rowcount 

602 db.commit() 

603 

604 if deleted > 0:  [604 ↛ 607: line 604 didn't jump to line 607 because the condition on line 604 was always true]

605 logger.info(f"Cleaned up {deleted} old performance snapshots") 

606 

607 return deleted 

608 except Exception as e: 

609 logger.error(f"Error cleaning up snapshots: {e}") 

610 db.rollback() 

611 return 0 

612 

613 async def get_history( 

614 self, 

615 db: Session, 

616 period_type: str = "hourly", 

617 start_time: Optional[datetime] = None, 

618 end_time: Optional[datetime] = None, 

619 host: Optional[str] = None, 

620 limit: int = 168, 

621 ) -> PerformanceHistoryResponse: 

622 """Get historical performance aggregates. 

623 

624 Args: 

625 db: SQLAlchemy database session. 

626 period_type: Aggregation period (hourly, daily). 

627 start_time: Start of time range. 

628 end_time: End of time range. 

629 host: Filter by host. 

630 limit: Maximum results. 

631 

632 Returns: 

633 PerformanceHistoryResponse: Historical aggregates. 

634 """ 

635 # Build cache key from parameters 

636 start_str = start_time.isoformat() if start_time else "none" 

637 end_str = end_time.isoformat() if end_time else "none" 

638 host_str = host or "all" 

639 cache_key = f"{period_type}:{start_str}:{end_str}:{host_str}:{limit}" 

640 

641 # Check cache first 

642 cache = _get_admin_stats_cache() 

643 cached = await cache.get_performance_history(cache_key) 

644 if cached is not None: 

645 return PerformanceHistoryResponse.model_validate(cached) 

646 

647 query = db.query(PerformanceAggregate).filter(PerformanceAggregate.period_type == period_type) 

648 

649 if start_time: 

650 query = query.filter(PerformanceAggregate.period_start >= start_time) 

651 if end_time: 

652 query = query.filter(PerformanceAggregate.period_end <= end_time) 

653 if host: 

654 query = query.filter(PerformanceAggregate.host == host) 

655 

656 total_count = query.count() 

657 aggregates = query.order_by(desc(PerformanceAggregate.period_start)).limit(limit).all() 

658 

659 result = PerformanceHistoryResponse( 

660 aggregates=[PerformanceAggregateRead.model_validate(a) for a in aggregates], 

661 period_type=period_type, 

662 total_count=total_count, 

663 ) 

664 

665 # Store in cache 

666 await cache.set_performance_history(result.model_dump(), cache_key) 

667 

668 return result 

669 

670 def create_hourly_aggregate(self, db: Session, hour_start: datetime) -> Optional[PerformanceAggregate]: 

671 """Create an hourly aggregate from snapshots. 

672 

673 Args: 

674 db: SQLAlchemy database session. 

675 hour_start: Start of the hour to aggregate. 

676 

677 Returns: 

678 PerformanceAggregate: The created aggregate, or None on error. 

679 """ 

680 hour_end = hour_start + timedelta(hours=1) 

681 

682 try: 

683 # Get snapshots for this hour 

684 snapshots = db.query(PerformanceSnapshot).filter(PerformanceSnapshot.timestamp >= hour_start, PerformanceSnapshot.timestamp < hour_end).all() 

685 

686 if not snapshots: 

687 return None 

688 

689 # Aggregate metrics 

690 total_requests = 0 

691 total_2xx = 0 

692 total_4xx = 0 

693 total_5xx = 0 

694 response_times: List[float] = [] 

695 request_rates: List[float] = [] 

696 cpu_percents: List[float] = [] 

697 memory_percents: List[float] = [] 

698 

699 for snapshot in snapshots: 

700 metrics = snapshot.metrics_json 

701 req = metrics.get("requests", {}) 

702 sys = metrics.get("system", {}) 

703 

704 total_requests += req.get("requests_total", 0) 

705 total_2xx += req.get("requests_2xx", 0) 

706 total_4xx += req.get("requests_4xx", 0) 

707 total_5xx += req.get("requests_5xx", 0) 

708 

709 if req.get("response_time_avg_ms"):  [709 ↛ 711: line 709 didn't jump to line 711 because the condition on line 709 was always true]

710 response_times.append(req["response_time_avg_ms"]) 

711 if req.get("requests_per_second"):  [711 ↛ 713: line 711 didn't jump to line 713 because the condition on line 711 was always true]

712 request_rates.append(req["requests_per_second"]) 

713 if sys.get("cpu_percent"):  [713 ↛ 715: line 713 didn't jump to line 715 because the condition on line 713 was always true]

714 cpu_percents.append(sys["cpu_percent"]) 

715 if sys.get("memory_percent"):  [715 ↛ 699: line 715 didn't jump to line 699 because the condition on line 715 was always true]

716 memory_percents.append(sys["memory_percent"]) 

717 

718 # Calculate averages and peaks 

719 avg_response_time = sum(response_times) / len(response_times) if response_times else 0.0 

720 peak_rps = max(request_rates) if request_rates else 0.0 

721 avg_cpu = sum(cpu_percents) / len(cpu_percents) if cpu_percents else 0.0 

722 avg_memory = sum(memory_percents) / len(memory_percents) if memory_percents else 0.0 

723 peak_cpu = max(cpu_percents) if cpu_percents else 0.0 

724 peak_memory = max(memory_percents) if memory_percents else 0.0 

725 

726 # Create aggregate 

727 aggregate = PerformanceAggregate( 

728 period_start=hour_start, 

729 period_end=hour_end, 

730 period_type="hourly", 

731 host=HOSTNAME, 

732 requests_total=total_requests, 

733 requests_2xx=total_2xx, 

734 requests_4xx=total_4xx, 

735 requests_5xx=total_5xx, 

736 avg_response_time_ms=round(avg_response_time, 2), 

737 p95_response_time_ms=0.0, # Would need more data for percentiles 

738 peak_requests_per_second=round(peak_rps, 2), 

739 avg_cpu_percent=round(avg_cpu, 2), 

740 avg_memory_percent=round(avg_memory, 2), 

741 peak_cpu_percent=round(peak_cpu, 2), 

742 peak_memory_percent=round(peak_memory, 2), 

743 ) 

744 

745 db.add(aggregate) 

746 db.commit() 

747 db.refresh(aggregate) 

748 

749 return aggregate 

750 except Exception as e: 

751 logger.error(f"Error creating hourly aggregate: {e}") 

752 db.rollback() 

753 return None 

754 

755 

756# Singleton service instance 

757_performance_service: Optional[PerformanceService] = None 

758 

759 

760def get_performance_service(db: Optional[Session] = None) -> PerformanceService: 

761 """Get or create the performance service singleton. 

762 

763 Args: 

764 db: Optional database session. 

765 

766 Returns: 

767 PerformanceService: The service instance. 

768 """ 

769 global _performance_service # pylint: disable=global-statement 

770 if _performance_service is None: 

771 _performance_service = PerformanceService(db) 

772 elif db is not None: 

773 _performance_service.db = db 

774 return _performance_service
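Finally, a sketch of a periodic collection loop built on the service: snapshot, prune, and aggregate the previous hour. The session_factory argument is hypothetical; substitute the project's actual session setup:

import time
from datetime import datetime, timedelta, timezone

from mcpgateway.services.performance_service import get_performance_service

def run_collector(session_factory, interval_seconds: int = 60) -> None:
    svc = get_performance_service()
    while True:
        with session_factory() as db:  # hypothetical context-managed Session factory
            svc.save_snapshot(db)
            svc.cleanup_old_snapshots(db)
            # Aggregate the previous full hour
            hour_start = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
            svc.create_hourly_aggregate(db, hour_start)
        time.sleep(interval_seconds)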