Coverage for mcpgateway / routers / log_search.py: 100%

327 statements  


# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/routers/log_search.py

Copyright 2025
SPDX-License-Identifier: Apache-2.0

Log Search API Router.

This module provides REST API endpoints for searching and analyzing structured logs,
security events, audit trails, and performance metrics.
"""

# Standard
from datetime import datetime, timedelta, timezone
import logging
from typing import Any, Dict, List, Optional, Tuple

# Third-Party
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import and_, delete, desc, or_, select
from sqlalchemy.orm import Session
from sqlalchemy.sql import func as sa_func

# First-Party
from mcpgateway.config import settings
from mcpgateway.db import (
    AuditTrail,
    get_db,
    PerformanceMetric,
    SecurityEvent,
    StructuredLogEntry,
)
from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
from mcpgateway.services.log_aggregator import get_log_aggregator

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/logs", tags=["logs"])

MIN_PERFORMANCE_RANGE_HOURS = 5.0 / 60.0
_DEFAULT_AGGREGATION_KEY = "5m"
_AGGREGATION_LEVELS: Dict[str, Dict[str, Any]] = {
    "5m": {"minutes": 5, "label": "5-minute windows"},
    "24h": {"minutes": 24 * 60, "label": "24-hour windows"},
}


def _align_to_window(dt: datetime, window_minutes: int) -> datetime:
    """Align a datetime down to the nearest aggregation window boundary.

    Args:
        dt: Datetime to align
        window_minutes: Aggregation window size in minutes

    Returns:
        datetime: Aligned datetime at window boundary
    """
    timestamp = dt.astimezone(timezone.utc)
    total_minutes = int(timestamp.timestamp() // 60)
    aligned_minutes = (total_minutes // window_minutes) * window_minutes
    return datetime.fromtimestamp(aligned_minutes * 60, tz=timezone.utc)
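# For example, with a 5-minute window, 12:07:30 UTC aligns down to 12:05:00 UTC; with a
# 24-hour window (24 * 60 minutes) any timestamp aligns to 00:00 UTC of its day, since
# whole windows are counted from the Unix epoch.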


def _deduplicate_metrics(metrics: List[PerformanceMetric]) -> List[PerformanceMetric]:
    """Ensure a single metric per component/operation/window.

    Args:
        metrics: List of performance metrics to deduplicate

    Returns:
        List[PerformanceMetric]: Deduplicated metrics sorted by window_start (newest first)
    """
    if not metrics:
        return []

    deduped: Dict[Tuple[str, str, datetime], PerformanceMetric] = {}
    for metric in metrics:
        component = metric.component or ""
        operation = metric.operation_type or ""
        key = (component, operation, metric.window_start)
        existing = deduped.get(key)
        if existing is None or metric.timestamp > existing.timestamp:
            deduped[key] = metric

    return sorted(deduped.values(), key=lambda m: m.window_start, reverse=True)
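# When two rows share the same (component, operation, window_start) key, the row with
# the most recent write timestamp wins, so freshly re-aggregated windows supersede stale rows.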


def _expand_component_filters(components: List[str]) -> List[str]:
    """Expand component filters to include aliases for backward compatibility.

    Args:
        components: Component filter values from the request

    Returns:
        List of component values including aliases
    """
    normalized = {component for component in components if component}
    if "http_gateway" in normalized or "gateway" in normalized:
        normalized.update({"http_gateway", "gateway"})
    return list(normalized)
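# Example: a request filtering on ["gateway"] expands to {"gateway", "http_gateway"} so
# rows written under either historical component name still match.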


def _aggregate_custom_windows(
    aggregator,
    window_minutes: int,
    db: Session,
) -> None:
    """Aggregate metrics using a custom window duration.

    Args:
        aggregator: Log aggregator instance
        window_minutes: Window size in minutes
        db: Database session
    """
    window_delta = timedelta(minutes=window_minutes)
    window_duration_seconds = window_minutes * 60

    sample_row = db.execute(
        select(PerformanceMetric.window_start, PerformanceMetric.window_end)
        .where(PerformanceMetric.window_duration_seconds == window_duration_seconds)
        .order_by(desc(PerformanceMetric.window_start))
        .limit(1)
    ).first()

    needs_rebuild = False
    if sample_row:
        sample_start, sample_end = sample_row
        if sample_start is not None and sample_end is not None:
            start_utc = sample_start if sample_start.tzinfo else sample_start.replace(tzinfo=timezone.utc)
            end_utc = sample_end if sample_end.tzinfo else sample_end.replace(tzinfo=timezone.utc)
            duration = int((end_utc - start_utc).total_seconds())
            if duration != window_duration_seconds:
                needs_rebuild = True
            aligned_start = _align_to_window(start_utc, window_minutes)
            if aligned_start != start_utc:
                needs_rebuild = True

    if needs_rebuild:
        db.execute(delete(PerformanceMetric).where(PerformanceMetric.window_duration_seconds == window_duration_seconds))
        db.commit()
        sample_row = None

    max_existing = None
    if not needs_rebuild:
        max_existing = db.execute(select(sa_func.max(PerformanceMetric.window_start)).where(PerformanceMetric.window_duration_seconds == window_duration_seconds)).scalar()

    if max_existing:
        current_start = max_existing if max_existing.tzinfo else max_existing.replace(tzinfo=timezone.utc)
        current_start = current_start + window_delta
    else:
        earliest_log = db.execute(select(sa_func.min(StructuredLogEntry.timestamp))).scalar()
        if not earliest_log:
            return
        if earliest_log.tzinfo is None:
            earliest_log = earliest_log.replace(tzinfo=timezone.utc)
        current_start = _align_to_window(earliest_log, window_minutes)

    reference_end = datetime.now(timezone.utc)

    # Collect all window starts for the full range, then perform a single batched aggregation
    window_starts: List[datetime] = []
    while current_start < reference_end:
        window_starts.append(current_start)
        current_start = current_start + window_delta

    # Limit to prevent memory issues; keep most recent windows (trim oldest)
    max_windows = 10000
    if len(window_starts) > max_windows:
        logger.warning(
            "Window list truncated from %d to %d windows; keeping most recent",
            len(window_starts),
            max_windows,
        )
        window_starts = window_starts[-max_windows:]

    # Delegate to the aggregator's batch method to avoid per-window recomputation.
    # Note: window_starts must be contiguous and aligned; sparse lists will generate extra windows.
    if window_starts:
        batch_succeeded = False
        if hasattr(aggregator, "aggregate_all_components_batch"):
            try:
                aggregator.aggregate_all_components_batch(window_starts=window_starts, window_minutes=window_minutes, db=db)
                batch_succeeded = True
            except Exception:
                logger.exception("Batch aggregation failed; falling back to per-window aggregation")
                # Roll back the failed transaction before attempting the fallback (required for PostgreSQL)
                db.rollback()
        if not batch_succeeded:
            # Backwards-compatible fallback: iterate windows (less efficient)
            for ws in window_starts:
                aggregator.aggregate_all_components(window_start=ws, window_end=ws + window_delta, db=db)
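# The batch path assumes the aggregator exposes
# aggregate_all_components_batch(window_starts=..., window_minutes=..., db=...); aggregators
# that only implement aggregate_all_components(window_start, window_end, db) take the
# per-window fallback loop above.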


# Request/Response Models
class LogSearchRequest(BaseModel):
    """Log search request parameters."""

    search_text: Optional[str] = Field(None, description="Text search query")
    level: Optional[List[str]] = Field(None, description="Log levels to filter")
    component: Optional[List[str]] = Field(None, description="Components to filter")
    category: Optional[List[str]] = Field(None, description="Categories to filter")
    correlation_id: Optional[str] = Field(None, description="Correlation ID to filter")
    user_id: Optional[str] = Field(None, description="User ID to filter")
    start_time: Optional[datetime] = Field(None, description="Start timestamp")
    end_time: Optional[datetime] = Field(None, description="End timestamp")
    min_duration_ms: Optional[float] = Field(None, description="Minimum duration")
    max_duration_ms: Optional[float] = Field(None, description="Maximum duration")
    has_error: Optional[bool] = Field(None, description="Filter for errors")
    limit: int = Field(100, ge=1, le=1000, description="Maximum results")
    offset: int = Field(0, ge=0, description="Result offset")
    sort_by: str = Field("timestamp", description="Field to sort by")
    sort_order: str = Field("desc", description="Sort order (asc/desc)")
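# Illustrative request body:
#   {"search_text": "timeout", "level": ["ERROR"], "has_error": true, "limit": 50}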


class LogEntry(BaseModel):
    """Log entry response model."""

    id: str
    timestamp: datetime
    level: str
    component: str
    message: str
    correlation_id: Optional[str] = None
    user_id: Optional[str] = None
    user_email: Optional[str] = None
    duration_ms: Optional[float] = None
    operation_type: Optional[str] = None
    request_path: Optional[str] = None
    request_method: Optional[str] = None
    is_security_event: bool = False
    error_details: Optional[Dict[str, Any]] = None

    model_config = ConfigDict(from_attributes=True)


class LogSearchResponse(BaseModel):
    """Log search response."""

    total: int
    results: List[LogEntry]


class CorrelationTraceRequest(BaseModel):
    """Correlation trace request."""

    correlation_id: str


class CorrelationTraceResponse(BaseModel):
    """Correlation trace response with all related logs."""

    correlation_id: str
    total_duration_ms: Optional[float]
    log_count: int
    error_count: int
    logs: List[LogEntry]
    security_events: List[Dict[str, Any]]
    audit_trails: List[Dict[str, Any]]
    performance_metrics: Optional[Dict[str, Any]]


class SecurityEventResponse(BaseModel):
    """Security event response model."""

    id: str
    timestamp: datetime
    event_type: str
    severity: str
    category: str
    user_id: Optional[str]
    client_ip: str
    description: str
    threat_score: float
    action_taken: Optional[str]
    resolved: bool

    model_config = ConfigDict(from_attributes=True)


class AuditTrailResponse(BaseModel):
    """Audit trail response model."""

    id: str
    timestamp: datetime
    correlation_id: Optional[str] = None
    action: str
    resource_type: str
    resource_id: Optional[str]
    resource_name: Optional[str] = None
    user_id: str
    user_email: Optional[str] = None
    success: bool
    requires_review: bool
    data_classification: Optional[str]

    model_config = ConfigDict(from_attributes=True)


class PerformanceMetricResponse(BaseModel):
    """Performance metric response model."""

    id: str
    timestamp: datetime
    component: str
    operation_type: str
    window_start: datetime
    window_end: datetime
    request_count: int
    error_count: int
    error_rate: float
    avg_duration_ms: float
    min_duration_ms: float
    max_duration_ms: float
    p50_duration_ms: float
    p95_duration_ms: float
    p99_duration_ms: float

    model_config = ConfigDict(from_attributes=True)


# API Endpoints
@router.post("/search", response_model=LogSearchResponse)
@require_permission("logs:read")
async def search_logs(request: LogSearchRequest, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> LogSearchResponse:
    """Search structured logs with filters and pagination.

    Args:
        request: Search parameters
        user: Current authenticated user
        db: Database session

    Returns:
        Search results with pagination

    Raises:
        HTTPException: On database or validation errors
    """
    try:
        # Build base query
        stmt = select(StructuredLogEntry)

        # Apply filters
        conditions = []

        if request.search_text:
            conditions.append(or_(StructuredLogEntry.message.ilike(f"%{request.search_text}%"), StructuredLogEntry.component.ilike(f"%{request.search_text}%")))

        if request.level:
            conditions.append(StructuredLogEntry.level.in_(request.level))

        if request.component:
            components = _expand_component_filters(request.component)
            conditions.append(StructuredLogEntry.component.in_(components))

        # Note: the category field doesn't exist in StructuredLogEntry
        # if request.category:
        #     conditions.append(StructuredLogEntry.category.in_(request.category))

        if request.correlation_id:
            conditions.append(StructuredLogEntry.correlation_id == request.correlation_id)

        if request.user_id:
            conditions.append(StructuredLogEntry.user_id == request.user_id)

        if request.start_time:
            conditions.append(StructuredLogEntry.timestamp >= request.start_time)

        if request.end_time:
            conditions.append(StructuredLogEntry.timestamp <= request.end_time)

        if request.min_duration_ms is not None:
            conditions.append(StructuredLogEntry.duration_ms >= request.min_duration_ms)

        if request.max_duration_ms is not None:
            conditions.append(StructuredLogEntry.duration_ms <= request.max_duration_ms)

        if request.has_error is not None:
            if request.has_error:
                conditions.append(StructuredLogEntry.error_details.isnot(None))
            else:
                conditions.append(StructuredLogEntry.error_details.is_(None))

        if conditions:
            stmt = stmt.where(and_(*conditions))

        # Get total count
        count_stmt = select(sa_func.count()).select_from(stmt.subquery())
        total = db.execute(count_stmt).scalar() or 0

        # Apply sorting
        sort_column = getattr(StructuredLogEntry, request.sort_by, StructuredLogEntry.timestamp)
        if request.sort_order == "desc":
            stmt = stmt.order_by(desc(sort_column))
        else:
            stmt = stmt.order_by(sort_column)

        # Apply pagination
        stmt = stmt.limit(request.limit).offset(request.offset)

        # Execute query
        results = db.execute(stmt).scalars().all()

        # Convert to response models
        log_entries = [
            LogEntry(
                id=str(log.id),
                timestamp=log.timestamp,
                level=log.level,
                component=log.component,
                message=log.message,
                correlation_id=log.correlation_id,
                user_id=log.user_id,
                user_email=log.user_email,
                duration_ms=log.duration_ms,
                operation_type=log.operation_type,
                request_path=log.request_path,
                request_method=log.request_method,
                is_security_event=log.is_security_event,
                error_details=log.error_details,
            )
            for log in results
        ]

        return LogSearchResponse(total=total, results=log_entries)

    except Exception as e:
        logger.error(f"Log search failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Log search failed")
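# Usage sketch (illustrative; base URL depends on deployment):
#   curl -X POST .../api/logs/search -H "Content-Type: application/json" \
#        -d '{"level": ["ERROR"], "limit": 10, "sort_order": "desc"}'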


@router.get("/trace/{correlation_id}", response_model=CorrelationTraceResponse)
@require_permission("logs:read")
async def trace_correlation_id(correlation_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> CorrelationTraceResponse:
    """Get all logs and events for a correlation ID.

    Args:
        correlation_id: Correlation ID to trace
        user: Current authenticated user
        db: Database session

    Returns:
        Complete trace of all related logs and events

    Raises:
        HTTPException: On database or validation errors
    """
    try:
        # Get structured logs
        log_stmt = select(StructuredLogEntry).where(StructuredLogEntry.correlation_id == correlation_id).order_by(StructuredLogEntry.timestamp)

        logs = db.execute(log_stmt).scalars().all()

        # Get security events
        security_stmt = select(SecurityEvent).where(SecurityEvent.correlation_id == correlation_id).order_by(SecurityEvent.timestamp)

        security_events = db.execute(security_stmt).scalars().all()

        # Get audit trails
        audit_stmt = select(AuditTrail).where(AuditTrail.correlation_id == correlation_id).order_by(AuditTrail.timestamp)

        audit_trails = db.execute(audit_stmt).scalars().all()

        # Calculate metrics
        durations = [log.duration_ms for log in logs if log.duration_ms is not None]
        total_duration = sum(durations) if durations else None
        error_count = sum(1 for log in logs if log.error_details)

        # Get performance metrics (if any aggregations exist)
        perf_metrics = None
        if logs:
            component = logs[0].component
            operation = logs[0].operation_type
            if component and operation:
                perf_stmt = (
                    select(PerformanceMetric)
                    .where(and_(PerformanceMetric.component == component, PerformanceMetric.operation_type == operation))
                    .order_by(desc(PerformanceMetric.window_start))
                    .limit(1)
                )

                perf = db.execute(perf_stmt).scalar_one_or_none()
                if perf:
                    perf_metrics = {
                        "avg_duration_ms": perf.avg_duration_ms,
                        "p95_duration_ms": perf.p95_duration_ms,
                        "p99_duration_ms": perf.p99_duration_ms,
                        "error_rate": perf.error_rate,
                    }

        return CorrelationTraceResponse(
            correlation_id=correlation_id,
            total_duration_ms=total_duration,
            log_count=len(logs),
            error_count=error_count,
            logs=[
                LogEntry(
                    id=str(log.id),
                    timestamp=log.timestamp,
                    level=log.level,
                    component=log.component,
                    message=log.message,
                    correlation_id=log.correlation_id,
                    user_id=log.user_id,
                    user_email=log.user_email,
                    duration_ms=log.duration_ms,
                    operation_type=log.operation_type,
                    request_path=log.request_path,
                    request_method=log.request_method,
                    is_security_event=log.is_security_event,
                    error_details=log.error_details,
                )
                for log in logs
            ],
            security_events=[
                {
                    "id": str(event.id),
                    "timestamp": event.timestamp.isoformat(),
                    "event_type": event.event_type,
                    "severity": event.severity,
                    "description": event.description,
                    "threat_score": event.threat_score,
                }
                for event in security_events
            ],
            audit_trails=[
                {
                    "id": str(audit.id),
                    "timestamp": audit.timestamp.isoformat(),
                    "action": audit.action,
                    "resource_type": audit.resource_type,
                    "resource_id": audit.resource_id,
                    "success": audit.success,
                }
                for audit in audit_trails
            ],
            performance_metrics=perf_metrics,
        )

    except Exception as e:
        logger.error(f"Correlation trace failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Correlation trace failed: {str(e)}")
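# Note: total_duration_ms sums duration_ms across every log in the trace, so for
# overlapping or concurrent spans it can exceed the wall-clock time of the request.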


@router.get("/security-events", response_model=List[SecurityEventResponse])
@require_permission("security:read")
async def get_security_events(
    severity: Optional[List[str]] = Query(None),
    event_type: Optional[List[str]] = Query(None),
    resolved: Optional[bool] = Query(None),
    start_time: Optional[datetime] = Query(None),
    end_time: Optional[datetime] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> List[SecurityEventResponse]:
    """Get security events with filters.

    Args:
        severity: Filter by severity levels
        event_type: Filter by event types
        resolved: Filter by resolution status
        start_time: Start timestamp
        end_time: End timestamp
        limit: Maximum results
        offset: Result offset
        user: Current authenticated user
        db: Database session

    Returns:
        List of security events

    Raises:
        HTTPException: On database or validation errors
    """
    try:
        stmt = select(SecurityEvent)

        conditions = []
        if severity:
            conditions.append(SecurityEvent.severity.in_(severity))
        if event_type:
            conditions.append(SecurityEvent.event_type.in_(event_type))
        if resolved is not None:
            conditions.append(SecurityEvent.resolved == resolved)
        if start_time:
            conditions.append(SecurityEvent.timestamp >= start_time)
        if end_time:
            conditions.append(SecurityEvent.timestamp <= end_time)

        if conditions:
            stmt = stmt.where(and_(*conditions))

        stmt = stmt.order_by(desc(SecurityEvent.timestamp)).limit(limit).offset(offset)

        events = db.execute(stmt).scalars().all()

        return [
            SecurityEventResponse(
                id=str(event.id),
                timestamp=event.timestamp,
                event_type=event.event_type,
                severity=event.severity,
                category=event.category,
                user_id=event.user_id,
                client_ip=event.client_ip,
                description=event.description,
                threat_score=event.threat_score,
                action_taken=event.action_taken,
                resolved=event.resolved,
            )
            for event in events
        ]

    except Exception as e:
        logger.error(f"Security events query failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Security events query failed: {str(e)}")
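# Usage sketch: list-valued filters repeat the query key (illustrative severity values):
#   GET /api/logs/security-events?severity=high&severity=critical&resolved=false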


@router.get("/audit-trails", response_model=List[AuditTrailResponse])
@require_permission("audit:read")
async def get_audit_trails(
    action: Optional[List[str]] = Query(None),
    resource_type: Optional[List[str]] = Query(None),
    user_id: Optional[str] = Query(None),
    requires_review: Optional[bool] = Query(None),
    start_time: Optional[datetime] = Query(None),
    end_time: Optional[datetime] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> List[AuditTrailResponse]:
    """Get audit trails with filters.

    Args:
        action: Filter by actions
        resource_type: Filter by resource types
        user_id: Filter by user ID
        requires_review: Filter by review requirement
        start_time: Start timestamp
        end_time: End timestamp
        limit: Maximum results
        offset: Result offset
        user: Current authenticated user
        db: Database session

    Returns:
        List of audit trail entries

    Raises:
        HTTPException: On database or validation errors
    """
    try:
        stmt = select(AuditTrail)

        conditions = []
        if action:
            conditions.append(AuditTrail.action.in_(action))
        if resource_type:
            conditions.append(AuditTrail.resource_type.in_(resource_type))
        if user_id:
            conditions.append(AuditTrail.user_id == user_id)
        if requires_review is not None:
            conditions.append(AuditTrail.requires_review == requires_review)
        if start_time:
            conditions.append(AuditTrail.timestamp >= start_time)
        if end_time:
            conditions.append(AuditTrail.timestamp <= end_time)

        if conditions:
            stmt = stmt.where(and_(*conditions))

        stmt = stmt.order_by(desc(AuditTrail.timestamp)).limit(limit).offset(offset)

        trails = db.execute(stmt).scalars().all()

        return [
            AuditTrailResponse(
                id=str(trail.id),
                timestamp=trail.timestamp,
                correlation_id=trail.correlation_id,
                action=trail.action,
                resource_type=trail.resource_type,
                resource_id=trail.resource_id,
                resource_name=trail.resource_name,
                user_id=trail.user_id,
                user_email=trail.user_email,
                success=trail.success,
                requires_review=trail.requires_review,
                data_classification=trail.data_classification,
            )
            for trail in trails
        ]

    except Exception as e:
        logger.error(f"Audit trails query failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Audit trails query failed: {str(e)}")
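# Usage sketch (illustrative resource_type value):
#   GET /api/logs/audit-trails?resource_type=tool&requires_review=true&limit=25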


@router.get("/performance-metrics", response_model=List[PerformanceMetricResponse])
@require_permission("metrics:read")
async def get_performance_metrics(
    component: Optional[str] = Query(None),
    operation: Optional[str] = Query(None),
    hours: float = Query(24.0, ge=MIN_PERFORMANCE_RANGE_HOURS, le=1000.0, description="Historical window to display"),
    aggregation: str = Query(_DEFAULT_AGGREGATION_KEY, pattern="^(5m|24h)$", description="Aggregation level for metrics"),
    user=Depends(get_current_user_with_permissions),
    db: Session = Depends(get_db),
) -> List[PerformanceMetricResponse]:
    """Get performance metrics.

    Args:
        component: Filter by component
        operation: Filter by operation
        hours: Hours of history
        aggregation: Aggregation level (5m or 24h)
        user: Current authenticated user
        db: Database session

    Returns:
        List of performance metrics

    Raises:
        HTTPException: On database or validation errors
    """
    try:
        aggregation_config = _AGGREGATION_LEVELS.get(aggregation, _AGGREGATION_LEVELS[_DEFAULT_AGGREGATION_KEY])
        window_minutes = aggregation_config["minutes"]
        window_duration_seconds = window_minutes * 60

        if settings.metrics_aggregation_enabled:
            try:
                aggregator = get_log_aggregator()
                if aggregation == "5m":
                    aggregator.backfill(hours=hours, db=db)
                else:
                    _aggregate_custom_windows(
                        aggregator=aggregator,
                        window_minutes=window_minutes,
                        db=db,
                    )
            except Exception as agg_error:  # pragma: no cover - defensive logging
                logger.warning("On-demand metrics aggregation failed: %s", agg_error)

        stmt = select(PerformanceMetric).where(PerformanceMetric.window_duration_seconds == window_duration_seconds)

        if component:
            stmt = stmt.where(PerformanceMetric.component == component)
        if operation:
            stmt = stmt.where(PerformanceMetric.operation_type == operation)

        stmt = stmt.order_by(desc(PerformanceMetric.window_start), desc(PerformanceMetric.timestamp))

        metrics = db.execute(stmt).scalars().all()

        metrics = _deduplicate_metrics(metrics)

        return [
            PerformanceMetricResponse(
                id=str(metric.id),
                timestamp=metric.timestamp,
                component=metric.component,
                operation_type=metric.operation_type,
                window_start=metric.window_start,
                window_end=metric.window_end,
                request_count=metric.request_count,
                error_count=metric.error_count,
                error_rate=metric.error_rate,
                avg_duration_ms=metric.avg_duration_ms,
                min_duration_ms=metric.min_duration_ms,
                max_duration_ms=metric.max_duration_ms,
                p50_duration_ms=metric.p50_duration_ms,
                p95_duration_ms=metric.p95_duration_ms,
                p99_duration_ms=metric.p99_duration_ms,
            )
            for metric in metrics
        ]

    except Exception as e:
        logger.error(f"Performance metrics query failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Performance metrics query failed")
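# Usage sketch:
#   GET /api/logs/performance-metrics?aggregation=24h&hours=168
# Note that `hours` only bounds the on-demand "5m" backfill; the "24h" path rebuilds
# windows from the earliest structured log forward regardless of `hours`.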