Coverage for mcpgateway / routers / observability.py: 100%
145 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/routers/observability.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Observability API Router.
8Provides REST endpoints for querying traces, spans, events, and metrics.
9"""
11# Standard
12from datetime import datetime, timedelta
13from typing import List, Optional
15# Third-Party
16from fastapi import APIRouter, Depends, HTTPException, Query
17import orjson
18from sqlalchemy import text
19from sqlalchemy.orm import Session
21# First-Party
22from mcpgateway.db import SessionLocal
23from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
24from mcpgateway.schemas import ObservabilitySpanRead, ObservabilityTraceRead, ObservabilityTraceWithSpans
25from mcpgateway.services.observability_service import ObservabilityService
27router = APIRouter(prefix="/observability", tags=["Observability"])
def get_db():
    """Yield a request-scoped SQLAlchemy session.

    The session is committed once the request handler finishes so that
    read-only work does not end in an implicit rollback. On any error the
    transaction is rolled back (falling back to invalidating the connection
    when the rollback itself fails), and the exception is re-raised. The
    session is always closed.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Whatever the request handler raised, after cleanup.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        try:
            session.rollback()
        except Exception:
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure
        raise
    finally:
        session.close()
@router.get("/traces", response_model=List[ObservabilityTraceRead])
@require_permission("admin.system_config")
async def list_traces(
    start_time: Optional[datetime] = Query(None, description="Filter traces after this time"),
    end_time: Optional[datetime] = Query(None, description="Filter traces before this time"),
    min_duration_ms: Optional[float] = Query(None, ge=0, description="Minimum duration in milliseconds"),
    max_duration_ms: Optional[float] = Query(None, ge=0, description="Maximum duration in milliseconds"),
    status: Optional[str] = Query(None, description="Filter by status (ok, error)"),
    http_status_code: Optional[int] = Query(None, description="Filter by HTTP status code"),
    http_method: Optional[str] = Query(None, description="Filter by HTTP method (GET, POST, etc.)"),
    user_email: Optional[str] = Query(None, description="Filter by user email"),
    attribute_search: Optional[str] = Query(None, description="Free-text search within trace attributes"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum results"),
    offset: int = Query(0, ge=0, description="Result offset"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """List traces with optional filtering.

    Query traces by time range, duration bounds, status, HTTP status code,
    HTTP method, user email, or a free-text attribute search. Results are
    paginated via ``limit``/``offset``.

    Note: For structured attribute filtering (key-value pairs with AND logic),
    use the POST /traces/query endpoint instead.

    Args:
        start_time: Filter traces after this time
        end_time: Filter traces before this time
        min_duration_ms: Minimum duration in milliseconds
        max_duration_ms: Maximum duration in milliseconds
        status: Filter by status (ok, error)
        http_status_code: Filter by HTTP status code
        http_method: Filter by HTTP method (GET, POST, etc.)
        user_email: Filter by user email
        attribute_search: Free-text search across all trace attributes
        limit: Maximum results
        offset: Result offset
        db: Database session

    Returns:
        List[ObservabilityTraceRead]: List of traces matching filters
    """
    criteria = {
        "start_time": start_time,
        "end_time": end_time,
        "min_duration_ms": min_duration_ms,
        "max_duration_ms": max_duration_ms,
        "status": status,
        "http_status_code": http_status_code,
        "http_method": http_method,
        "user_email": user_email,
        "attribute_search": attribute_search,
        "limit": limit,
        "offset": offset,
    }
    return ObservabilityService().query_traces(db=db, **criteria)
@router.post("/traces/query", response_model=List[ObservabilityTraceRead])
@require_permission("admin.system_config")
async def query_traces_advanced(
    request_body: dict,
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Advanced trace querying with attribute filtering.

    POST endpoint that accepts a JSON body with complex filtering criteria,
    including structured attribute filters with AND logic.

    Request Body (all keys optional):
        start_time / end_time: ISO-8601 datetime strings (trailing "Z" accepted)
        min_duration_ms / max_duration_ms: float duration bounds
        status / status_in / status_not_in: status filters
        http_status_code / http_status_code_in: HTTP status code filters
        http_method / http_method_in: HTTP method filters
        user_email / user_email_in: user email filters
        attribute_filters: dict of attribute key/value pairs (AND logic)
        attribute_filters_or: dict of attribute key/value pairs (OR logic)
        attribute_search: free-text search across attributes
        name_contains: substring match on trace name
        order_by: sort order (default "start_time_desc")
        limit: maximum results (default 100)
        offset: result offset (default 0)

    Args:
        request_body: JSON request body with filter criteria
        db: Database session

    Returns:
        List[ObservabilityTraceRead]: List of traces matching filters

    Raises:
        HTTPException: 400 error if request body is invalid
    """
    # Third-Party
    from pydantic import ValidationError

    def _parse_iso_datetime(value):
        """Parse an ISO-8601 string (accepting a trailing "Z") into a datetime.

        Non-string or falsy values are returned unchanged.
        """
        if value and isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        return value

    try:
        service = ObservabilityService()
        traces = service.query_traces(
            db=db,
            start_time=_parse_iso_datetime(request_body.get("start_time")),
            end_time=_parse_iso_datetime(request_body.get("end_time")),
            min_duration_ms=request_body.get("min_duration_ms"),
            max_duration_ms=request_body.get("max_duration_ms"),
            status=request_body.get("status"),
            status_in=request_body.get("status_in"),
            status_not_in=request_body.get("status_not_in"),
            http_status_code=request_body.get("http_status_code"),
            http_status_code_in=request_body.get("http_status_code_in"),
            http_method=request_body.get("http_method"),
            http_method_in=request_body.get("http_method_in"),
            user_email=request_body.get("user_email"),
            user_email_in=request_body.get("user_email_in"),
            attribute_filters=request_body.get("attribute_filters"),
            attribute_filters_or=request_body.get("attribute_filters_or"),
            attribute_search=request_body.get("attribute_search"),
            name_contains=request_body.get("name_contains"),
            order_by=request_body.get("order_by", "start_time_desc"),
            limit=request_body.get("limit", 100),
            offset=request_body.get("offset", 0),
        )
        return traces
    except (ValidationError, ValueError) as e:
        # Chain the original exception so the root cause survives in logs (B904).
        raise HTTPException(status_code=400, detail=f"Invalid request body: {e}") from e
@router.get("/traces/{trace_id}", response_model=ObservabilityTraceWithSpans)
@require_permission("admin.system_config")
async def get_trace(trace_id: str, db: Session = Depends(get_db), _user=Depends(get_current_user_with_permissions)):
    """Fetch a single trace with all of its spans and their events.

    Provides a complete view of the request flow for one trace, including
    every nested span and event.

    Args:
        trace_id: UUID of the trace to retrieve
        db: Database session

    Returns:
        ObservabilityTraceWithSpans: Complete trace with all spans and events

    Raises:
        HTTPException: 404 if trace not found
    """
    result = ObservabilityService().get_trace_with_spans(db, trace_id)
    if not result:
        raise HTTPException(status_code=404, detail="Trace not found")
    return result
@router.get("/spans", response_model=List[ObservabilitySpanRead])
@require_permission("admin.system_config")
async def list_spans(
    trace_id: Optional[str] = Query(None, description="Filter by trace ID"),
    resource_type: Optional[str] = Query(None, description="Filter by resource type"),
    resource_name: Optional[str] = Query(None, description="Filter by resource name"),
    start_time: Optional[datetime] = Query(None, description="Filter spans after this time"),
    end_time: Optional[datetime] = Query(None, description="Filter spans before this time"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum results"),
    offset: int = Query(0, ge=0, description="Result offset"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """List spans with optional filtering.

    Query spans by trace ID, resource type, resource name, or time range —
    useful for analyzing specific operations or resource performance.

    Args:
        trace_id: Filter by trace ID
        resource_type: Filter by resource type
        resource_name: Filter by resource name
        start_time: Filter spans after this time
        end_time: Filter spans before this time
        limit: Maximum results
        offset: Result offset
        db: Database session

    Returns:
        List[ObservabilitySpanRead]: List of spans matching filters
    """
    criteria = {
        "trace_id": trace_id,
        "resource_type": resource_type,
        "resource_name": resource_name,
        "start_time": start_time,
        "end_time": end_time,
        "limit": limit,
        "offset": offset,
    }
    return ObservabilityService().query_spans(db=db, **criteria)
@router.delete("/traces/cleanup")
@require_permission("admin.system_config")
async def cleanup_old_traces(
    days: int = Query(7, ge=1, description="Delete traces older than this many days"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Delete traces older than a specified number of days.

    Frees storage by removing aged trace data; cascading deletes also remove
    the associated spans, events, and metrics.

    Args:
        days: Delete traces older than this many days
        db: Database session

    Returns:
        dict: Number of deleted traces and the cutoff time used
    """
    cutoff = datetime.now() - timedelta(days=days)
    removed = ObservabilityService().delete_old_traces(db, cutoff)
    return {"deleted": removed, "cutoff_time": cutoff}
@router.get("/stats")
@require_permission("admin.system_config")
async def get_stats(
    hours: int = Query(24, ge=1, le=168, description="Time window in hours"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Get observability statistics.

    Returns summary statistics including:
    - Total traces in time window
    - Success/error counts and error rate (percent)
    - Average response time
    - Top 10 slowest endpoints by average duration

    Args:
        hours: Time window in hours
        db: Database session

    Returns:
        dict: Statistics including counts, error rate, and slowest endpoints
    """
    # Third-Party
    from sqlalchemy import func

    # First-Party
    from mcpgateway.db import ObservabilityTrace

    # NOTE(review): naive local time — assumes trace start_time values are
    # stored on the same (naive) clock; confirm before switching to UTC.
    cutoff_time = datetime.now() - timedelta(hours=hours)

    # Basic counts within the window.
    total_traces = db.query(func.count(ObservabilityTrace.trace_id)).filter(ObservabilityTrace.start_time >= cutoff_time).scalar()

    success_count = db.query(func.count(ObservabilityTrace.trace_id)).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.status == "ok").scalar()

    error_count = db.query(func.count(ObservabilityTrace.trace_id)).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.status == "error").scalar()

    avg_duration = db.query(func.avg(ObservabilityTrace.duration_ms)).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.duration_ms.isnot(None)).scalar() or 0

    # Top 10 endpoints by average duration (slowest first).
    slowest = (
        db.query(ObservabilityTrace.name, func.avg(ObservabilityTrace.duration_ms).label("avg_duration"), func.count(ObservabilityTrace.trace_id).label("count"))
        .filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.duration_ms.isnot(None))
        .group_by(ObservabilityTrace.name)
        .order_by(func.avg(ObservabilityTrace.duration_ms).desc())
        .limit(10)
        .all()
    )

    return {
        "time_window_hours": hours,
        "total_traces": total_traces,
        "success_count": success_count,
        "error_count": error_count,
        "error_rate": (error_count / total_traces * 100) if total_traces > 0 else 0,
        "avg_duration_ms": round(avg_duration, 2),
        "slowest_endpoints": [{"name": row[0], "avg_duration_ms": round(row[1], 2), "count": row[2]} for row in slowest],
    }
@router.post("/traces/export")
@require_permission("admin.system_config")
async def export_traces(
    request_body: dict,
    format: str = Query("json", description="Export format (json, csv, ndjson)"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Export traces in various formats.

    POST endpoint that accepts filter criteria (same as /traces/query) and exports
    matching traces in the specified format.

    Supported formats:
    - json: Standard JSON array
    - csv: Comma-separated values (downloaded as an attachment)
    - ndjson: Newline-delimited JSON (streamed as an attachment)

    Args:
        request_body: JSON request body with filter criteria (same as /traces/query)
        format: Export format (json, csv, ndjson); name kept for API compatibility
            even though it shadows the builtin
        db: Database session

    Returns:
        StreamingResponse, Response, or list: Exported data in the requested format

    Raises:
        HTTPException: 400 error if format is invalid or export fails
    """
    # Standard
    import csv
    import io

    # Third-Party
    from starlette.responses import Response, StreamingResponse

    # Validate the requested format before doing any work.
    if format not in ["json", "csv", "ndjson"]:
        raise HTTPException(status_code=400, detail="format must be one of: json, csv, ndjson")

    try:
        service = ObservabilityService()

        # Parse ISO-8601 datetime strings (accepting a trailing "Z").
        start_time = request_body.get("start_time")
        if start_time and isinstance(start_time, str):
            start_time = datetime.fromisoformat(start_time.replace("Z", "+00:00"))

        end_time = request_body.get("end_time")
        if end_time and isinstance(end_time, str):
            end_time = datetime.fromisoformat(end_time.replace("Z", "+00:00"))

        # Query traces with the same filters as /traces/query.
        traces = service.query_traces(
            db=db,
            start_time=start_time,
            end_time=end_time,
            min_duration_ms=request_body.get("min_duration_ms"),
            max_duration_ms=request_body.get("max_duration_ms"),
            status=request_body.get("status"),
            status_in=request_body.get("status_in"),
            http_status_code=request_body.get("http_status_code"),
            http_method=request_body.get("http_method"),
            user_email=request_body.get("user_email"),
            order_by=request_body.get("order_by", "start_time_desc"),
            limit=request_body.get("limit", 1000),  # Higher default limit for export
            offset=request_body.get("offset", 0),
        )

        if format == "json":
            # Standard JSON array response.
            return [
                {
                    "trace_id": t.trace_id,
                    "name": t.name,
                    "start_time": t.start_time.isoformat() if t.start_time else None,
                    "end_time": t.end_time.isoformat() if t.end_time else None,
                    "duration_ms": t.duration_ms,
                    "status": t.status,
                    "http_method": t.http_method,
                    "http_url": t.http_url,
                    "http_status_code": t.http_status_code,
                    "user_email": t.user_email,
                }
                for t in traces
            ]

        if format == "csv":
            # CSV export with a fixed column order.
            output = io.StringIO()
            writer = csv.writer(output)

            # Header row.
            writer.writerow(["trace_id", "name", "start_time", "duration_ms", "status", "http_method", "http_status_code", "user_email"])

            # Data rows; None values become empty cells.
            for t in traces:
                writer.writerow(
                    [t.trace_id, t.name, t.start_time.isoformat() if t.start_time else "", t.duration_ms or "", t.status, t.http_method or "", t.http_status_code or "", t.user_email or ""]
                )

            output.seek(0)
            return Response(content=output.getvalue(), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=traces.csv"})

        # format == "ndjson" (validated above): stream newline-delimited JSON.
        def generate():
            """Yield newline-delimited JSON strings for each trace.

            This nested generator is used to stream NDJSON responses.

            Yields:
                str: A JSON-encoded line (with trailing newline) for a trace.
            """
            for t in traces:
                yield orjson.dumps(
                    {
                        "trace_id": t.trace_id,
                        "name": t.name,
                        "start_time": t.start_time.isoformat() if t.start_time else None,
                        "duration_ms": t.duration_ms,
                        "status": t.status,
                        "http_method": t.http_method,
                        "http_status_code": t.http_status_code,
                        "user_email": t.user_email,
                    }
                ).decode() + "\n"

        return StreamingResponse(generate(), media_type="application/x-ndjson", headers={"Content-Disposition": "attachment; filename=traces.ndjson"})
    except Exception as e:
        # Was `except (ValueError, Exception)` — redundant, since Exception
        # already subsumes ValueError. The broad catch intentionally maps any
        # export failure to a 400; chain the cause so it survives (B904).
        raise HTTPException(status_code=400, detail=f"Export failed: {e}") from e
@router.get("/analytics/query-performance")
@require_permission("admin.system_config")
async def get_query_performance(
    hours: int = Query(24, ge=1, le=168, description="Time window in hours"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Get query performance analytics.

    Returns duration statistics for traces within the time window:
    average, min, max, and the p50/p75/p90/p95/p99 percentiles.
    Percentiles are computed in SQL on PostgreSQL and in Python for
    other dialects (e.g. SQLite).

    Args:
        hours: Time window in hours
        db: Database session

    Returns:
        dict: Performance analytics
    """
    cutoff_time = datetime.now() - timedelta(hours=hours)

    # Use SQL aggregation for PostgreSQL, Python fallback for SQLite.
    # (A stray no-op ObservabilityService() instantiation was removed here.)
    dialect_name = db.get_bind().dialect.name
    if dialect_name == "postgresql":
        return _get_query_performance_postgresql(db, cutoff_time, hours)
    return _get_query_performance_python(db, cutoff_time, hours)
def _get_query_performance_postgresql(db: Session, cutoff_time: datetime, hours: int) -> dict:
    """Compute query performance using PostgreSQL percentile_cont.

    Runs a single aggregate query that computes the count, percentiles,
    average, min, and max of trace durations within the window.

    Args:
        db: Database session
        cutoff_time: Start time for analysis
        hours: Time window in hours

    Returns:
        dict: Performance analytics computed via SQL
    """
    stats_sql = text(
        """
        SELECT
            COUNT(*) as total_traces,
            percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) as p50,
            percentile_cont(0.75) WITHIN GROUP (ORDER BY duration_ms) as p75,
            percentile_cont(0.90) WITHIN GROUP (ORDER BY duration_ms) as p90,
            percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95,
            percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) as p99,
            AVG(duration_ms) as avg_duration,
            MIN(duration_ms) as min_duration,
            MAX(duration_ms) as max_duration
        FROM observability_traces
        WHERE start_time >= :cutoff_time AND duration_ms IS NOT NULL
        """
    )

    row = db.execute(stats_sql, {"cutoff_time": cutoff_time}).fetchone()

    if not row or row.total_traces == 0:
        # No traces in the window: return zeroed-out analytics.
        return {
            "time_window_hours": hours,
            "total_traces": 0,
            "percentiles": {},
            "avg_duration_ms": 0,
            "min_duration_ms": 0,
            "max_duration_ms": 0,
        }

    def _ms(value):
        """Round a duration to 2 decimals, mapping NULL/zero-ish values to 0."""
        return round(float(value), 2) if value else 0

    return {
        "time_window_hours": hours,
        "total_traces": row.total_traces,
        "percentiles": {name: _ms(getattr(row, name)) for name in ("p50", "p75", "p90", "p95", "p99")},
        "avg_duration_ms": _ms(row.avg_duration),
        "min_duration_ms": _ms(row.min_duration),
        "max_duration_ms": _ms(row.max_duration),
    }
def _get_query_performance_python(db: Session, cutoff_time: datetime, hours: int) -> dict:
    """Compute query performance using Python (fallback for SQLite).

    Pulls the non-null trace durations in the window and computes count,
    average, min, max, and linearly-interpolated percentiles in Python.

    Args:
        db: Database session
        cutoff_time: Start time for analysis
        hours: Time window in hours

    Returns:
        dict: Performance analytics computed in Python
    """
    # First-Party
    from mcpgateway.db import ObservabilityTrace

    rows = db.query(ObservabilityTrace.duration_ms).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.duration_ms.isnot(None)).all()
    samples = sorted(row[0] for row in rows if row[0] is not None)

    if not samples:
        # No traces in the window: return zeroed-out analytics.
        return {
            "time_window_hours": hours,
            "total_traces": 0,
            "percentiles": {},
            "avg_duration_ms": 0,
            "min_duration_ms": 0,
            "max_duration_ms": 0,
        }

    def _quantile(sorted_data, q):
        """Linear-interpolation quantile matching PostgreSQL percentile_cont.

        Args:
            sorted_data: Non-empty ascending list of numbers (guarded above).
            q: Quantile between 0 and 1.

        Returns:
            Interpolated quantile value.
        """
        pos = (len(sorted_data) - 1) * q
        lo = int(pos)
        frac = pos - lo
        if lo + 1 < len(sorted_data):
            return sorted_data[lo] + frac * (sorted_data[lo + 1] - sorted_data[lo])
        return sorted_data[lo]

    return {
        "time_window_hours": hours,
        "total_traces": len(samples),
        "percentiles": {name: round(_quantile(samples, q), 2) for name, q in (("p50", 0.50), ("p75", 0.75), ("p90", 0.90), ("p95", 0.95), ("p99", 0.99))},
        "avg_duration_ms": round(sum(samples) / len(samples), 2),
        "min_duration_ms": round(samples[0], 2),
        "max_duration_ms": round(samples[-1], 2),
    }