Coverage for mcpgateway / routers / log_search.py: 100%
327 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/log_search.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Log Search API Router.
8This module provides REST API endpoints for searching and analyzing structured logs,
9security events, audit trails, and performance metrics.
10"""
12# Standard
13from datetime import datetime, timedelta, timezone
14import logging
15from typing import Any, Dict, List, Optional, Tuple
17# Third-Party
18from fastapi import APIRouter, Depends, HTTPException, Query
19from pydantic import BaseModel, ConfigDict, Field
20from sqlalchemy import and_, delete, desc, or_, select
21from sqlalchemy.orm import Session
22from sqlalchemy.sql import func as sa_func
24# First-Party
25from mcpgateway.config import settings
26from mcpgateway.db import (
27 AuditTrail,
28 get_db,
29 PerformanceMetric,
30 SecurityEvent,
31 StructuredLogEntry,
32)
33from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
34from mcpgateway.services.log_aggregator import get_log_aggregator
36logger = logging.getLogger(__name__)
38router = APIRouter(prefix="/api/logs", tags=["logs"])
40MIN_PERFORMANCE_RANGE_HOURS = 5.0 / 60.0
41_DEFAULT_AGGREGATION_KEY = "5m"
42_AGGREGATION_LEVELS: Dict[str, Dict[str, Any]] = {
43 "5m": {"minutes": 5, "label": "5-minute windows"},
44 "24h": {"minutes": 24 * 60, "label": "24-hour windows"},
45}
48def _align_to_window(dt: datetime, window_minutes: int) -> datetime:
49 """Align a datetime down to the nearest aggregation window boundary.
51 Args:
52 dt: Datetime to align
53 window_minutes: Aggregation window size in minutes
55 Returns:
56 datetime: Aligned datetime at window boundary
57 """
58 timestamp = dt.astimezone(timezone.utc)
59 total_minutes = int(timestamp.timestamp() // 60)
60 aligned_minutes = (total_minutes // window_minutes) * window_minutes
61 return datetime.fromtimestamp(aligned_minutes * 60, tz=timezone.utc)
64def _deduplicate_metrics(metrics: List[PerformanceMetric]) -> List[PerformanceMetric]:
65 """Ensure a single metric per component/operation/window.
67 Args:
68 metrics: List of performance metrics to deduplicate
70 Returns:
71 List[PerformanceMetric]: Deduplicated metrics sorted by window_start
72 """
73 if not metrics:
74 return []
76 deduped: Dict[Tuple[str, str, datetime], PerformanceMetric] = {}
77 for metric in metrics:
78 component = metric.component or ""
79 operation = metric.operation_type or ""
80 key = (component, operation, metric.window_start)
81 existing = deduped.get(key)
82 if existing is None or metric.timestamp > existing.timestamp:
83 deduped[key] = metric
85 return sorted(deduped.values(), key=lambda m: m.window_start, reverse=True)
88def _expand_component_filters(components: List[str]) -> List[str]:
89 """Expand component filters to include aliases for backward compatibility.
91 Args:
92 components: Component filter values from the request
94 Returns:
95 List of component values including aliases
96 """
97 normalized = {component for component in components if component}
98 if "http_gateway" in normalized or "gateway" in normalized:
99 normalized.update({"http_gateway", "gateway"})
100 return list(normalized)
103def _aggregate_custom_windows(
104 aggregator,
105 window_minutes: int,
106 db: Session,
107) -> None:
108 """Aggregate metrics using custom window duration.
110 Args:
111 aggregator: Log aggregator instance
112 window_minutes: Window size in minutes
113 db: Database session
114 """
115 window_delta = timedelta(minutes=window_minutes)
116 window_duration_seconds = window_minutes * 60
118 sample_row = db.execute(
119 select(PerformanceMetric.window_start, PerformanceMetric.window_end)
120 .where(PerformanceMetric.window_duration_seconds == window_duration_seconds)
121 .order_by(desc(PerformanceMetric.window_start))
122 .limit(1)
123 ).first()
125 needs_rebuild = False
126 if sample_row:
127 sample_start, sample_end = sample_row
128 if sample_start is not None and sample_end is not None:
129 start_utc = sample_start if sample_start.tzinfo else sample_start.replace(tzinfo=timezone.utc)
130 end_utc = sample_end if sample_end.tzinfo else sample_end.replace(tzinfo=timezone.utc)
131 duration = int((end_utc - start_utc).total_seconds())
132 if duration != window_duration_seconds:
133 needs_rebuild = True
134 aligned_start = _align_to_window(start_utc, window_minutes)
135 if aligned_start != start_utc:
136 needs_rebuild = True
138 if needs_rebuild:
139 db.execute(delete(PerformanceMetric).where(PerformanceMetric.window_duration_seconds == window_duration_seconds))
140 db.commit()
141 sample_row = None
143 max_existing = None
144 if not needs_rebuild:
145 max_existing = db.execute(select(sa_func.max(PerformanceMetric.window_start)).where(PerformanceMetric.window_duration_seconds == window_duration_seconds)).scalar()
147 if max_existing:
148 current_start = max_existing if max_existing.tzinfo else max_existing.replace(tzinfo=timezone.utc)
149 current_start = current_start + window_delta
150 else:
151 earliest_log = db.execute(select(sa_func.min(StructuredLogEntry.timestamp))).scalar()
152 if not earliest_log:
153 return
154 if earliest_log.tzinfo is None:
155 earliest_log = earliest_log.replace(tzinfo=timezone.utc)
156 current_start = _align_to_window(earliest_log, window_minutes)
158 reference_end = datetime.now(timezone.utc)
160 # Collect all window starts for the full range, then perform a single batched aggregation
161 window_starts: List[datetime] = []
162 while current_start < reference_end:
163 window_starts.append(current_start)
164 current_start = current_start + window_delta
166 # Limit to prevent memory issues; keep most recent windows (trim oldest)
167 max_windows = 10000
168 if len(window_starts) > max_windows:
169 logger.warning(
170 "Window list truncated from %d to %d windows; keeping most recent",
171 len(window_starts),
172 max_windows,
173 )
174 window_starts = window_starts[-max_windows:]
176 # Delegate to aggregator batch method to avoid per-window recomputation
177 # Note: window_starts must be contiguous and aligned; sparse lists will generate extra windows
178 if window_starts:
179 batch_succeeded = False
180 if hasattr(aggregator, "aggregate_all_components_batch"):
181 try:
182 aggregator.aggregate_all_components_batch(window_starts=window_starts, window_minutes=window_minutes, db=db)
183 batch_succeeded = True
184 except Exception:
185 logger.exception("Batch aggregation failed; falling back to per-window aggregation")
186 # Rollback failed transaction before attempting fallback (required for PostgreSQL)
187 db.rollback()
188 if not batch_succeeded:
189 # Backwards-compatible fallback: iterate windows (less efficient)
190 for ws in window_starts:
191 aggregator.aggregate_all_components(window_start=ws, window_end=ws + window_delta, db=db)
194# Request/Response Models
195class LogSearchRequest(BaseModel):
196 """Log search request parameters."""
198 search_text: Optional[str] = Field(None, description="Text search query")
199 level: Optional[List[str]] = Field(None, description="Log levels to filter")
200 component: Optional[List[str]] = Field(None, description="Components to filter")
201 category: Optional[List[str]] = Field(None, description="Categories to filter")
202 correlation_id: Optional[str] = Field(None, description="Correlation ID to filter")
203 user_id: Optional[str] = Field(None, description="User ID to filter")
204 start_time: Optional[datetime] = Field(None, description="Start timestamp")
205 end_time: Optional[datetime] = Field(None, description="End timestamp")
206 min_duration_ms: Optional[float] = Field(None, description="Minimum duration")
207 max_duration_ms: Optional[float] = Field(None, description="Maximum duration")
208 has_error: Optional[bool] = Field(None, description="Filter for errors")
209 limit: int = Field(100, ge=1, le=1000, description="Maximum results")
210 offset: int = Field(0, ge=0, description="Result offset")
211 sort_by: str = Field("timestamp", description="Field to sort by")
212 sort_order: str = Field("desc", description="Sort order (asc/desc)")
215class LogEntry(BaseModel):
216 """Log entry response model."""
218 id: str
219 timestamp: datetime
220 level: str
221 component: str
222 message: str
223 correlation_id: Optional[str] = None
224 user_id: Optional[str] = None
225 user_email: Optional[str] = None
226 duration_ms: Optional[float] = None
227 operation_type: Optional[str] = None
228 request_path: Optional[str] = None
229 request_method: Optional[str] = None
230 is_security_event: bool = False
231 error_details: Optional[Dict[str, Any]] = None
233 model_config = ConfigDict(from_attributes=True)
236class LogSearchResponse(BaseModel):
237 """Log search response."""
239 total: int
240 results: List[LogEntry]
243class CorrelationTraceRequest(BaseModel):
244 """Correlation trace request."""
246 correlation_id: str
249class CorrelationTraceResponse(BaseModel):
250 """Correlation trace response with all related logs."""
252 correlation_id: str
253 total_duration_ms: Optional[float]
254 log_count: int
255 error_count: int
256 logs: List[LogEntry]
257 security_events: List[Dict[str, Any]]
258 audit_trails: List[Dict[str, Any]]
259 performance_metrics: Optional[Dict[str, Any]]
262class SecurityEventResponse(BaseModel):
263 """Security event response model."""
265 id: str
266 timestamp: datetime
267 event_type: str
268 severity: str
269 category: str
270 user_id: Optional[str]
271 client_ip: str
272 description: str
273 threat_score: float
274 action_taken: Optional[str]
275 resolved: bool
277 model_config = ConfigDict(from_attributes=True)
280class AuditTrailResponse(BaseModel):
281 """Audit trail response model."""
283 id: str
284 timestamp: datetime
285 correlation_id: Optional[str] = None
286 action: str
287 resource_type: str
288 resource_id: Optional[str]
289 resource_name: Optional[str] = None
290 user_id: str
291 user_email: Optional[str] = None
292 success: bool
293 requires_review: bool
294 data_classification: Optional[str]
296 model_config = ConfigDict(from_attributes=True)
299class PerformanceMetricResponse(BaseModel):
300 """Performance metric response model."""
302 id: str
303 timestamp: datetime
304 component: str
305 operation_type: str
306 window_start: datetime
307 window_end: datetime
308 request_count: int
309 error_count: int
310 error_rate: float
311 avg_duration_ms: float
312 min_duration_ms: float
313 max_duration_ms: float
314 p50_duration_ms: float
315 p95_duration_ms: float
316 p99_duration_ms: float
318 model_config = ConfigDict(from_attributes=True)
321# API Endpoints
322@router.post("/search", response_model=LogSearchResponse)
323@require_permission("logs:read")
324async def search_logs(request: LogSearchRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> LogSearchResponse:
325 """Search structured logs with filters and pagination.
327 Args:
328 request: Search parameters
329 user: Current authenticated user
330 db: Database session
332 Returns:
333 Search results with pagination
335 Raises:
336 HTTPException: On database or validation errors
337 """
338 try:
339 # Build base query
340 stmt = select(StructuredLogEntry)
342 # Apply filters
343 conditions = []
345 if request.search_text:
346 conditions.append(or_(StructuredLogEntry.message.ilike(f"%{request.search_text}%"), StructuredLogEntry.component.ilike(f"%{request.search_text}%")))
348 if request.level:
349 conditions.append(StructuredLogEntry.level.in_(request.level))
351 if request.component:
352 components = _expand_component_filters(request.component)
353 conditions.append(StructuredLogEntry.component.in_(components))
355 # Note: category field doesn't exist in StructuredLogEntry
356 # if request.category:
357 # conditions.append(StructuredLogEntry.category.in_(request.category))
359 if request.correlation_id:
360 conditions.append(StructuredLogEntry.correlation_id == request.correlation_id)
362 if request.user_id:
363 conditions.append(StructuredLogEntry.user_id == request.user_id)
365 if request.start_time:
366 conditions.append(StructuredLogEntry.timestamp >= request.start_time)
368 if request.end_time:
369 conditions.append(StructuredLogEntry.timestamp <= request.end_time)
371 if request.min_duration_ms is not None:
372 conditions.append(StructuredLogEntry.duration_ms >= request.min_duration_ms)
374 if request.max_duration_ms is not None:
375 conditions.append(StructuredLogEntry.duration_ms <= request.max_duration_ms)
377 if request.has_error is not None:
378 if request.has_error:
379 conditions.append(StructuredLogEntry.error_details.isnot(None))
380 else:
381 conditions.append(StructuredLogEntry.error_details.is_(None))
383 if conditions:
384 stmt = stmt.where(and_(*conditions))
386 # Get total count
387 count_stmt = select(sa_func.count()).select_from(stmt.subquery())
388 total = db.execute(count_stmt).scalar() or 0
390 # Apply sorting
391 sort_column = getattr(StructuredLogEntry, request.sort_by, StructuredLogEntry.timestamp)
392 if request.sort_order == "desc":
393 stmt = stmt.order_by(desc(sort_column))
394 else:
395 stmt = stmt.order_by(sort_column)
397 # Apply pagination
398 stmt = stmt.limit(request.limit).offset(request.offset)
400 # Execute query
401 results = db.execute(stmt).scalars().all()
403 # Convert to response models
404 log_entries = [
405 LogEntry(
406 id=str(log.id),
407 timestamp=log.timestamp,
408 level=log.level,
409 component=log.component,
410 message=log.message,
411 correlation_id=log.correlation_id,
412 user_id=log.user_id,
413 user_email=log.user_email,
414 duration_ms=log.duration_ms,
415 operation_type=log.operation_type,
416 request_path=log.request_path,
417 request_method=log.request_method,
418 is_security_event=log.is_security_event,
419 error_details=log.error_details,
420 )
421 for log in results
422 ]
424 return LogSearchResponse(total=total, results=log_entries)
426 except Exception as e:
427 logger.error(f"Log search failed: {e}")
428 raise HTTPException(status_code=500, detail="Log search failed")
431@router.get("/trace/{correlation_id}", response_model=CorrelationTraceResponse)
432@require_permission("logs:read")
433async def trace_correlation_id(correlation_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> CorrelationTraceResponse:
434 """Get all logs and events for a correlation ID.
436 Args:
437 correlation_id: Correlation ID to trace
438 user: Current authenticated user
439 db: Database session
441 Returns:
442 Complete trace of all related logs and events
444 Raises:
445 HTTPException: On database or validation errors
446 """
447 try:
448 # Get structured logs
449 log_stmt = select(StructuredLogEntry).where(StructuredLogEntry.correlation_id == correlation_id).order_by(StructuredLogEntry.timestamp)
451 logs = db.execute(log_stmt).scalars().all()
453 # Get security events
454 security_stmt = select(SecurityEvent).where(SecurityEvent.correlation_id == correlation_id).order_by(SecurityEvent.timestamp)
456 security_events = db.execute(security_stmt).scalars().all()
458 # Get audit trails
459 audit_stmt = select(AuditTrail).where(AuditTrail.correlation_id == correlation_id).order_by(AuditTrail.timestamp)
461 audit_trails = db.execute(audit_stmt).scalars().all()
463 # Calculate metrics
464 durations = [log.duration_ms for log in logs if log.duration_ms is not None]
465 total_duration = sum(durations) if durations else None
466 error_count = sum(1 for log in logs if log.error_details)
468 # Get performance metrics (if any aggregations exist)
469 perf_metrics = None
470 if logs:
471 component = logs[0].component
472 operation = logs[0].operation_type
473 if component and operation:
474 perf_stmt = (
475 select(PerformanceMetric)
476 .where(and_(PerformanceMetric.component == component, PerformanceMetric.operation_type == operation))
477 .order_by(desc(PerformanceMetric.window_start))
478 .limit(1)
479 )
481 perf = db.execute(perf_stmt).scalar_one_or_none()
482 if perf:
483 perf_metrics = {
484 "avg_duration_ms": perf.avg_duration_ms,
485 "p95_duration_ms": perf.p95_duration_ms,
486 "p99_duration_ms": perf.p99_duration_ms,
487 "error_rate": perf.error_rate,
488 }
490 return CorrelationTraceResponse(
491 correlation_id=correlation_id,
492 total_duration_ms=total_duration,
493 log_count=len(logs),
494 error_count=error_count,
495 logs=[
496 LogEntry(
497 id=str(log.id),
498 timestamp=log.timestamp,
499 level=log.level,
500 component=log.component,
501 message=log.message,
502 correlation_id=log.correlation_id,
503 user_id=log.user_id,
504 user_email=log.user_email,
505 duration_ms=log.duration_ms,
506 operation_type=log.operation_type,
507 request_path=log.request_path,
508 request_method=log.request_method,
509 is_security_event=log.is_security_event,
510 error_details=log.error_details,
511 )
512 for log in logs
513 ],
514 security_events=[
515 {
516 "id": str(event.id),
517 "timestamp": event.timestamp.isoformat(),
518 "event_type": event.event_type,
519 "severity": event.severity,
520 "description": event.description,
521 "threat_score": event.threat_score,
522 }
523 for event in security_events
524 ],
525 audit_trails=[
526 {
527 "id": str(audit.id),
528 "timestamp": audit.timestamp.isoformat(),
529 "action": audit.action,
530 "resource_type": audit.resource_type,
531 "resource_id": audit.resource_id,
532 "success": audit.success,
533 }
534 for audit in audit_trails
535 ],
536 performance_metrics=perf_metrics,
537 )
539 except Exception as e:
540 logger.error(f"Correlation trace failed: {e}", exc_info=True)
541 raise HTTPException(status_code=500, detail=f"Correlation trace failed: {str(e)}")
544@router.get("/security-events", response_model=List[SecurityEventResponse])
545@require_permission("security:read")
546async def get_security_events(
547 severity: Optional[List[str]] = Query(None),
548 event_type: Optional[List[str]] = Query(None),
549 resolved: Optional[bool] = Query(None),
550 start_time: Optional[datetime] = Query(None),
551 end_time: Optional[datetime] = Query(None),
552 limit: int = Query(100, ge=1, le=1000),
553 offset: int = Query(0, ge=0),
554 user=Depends(get_current_user_with_permissions),
555 db: Session = Depends(get_db),
556) -> List[SecurityEventResponse]:
557 """Get security events with filters.
559 Args:
560 severity: Filter by severity levels
561 event_type: Filter by event types
562 resolved: Filter by resolution status
563 start_time: Start timestamp
564 end_time: End timestamp
565 limit: Maximum results
566 offset: Result offset
567 user: Current authenticated user
568 db: Database session
570 Returns:
571 List of security events
573 Raises:
574 HTTPException: On database or validation errors
575 """
576 try:
577 stmt = select(SecurityEvent)
579 conditions = []
580 if severity:
581 conditions.append(SecurityEvent.severity.in_(severity))
582 if event_type:
583 conditions.append(SecurityEvent.event_type.in_(event_type))
584 if resolved is not None:
585 conditions.append(SecurityEvent.resolved == resolved)
586 if start_time:
587 conditions.append(SecurityEvent.timestamp >= start_time)
588 if end_time:
589 conditions.append(SecurityEvent.timestamp <= end_time)
591 if conditions:
592 stmt = stmt.where(and_(*conditions))
594 stmt = stmt.order_by(desc(SecurityEvent.timestamp)).limit(limit).offset(offset)
596 events = db.execute(stmt).scalars().all()
598 return [
599 SecurityEventResponse(
600 id=str(event.id),
601 timestamp=event.timestamp,
602 event_type=event.event_type,
603 severity=event.severity,
604 category=event.category,
605 user_id=event.user_id,
606 client_ip=event.client_ip,
607 description=event.description,
608 threat_score=event.threat_score,
609 action_taken=event.action_taken,
610 resolved=event.resolved,
611 )
612 for event in events
613 ]
615 except Exception as e:
616 logger.error(f"Security events query failed: {e}", exc_info=True)
617 raise HTTPException(status_code=500, detail=f"Security events query failed: {str(e)}")
620@router.get("/audit-trails", response_model=List[AuditTrailResponse])
621@require_permission("audit:read")
622async def get_audit_trails(
623 action: Optional[List[str]] = Query(None),
624 resource_type: Optional[List[str]] = Query(None),
625 user_id: Optional[str] = Query(None),
626 requires_review: Optional[bool] = Query(None),
627 start_time: Optional[datetime] = Query(None),
628 end_time: Optional[datetime] = Query(None),
629 limit: int = Query(100, ge=1, le=1000),
630 offset: int = Query(0, ge=0),
631 user=Depends(get_current_user_with_permissions),
632 db: Session = Depends(get_db),
633) -> List[AuditTrailResponse]:
634 """Get audit trails with filters.
636 Args:
637 action: Filter by actions
638 resource_type: Filter by resource types
639 user_id: Filter by user ID
640 requires_review: Filter by review requirement
641 start_time: Start timestamp
642 end_time: End timestamp
643 limit: Maximum results
644 offset: Result offset
645 user: Current authenticated user
646 db: Database session
648 Returns:
649 List of audit trail entries
651 Raises:
652 HTTPException: On database or validation errors
653 """
654 try:
655 stmt = select(AuditTrail)
657 conditions = []
658 if action:
659 conditions.append(AuditTrail.action.in_(action))
660 if resource_type:
661 conditions.append(AuditTrail.resource_type.in_(resource_type))
662 if user_id:
663 conditions.append(AuditTrail.user_id == user_id)
664 if requires_review is not None:
665 conditions.append(AuditTrail.requires_review == requires_review)
666 if start_time:
667 conditions.append(AuditTrail.timestamp >= start_time)
668 if end_time:
669 conditions.append(AuditTrail.timestamp <= end_time)
671 if conditions:
672 stmt = stmt.where(and_(*conditions))
674 stmt = stmt.order_by(desc(AuditTrail.timestamp)).limit(limit).offset(offset)
676 trails = db.execute(stmt).scalars().all()
678 return [
679 AuditTrailResponse(
680 id=str(trail.id),
681 timestamp=trail.timestamp,
682 correlation_id=trail.correlation_id,
683 action=trail.action,
684 resource_type=trail.resource_type,
685 resource_id=trail.resource_id,
686 resource_name=trail.resource_name,
687 user_id=trail.user_id,
688 user_email=trail.user_email,
689 success=trail.success,
690 requires_review=trail.requires_review,
691 data_classification=trail.data_classification,
692 )
693 for trail in trails
694 ]
696 except Exception as e:
697 logger.error(f"Audit trails query failed: {e}", exc_info=True)
698 raise HTTPException(status_code=500, detail=f"Audit trails query failed: {str(e)}")
701@router.get("/performance-metrics", response_model=List[PerformanceMetricResponse])
702@require_permission("metrics:read")
703async def get_performance_metrics(
704 component: Optional[str] = Query(None),
705 operation: Optional[str] = Query(None),
706 hours: float = Query(24.0, ge=MIN_PERFORMANCE_RANGE_HOURS, le=1000.0, description="Historical window to display"),
707 aggregation: str = Query(_DEFAULT_AGGREGATION_KEY, pattern="^(5m|24h)$", description="Aggregation level for metrics"),
708 user=Depends(get_current_user_with_permissions),
709 db: Session = Depends(get_db),
710) -> List[PerformanceMetricResponse]:
711 """Get performance metrics.
713 Args:
714 component: Filter by component
715 operation: Filter by operation
716 aggregation: Aggregation level (5m, 1h, 1d, 7d)
717 hours: Hours of history
718 user: Current authenticated user
719 db: Database session
721 Returns:
722 List of performance metrics
724 Raises:
725 HTTPException: On database or validation errors
726 """
727 try:
728 aggregation_config = _AGGREGATION_LEVELS.get(aggregation, _AGGREGATION_LEVELS[_DEFAULT_AGGREGATION_KEY])
729 window_minutes = aggregation_config["minutes"]
730 window_duration_seconds = window_minutes * 60
732 if settings.metrics_aggregation_enabled:
733 try:
734 aggregator = get_log_aggregator()
735 if aggregation == "5m":
736 aggregator.backfill(hours=hours, db=db)
737 else:
738 _aggregate_custom_windows(
739 aggregator=aggregator,
740 window_minutes=window_minutes,
741 db=db,
742 )
743 except Exception as agg_error: # pragma: no cover - defensive logging
744 logger.warning("On-demand metrics aggregation failed: %s", agg_error)
746 stmt = select(PerformanceMetric).where(PerformanceMetric.window_duration_seconds == window_duration_seconds)
748 if component:
749 stmt = stmt.where(PerformanceMetric.component == component)
750 if operation:
751 stmt = stmt.where(PerformanceMetric.operation_type == operation)
753 stmt = stmt.order_by(desc(PerformanceMetric.window_start), desc(PerformanceMetric.timestamp))
755 metrics = db.execute(stmt).scalars().all()
757 metrics = _deduplicate_metrics(metrics)
759 return [
760 PerformanceMetricResponse(
761 id=str(metric.id),
762 timestamp=metric.timestamp,
763 component=metric.component,
764 operation_type=metric.operation_type,
765 window_start=metric.window_start,
766 window_end=metric.window_end,
767 request_count=metric.request_count,
768 error_count=metric.error_count,
769 error_rate=metric.error_rate,
770 avg_duration_ms=metric.avg_duration_ms,
771 min_duration_ms=metric.min_duration_ms,
772 max_duration_ms=metric.max_duration_ms,
773 p50_duration_ms=metric.p50_duration_ms,
774 p95_duration_ms=metric.p95_duration_ms,
775 p99_duration_ms=metric.p99_duration_ms,
776 )
777 for metric in metrics
778 ]
780 except Exception as e:
781 logger.error(f"Performance metrics query failed: {e}")
782 raise HTTPException(status_code=500, detail="Performance metrics query failed")