Coverage for mcpgateway / routers / observability.py: 100%

145 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/routers/observability.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Observability API Router. 

8Provides REST endpoints for querying traces, spans, events, and metrics. 

9""" 

10 

11# Standard 

12from datetime import datetime, timedelta 

13from typing import List, Optional 

14 

15# Third-Party 

16from fastapi import APIRouter, Depends, HTTPException, Query 

17import orjson 

18from sqlalchemy import text 

19from sqlalchemy.orm import Session 

20 

21# First-Party 

22from mcpgateway.db import SessionLocal 

23from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission 

24from mcpgateway.schemas import ObservabilitySpanRead, ObservabilityTraceRead, ObservabilityTraceWithSpans 

25from mcpgateway.services.observability_service import ObservabilityService 

26 

# Every endpoint in this module is served under /observability and grouped under the "Observability" OpenAPI tag.
router = APIRouter(tags=["Observability"], prefix="/observability")

28 

29 

def get_db():
    """Yield a request-scoped SQLAlchemy session.

    The session is committed once the handler finishes without error, so
    read-only requests end with an explicit commit rather than an implicit
    rollback. If the handler raises, the transaction is rolled back (or, when
    the rollback itself fails, the connection is invalidated) and the original
    exception is re-raised. The session is closed in every case.

    Yields:
        Session: SQLAlchemy database session

    Raises:
        Exception: Re-raises any exception after rolling back the transaction.
    """
    session = SessionLocal()

    def _abort():
        """Best-effort cleanup: roll back, falling back to invalidating the connection."""
        try:
            session.rollback()
            return
        except Exception:
            pass  # nosec B110 - rollback failed; fall through to invalidate
        try:
            session.invalidate()
        except Exception:
            pass  # nosec B110 - Best effort cleanup on connection failure

    try:
        yield session
        session.commit()
    except Exception:
        _abort()
        raise
    finally:
        session.close()

57 

58 

@router.get("/traces", response_model=List[ObservabilityTraceRead])
@require_permission("admin.system_config")
async def list_traces(
    start_time: Optional[datetime] = Query(None, description="Filter traces after this time"),
    end_time: Optional[datetime] = Query(None, description="Filter traces before this time"),
    min_duration_ms: Optional[float] = Query(None, ge=0, description="Minimum duration in milliseconds"),
    max_duration_ms: Optional[float] = Query(None, ge=0, description="Maximum duration in milliseconds"),
    status: Optional[str] = Query(None, description="Filter by status (ok, error)"),
    http_status_code: Optional[int] = Query(None, description="Filter by HTTP status code"),
    http_method: Optional[str] = Query(None, description="Filter by HTTP method (GET, POST, etc.)"),
    user_email: Optional[str] = Query(None, description="Filter by user email"),
    attribute_search: Optional[str] = Query(None, description="Free-text search within trace attributes"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum results"),
    offset: int = Query(0, ge=0, description="Result offset"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """List traces matching optional query-string filters.

    Every filter is forwarded unchanged to ``ObservabilityService.query_traces``;
    results are paginated with ``limit``/``offset``.

    Note: For structured attribute filtering (key-value pairs with AND logic),
    use a JSON request body via the POST ``/traces/query`` endpoint or the Python SDK.

    Args:
        start_time: Filter traces after this time
        end_time: Filter traces before this time
        min_duration_ms: Minimum duration in milliseconds
        max_duration_ms: Maximum duration in milliseconds
        status: Filter by status (ok, error)
        http_status_code: Filter by HTTP status code
        http_method: Filter by HTTP method (GET, POST, etc.)
        user_email: Filter by user email
        attribute_search: Free-text search across all trace attributes
        limit: Maximum results (1-1000)
        offset: Result offset (>= 0)
        db: Database session
        _user: Current user, resolved via ``get_current_user_with_permissions``

    Returns:
        List[ObservabilityTraceRead]: List of traces matching filters

    Examples:
        >>> import asyncio
        >>> import mcpgateway.routers.observability as obs
        >>> from mcpgateway.config import settings
        >>> class FakeTrace:
        ...     def __init__(self, trace_id='t1'):
        ...         self.trace_id = trace_id
        ...         self.name = 'n'
        ...         self.start_time = None
        ...         self.end_time = None
        ...         self.duration_ms = 100
        ...         self.status = 'ok'
        ...         self.http_method = 'GET'
        ...         self.http_url = '/'
        ...         self.http_status_code = 200
        ...         self.user_email = 'u'
        >>> class FakeService:
        ...     def query_traces(self, **kwargs):
        ...         return [FakeTrace('t1')]
        >>> obs.ObservabilityService = FakeService
        >>> async def run_list_traces():
        ...     traces = await obs.list_traces(db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     return traces[0].trace_id
        >>> asyncio.run(run_list_traces())
        't1'
    """
    filters = {
        "start_time": start_time,
        "end_time": end_time,
        "min_duration_ms": min_duration_ms,
        "max_duration_ms": max_duration_ms,
        "status": status,
        "http_status_code": http_status_code,
        "http_method": http_method,
        "user_email": user_email,
        "attribute_search": attribute_search,
        "limit": limit,
        "offset": offset,
    }
    return ObservabilityService().query_traces(db=db, **filters)

143 

144 

@router.post("/traces/query", response_model=List[ObservabilityTraceRead])
@require_permission("admin.system_config")
async def query_traces_advanced(
    request_body: dict,
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Advanced trace querying with attribute filtering.

    POST endpoint that accepts a JSON body with complex filtering criteria,
    including structured attribute filters with AND logic.

    Request Body:
        {
            "start_time": "2025-01-01T00:00:00Z",   # Optional ISO-8601 datetime
            "end_time": "2025-01-02T00:00:00Z",     # Optional ISO-8601 datetime
            "min_duration_ms": 100.0,               # Optional float
            "max_duration_ms": 5000.0,              # Optional float
            "status": "error",                      # Optional string
            "http_status_code": 500,                # Optional int
            "http_method": "POST",                  # Optional string
            "user_email": "user@example.com",       # Optional string
            "attribute_filters": {                  # Optional dict (AND logic)
                "http.route": "/api/tools",
                "service.name": "mcp-gateway"
            },
            "attribute_search": "error",            # Optional string (OR logic)
            "limit": 100,                           # Optional int, 1-1000 (default 100)
            "offset": 0                             # Optional int, >= 0 (default 0)
        }

    The following optional keys are also forwarded unchanged to
    ``ObservabilityService.query_traces``: ``status_in``, ``status_not_in``,
    ``http_status_code_in``, ``http_method_in``, ``user_email_in``,
    ``attribute_filters_or``, ``name_contains`` and ``order_by``
    (default ``"start_time_desc"``).

    Args:
        request_body: JSON request body with filter criteria
        db: Database session
        _user: Current user, resolved via ``get_current_user_with_permissions``

    Returns:
        List[ObservabilityTraceRead]: List of traces matching filters

    Raises:
        HTTPException: 400 error if request body is invalid (unparseable datetime,
            non-integer or out-of-range ``limit``/``offset``, or a validation error)

    Examples:
        >>> import asyncio
        >>> from fastapi import HTTPException
        >>> from mcpgateway.config import settings
        >>> async def run_invalid_query():
        ...     try:
        ...         await query_traces_advanced({"start_time": "not-a-date"}, db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     except HTTPException as e:
        ...         return (e.status_code, "Invalid request body" in str(e.detail))
        >>> asyncio.run(run_invalid_query())
        (400, True)
        >>> async def run_unbounded_limit():
        ...     try:
        ...         await query_traces_advanced({"limit": 5000}, db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     except HTTPException as e:
        ...         return (e.status_code, "limit" in str(e.detail))
        >>> asyncio.run(run_unbounded_limit())
        (400, True)

        >>> import mcpgateway.routers.observability as obs
        >>> class FakeTrace:
        ...     def __init__(self):
        ...         self.trace_id = 'tx'
        ...         self.name = 'n'
        >>> class FakeService2:
        ...     def query_traces(self, **kwargs):
        ...         return [FakeTrace()]
        >>> obs.ObservabilityService = FakeService2
        >>> async def run_query_traces():
        ...     traces = await obs.query_traces_advanced({}, db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     return traces[0].trace_id
        >>> asyncio.run(run_query_traces())
        'tx'
    """
    # Third-Party
    from pydantic import ValidationError

    def _parse_datetime(value):
        """Parse an ISO-8601 string (a trailing ``Z`` means UTC); return any other value unchanged."""
        if value and isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        return value

    def _bounded_int(key, default, minimum, maximum=None):
        """Return ``request_body[key]`` (or *default*) after checking it is an int within bounds.

        Raises:
            ValueError: If the value is not an integer or lies outside ``[minimum, maximum]``.
        """
        value = request_body.get(key, default)
        # bool is a subclass of int in Python; reject JSON true/false explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError(f"{key} must be an integer")
        if value < minimum or (maximum is not None and value > maximum):
            upper = f" and at most {maximum}" if maximum is not None else ""
            raise ValueError(f"{key} must be at least {minimum}{upper}")
        return value

    try:
        # Same bounds GET /traces enforces via Query(ge=1, le=1000) and Query(ge=0); without
        # them a POST body could request an unbounded result set or a negative offset.
        limit = _bounded_int("limit", 100, 1, 1000)
        offset = _bounded_int("offset", 0, 0)
        start_time = _parse_datetime(request_body.get("start_time"))
        end_time = _parse_datetime(request_body.get("end_time"))

        service = ObservabilityService()
        return service.query_traces(
            db=db,
            start_time=start_time,
            end_time=end_time,
            min_duration_ms=request_body.get("min_duration_ms"),
            max_duration_ms=request_body.get("max_duration_ms"),
            status=request_body.get("status"),
            status_in=request_body.get("status_in"),
            status_not_in=request_body.get("status_not_in"),
            http_status_code=request_body.get("http_status_code"),
            http_status_code_in=request_body.get("http_status_code_in"),
            http_method=request_body.get("http_method"),
            http_method_in=request_body.get("http_method_in"),
            user_email=request_body.get("user_email"),
            user_email_in=request_body.get("user_email_in"),
            attribute_filters=request_body.get("attribute_filters"),
            attribute_filters_or=request_body.get("attribute_filters_or"),
            attribute_search=request_body.get("attribute_search"),
            name_contains=request_body.get("name_contains"),
            order_by=request_body.get("order_by", "start_time_desc"),
            limit=limit,
            offset=offset,
        )
    except (ValidationError, ValueError) as e:
        raise HTTPException(status_code=400, detail=f"Invalid request body: {e}") from e

257 

258 

@router.get("/traces/{trace_id}", response_model=ObservabilityTraceWithSpans)
@require_permission("admin.system_config")
async def get_trace(trace_id: str, db: Session = Depends(get_db), _user=Depends(get_current_user_with_permissions)):
    """Fetch one trace, including its spans and their events.

    Delegates to ``ObservabilityService.get_trace_with_spans``; a falsy result
    is reported to the client as 404.

    Args:
        trace_id: Identifier of the trace to retrieve
        db: Database session
        _user: Current user, resolved via ``get_current_user_with_permissions``

    Returns:
        ObservabilityTraceWithSpans: Complete trace with all spans and events

    Raises:
        HTTPException: 404 if trace not found

    Examples:
        >>> import asyncio
        >>> import mcpgateway.routers.observability as obs
        >>> from mcpgateway.config import settings
        >>> class FakeService:
        ...     def get_trace_with_spans(self, db, trace_id):
        ...         return None
        >>> obs.ObservabilityService = FakeService
        >>> async def run_missing_trace():
        ...     try:
        ...         await obs.get_trace("missing", db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     except obs.HTTPException as e:
        ...         return e.status_code
        >>> asyncio.run(run_missing_trace())
        404
        >>> class FakeService2:
        ...     def get_trace_with_spans(self, db, trace_id):
        ...         return {'trace_id': trace_id}
        >>> obs.ObservabilityService = FakeService2
        >>> async def run_found_trace():
        ...     trace = await obs.get_trace("found", db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     return trace["trace_id"]
        >>> asyncio.run(run_found_trace())
        'found'
    """
    found = ObservabilityService().get_trace_with_spans(db, trace_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Trace not found")

307 

308 

@router.get("/spans", response_model=List[ObservabilitySpanRead])
@require_permission("admin.system_config")
async def list_spans(
    trace_id: Optional[str] = Query(None, description="Filter by trace ID"),
    resource_type: Optional[str] = Query(None, description="Filter by resource type"),
    resource_name: Optional[str] = Query(None, description="Filter by resource name"),
    start_time: Optional[datetime] = Query(None, description="Filter spans after this time"),
    end_time: Optional[datetime] = Query(None, description="Filter spans before this time"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum results"),
    offset: int = Query(0, ge=0, description="Result offset"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """List spans matching optional query-string filters.

    Filters (trace ID, resource type/name, time range) and pagination are
    forwarded unchanged to ``ObservabilityService.query_spans``.

    Args:
        trace_id: Filter by trace ID
        resource_type: Filter by resource type
        resource_name: Filter by resource name
        start_time: Filter spans after this time
        end_time: Filter spans before this time
        limit: Maximum results (1-1000)
        offset: Result offset (>= 0)
        db: Database session
        _user: Current user, resolved via ``get_current_user_with_permissions``

    Returns:
        List[ObservabilitySpanRead]: List of spans matching filters

    Examples:
        >>> import asyncio
        >>> import mcpgateway.routers.observability as obs
        >>> from mcpgateway.config import settings
        >>> class FakeSpan:
        ...     def __init__(self):
        ...         self.span_id = 's1'
        ...         self.trace_id = 't1'
        ...         self.name = 'op'
        >>> class FakeService:
        ...     def query_spans(self, **kwargs):
        ...         return [FakeSpan()]
        >>> obs.ObservabilityService = FakeService
        >>> async def run_list_spans():
        ...     spans = await obs.list_spans(db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     return spans[0].span_id
        >>> asyncio.run(run_list_spans())
        's1'
    """
    span_filters = {
        "trace_id": trace_id,
        "resource_type": resource_type,
        "resource_name": resource_name,
        "start_time": start_time,
        "end_time": end_time,
    }
    return ObservabilityService().query_spans(db=db, limit=limit, offset=offset, **span_filters)

371 

372 

@router.delete("/traces/cleanup")
@require_permission("admin.system_config")
async def cleanup_old_traces(
    days: int = Query(7, ge=1, description="Delete traces older than this many days"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Delete traces older than the given number of days.

    The cutoff is ``now - days`` and is passed to
    ``ObservabilityService.delete_old_traces``. Associated spans, events and
    metrics are expected to be removed by cascading deletes.

    Args:
        days: Delete traces older than this many days
        db: Database session
        _user: Current user, resolved via ``get_current_user_with_permissions``

    Returns:
        dict: ``{"deleted": <count returned by the service>, "cutoff_time": <datetime>}``

    Examples:
        >>> import asyncio
        >>> import mcpgateway.routers.observability as obs
        >>> from mcpgateway.config import settings
        >>> class FakeService:
        ...     def delete_old_traces(self, db, cutoff):
        ...         return 5
        >>> obs.ObservabilityService = FakeService
        >>> async def run_cleanup():
        ...     res = await obs.cleanup_old_traces(days=7, db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     return res["deleted"]
        >>> asyncio.run(run_cleanup())
        5
    """
    # NOTE(review): datetime.now() is naive local time; confirm it matches how start_time is stored (UTC?).
    cutoff_time = datetime.now() - timedelta(days=days)
    removed = ObservabilityService().delete_old_traces(db, cutoff_time)
    return {"deleted": removed, "cutoff_time": cutoff_time}

410 

411 

@router.get("/stats")
@require_permission("admin.system_config")
async def get_stats(
    hours: int = Query(24, ge=1, le=168, description="Time window in hours"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Get observability statistics.

    Summarises traces whose ``start_time`` falls within the last *hours* hours:
    - Total traces in the window
    - Success (status ``"ok"``) and error (status ``"error"``) counts, and the
      error rate as a percentage of all traces in the window
    - Average duration over traces that have one
    - Up to 10 trace names with the highest average duration

    Args:
        hours: Time window in hours
        db: Database session
        _user: Current user, resolved via ``get_current_user_with_permissions``

    Returns:
        dict: Statistics including counts, error rate, and slowest endpoints

    Examples:
        >>> import asyncio
        >>> import mcpgateway.routers.observability as obs
        >>> from mcpgateway.config import settings
        >>> class FakeQuery:
        ...     def filter(self, *a, **k):
        ...         return self
        ...     def group_by(self, *a, **k):
        ...         return self
        ...     def order_by(self, *a, **k):
        ...         return self
        ...     def limit(self, *a, **k):
        ...         return self
        ...     def one(self):
        ...         return (4, 3, 1, 12.5)
        ...     def all(self):
        ...         return [("tool_call", 20.0, 2)]
        >>> class FakeDB:
        ...     def query(self, *a, **k):
        ...         return FakeQuery()
        >>> async def run_stats():
        ...     return await obs.get_stats(hours=1, db=FakeDB(), _user={"email": settings.platform_admin_email, "db": None})
        >>> res = asyncio.run(run_stats())
        >>> (res["total_traces"], res["error_rate"], res["avg_duration_ms"])
        (4, 25.0, 12.5)
        >>> res["slowest_endpoints"]
        [{'name': 'tool_call', 'avg_duration_ms': 20.0, 'count': 2}]
    """
    # Third-Party
    from sqlalchemy import case, func

    # First-Party
    from mcpgateway.db import ObservabilityTrace

    cutoff_time = datetime.now() - timedelta(hours=hours)
    in_window = ObservabilityTrace.start_time >= cutoff_time

    # One scan of the window instead of four. COUNT skips the NULL produced by a CASE with no
    # ELSE, so each conditional count tallies only matching rows. AVG skips NULL durations.
    total_traces, success_count, error_count, avg_duration = (
        db.query(
            func.count(ObservabilityTrace.trace_id),
            func.count(case((ObservabilityTrace.status == "ok", 1))),
            func.count(case((ObservabilityTrace.status == "error", 1))),
            func.avg(ObservabilityTrace.duration_ms),
        )
        .filter(in_window)
        .one()
    )
    # AVG over zero rows (or only NULL durations) yields NULL.
    avg_duration = avg_duration or 0

    # Trace names with the highest average duration in the window.
    slowest = (
        db.query(ObservabilityTrace.name, func.avg(ObservabilityTrace.duration_ms).label("avg_duration"), func.count(ObservabilityTrace.trace_id).label("count"))
        .filter(in_window, ObservabilityTrace.duration_ms.isnot(None))
        .group_by(ObservabilityTrace.name)
        .order_by(func.avg(ObservabilityTrace.duration_ms).desc())
        .limit(10)
        .all()
    )

    return {
        "time_window_hours": hours,
        "total_traces": total_traces,
        "success_count": success_count,
        "error_count": error_count,
        "error_rate": (error_count / total_traces * 100) if total_traces > 0 else 0,
        "avg_duration_ms": round(avg_duration, 2),
        "slowest_endpoints": [{"name": row[0], "avg_duration_ms": round(row[1], 2), "count": row[2]} for row in slowest],
    }

471 

472 

@router.post("/traces/export")
@require_permission("admin.system_config")
async def export_traces(
    request_body: dict,
    format: str = Query("json", description="Export format (json, csv, ndjson)"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Export traces in various formats.

    POST endpoint that accepts the same filter keys as ``/traces/query`` and
    exports the matching traces in the requested format.

    Supported formats:
    - json: JSON array of trace objects
    - csv: Comma-separated values, served as a ``traces.csv`` attachment
    - ndjson: Newline-delimited JSON, one trace per line, streamed as a ``traces.ndjson`` attachment

    Args:
        request_body: JSON request body with filter criteria (same keys as /traces/query).
            ``limit`` defaults to 1000 here rather than 100.
        format: Export format (json, csv, ndjson)
        db: Database session
        _user: Current user, resolved via ``get_current_user_with_permissions``

    Returns:
        A list of dicts for ``json``, a ``Response`` for ``csv``, or a
        ``StreamingResponse`` for ``ndjson``.

    Raises:
        HTTPException: 400 error if format is invalid or export fails

    Examples:
        >>> import asyncio
        >>> from datetime import datetime
        >>> from fastapi import HTTPException
        >>> import mcpgateway.routers.observability as obs
        >>> from mcpgateway.config import settings
        >>> async def run_invalid_export():
        ...     try:
        ...         await export_traces({}, format="xml", db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     except HTTPException as e:
        ...         return (e.status_code, "format must be one of" in str(e.detail))
        >>> asyncio.run(run_invalid_export())
        (400, True)
        >>> class FakeTrace:
        ...     def __init__(self):
        ...         self.trace_id = 'tx'
        ...         self.name = 'name'
        ...         self.start_time = datetime(2025,1,1)
        ...         self.end_time = None
        ...         self.duration_ms = 100
        ...         self.status = 'ok'
        ...         self.http_method = 'GET'
        ...         self.http_url = '/'
        ...         self.http_status_code = 200
        ...         self.user_email = 'u'
        >>> class FakeService:
        ...     def query_traces(self, **kwargs):
        ...         return [FakeTrace()]
        >>> obs.ObservabilityService = FakeService
        >>> async def run_json_export():
        ...     out = await obs.export_traces({}, format="json", db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     return out[0]["trace_id"]
        >>> asyncio.run(run_json_export())
        'tx'
        >>> async def run_csv_export():
        ...     resp = await obs.export_traces({}, format="csv", db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     return hasattr(resp, "media_type") and "csv" in resp.media_type
        >>> asyncio.run(run_csv_export())
        True
        >>> async def run_ndjson_export():
        ...     resp2 = await obs.export_traces({}, format="ndjson", db=None, _user={"email": settings.platform_admin_email, "db": None})
        ...     return type(resp2).__name__
        >>> asyncio.run(run_ndjson_export())
        'StreamingResponse'
    """
    # Standard
    import csv
    import io

    # Third-Party
    from starlette.responses import Response, StreamingResponse

    # Validate format
    if format not in ["json", "csv", "ndjson"]:
        raise HTTPException(status_code=400, detail="format must be one of: json, csv, ndjson")

    try:
        # Parse datetime strings (a trailing "Z" means UTC)
        start_time = request_body.get("start_time")
        if start_time and isinstance(start_time, str):
            start_time = datetime.fromisoformat(start_time.replace("Z", "+00:00"))

        end_time = request_body.get("end_time")
        if end_time and isinstance(end_time, str):
            end_time = datetime.fromisoformat(end_time.replace("Z", "+00:00"))

        service = ObservabilityService()
        # Forward every filter /traces/query accepts, so an export returns the same rows
        # as the equivalent query instead of silently ignoring attribute/list filters.
        traces = service.query_traces(
            db=db,
            start_time=start_time,
            end_time=end_time,
            min_duration_ms=request_body.get("min_duration_ms"),
            max_duration_ms=request_body.get("max_duration_ms"),
            status=request_body.get("status"),
            status_in=request_body.get("status_in"),
            status_not_in=request_body.get("status_not_in"),
            http_status_code=request_body.get("http_status_code"),
            http_status_code_in=request_body.get("http_status_code_in"),
            http_method=request_body.get("http_method"),
            http_method_in=request_body.get("http_method_in"),
            user_email=request_body.get("user_email"),
            user_email_in=request_body.get("user_email_in"),
            attribute_filters=request_body.get("attribute_filters"),
            attribute_filters_or=request_body.get("attribute_filters_or"),
            attribute_search=request_body.get("attribute_search"),
            name_contains=request_body.get("name_contains"),
            order_by=request_body.get("order_by", "start_time_desc"),
            limit=request_body.get("limit", 1000),  # Higher default limit for export
            offset=request_body.get("offset", 0),
        )

        if format == "json":
            return [
                {
                    "trace_id": t.trace_id,
                    "name": t.name,
                    "start_time": t.start_time.isoformat() if t.start_time else None,
                    "end_time": t.end_time.isoformat() if t.end_time else None,
                    "duration_ms": t.duration_ms,
                    "status": t.status,
                    "http_method": t.http_method,
                    "http_url": t.http_url,
                    "http_status_code": t.http_status_code,
                    "user_email": t.user_email,
                }
                for t in traces
            ]

        if format == "csv":
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(["trace_id", "name", "start_time", "duration_ms", "status", "http_method", "http_status_code", "user_email"])
            # csv.writer already renders None as an empty cell. Writing raw values keeps
            # legitimate zeros (e.g. duration_ms == 0) that ``value or ""`` would blank out.
            writer.writerows(
                [t.trace_id, t.name, t.start_time.isoformat() if t.start_time else "", t.duration_ms, t.status, t.http_method, t.http_status_code, t.user_email]
                for t in traces
            )
            return Response(content=output.getvalue(), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=traces.csv"})

        # format == "ndjson" (validated above).
        # Serialize every row now. The response body is consumed after this function returns,
        # when the request-scoped session from get_db may already be committed/closed, so the
        # ORM objects are not touched lazily. This also keeps serialization errors inside this try.
        lines = [
            orjson.dumps(
                {
                    "trace_id": t.trace_id,
                    "name": t.name,
                    "start_time": t.start_time.isoformat() if t.start_time else None,
                    "duration_ms": t.duration_ms,
                    "status": t.status,
                    "http_method": t.http_method,
                    "http_status_code": t.http_status_code,
                    "user_email": t.user_email,
                }
            ).decode()
            + "\n"
            for t in traces
        ]
        return StreamingResponse(iter(lines), media_type="application/x-ndjson", headers={"Content-Disposition": "attachment; filename=traces.ndjson"})

    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Export failed: {e}") from e

650 

651 

@router.get("/analytics/query-performance")
@require_permission("admin.system_config")
async def get_query_performance(
    hours: int = Query(24, ge=1, le=168, description="Time window in hours"),
    db: Session = Depends(get_db),
    _user=Depends(get_current_user_with_permissions),
):
    """Get query performance analytics.

    Returns duration statistics for traces that started within the last *hours*
    hours and have a recorded duration:
    - total number of such traces
    - p50, p75, p90, p95 and p99 durations (linear interpolation)
    - average, minimum and maximum durations

    On PostgreSQL the statistics are computed in SQL with ``percentile_cont``;
    on any other dialect (e.g. SQLite) they are computed in Python.

    Args:
        hours: Time window in hours
        db: Database session
        _user: Current user, resolved via ``get_current_user_with_permissions``

    Returns:
        dict: Performance analytics with keys ``time_window_hours``, ``total_traces``,
        ``percentiles``, ``avg_duration_ms``, ``min_duration_ms`` and ``max_duration_ms``

    Examples:
        >>> import asyncio
        >>> import mcpgateway.routers.observability as obs
        >>> from mcpgateway.config import settings
        >>> class MockDialect:
        ...     name = "sqlite"
        >>> class MockBind:
        ...     dialect = MockDialect()
        >>> class EmptyDB:
        ...     def get_bind(self):
        ...         return MockBind()
        ...     def query(self, *a, **k):
        ...         return self
        ...     def filter(self, *a, **k):
        ...         return self
        ...     def all(self):
        ...         return []
        >>> async def run_empty_stats():
        ...     return (await obs.get_query_performance(hours=1, db=EmptyDB(), _user={"email": settings.platform_admin_email, "db": None}))["total_traces"]
        >>> asyncio.run(run_empty_stats())
        0

        >>> class SmallDB:
        ...     def get_bind(self):
        ...         return MockBind()
        ...     def query(self, *a, **k):
        ...         return self
        ...     def filter(self, *a, **k):
        ...         return self
        ...     def all(self):
        ...         return [(10,), (20,), (30,), (40,)]
        >>> async def run_small_stats():
        ...     return await obs.get_query_performance(hours=1, db=SmallDB(), _user={"email": settings.platform_admin_email, "db": None})
        >>> res = asyncio.run(run_small_stats())
        >>> res["total_traces"]
        4
    """
    cutoff_time = datetime.now() - timedelta(hours=hours)

    # percentile_cont is PostgreSQL-specific; every other dialect uses the Python fallback.
    if db.get_bind().dialect.name == "postgresql":
        return _get_query_performance_postgresql(db, cutoff_time, hours)
    return _get_query_performance_python(db, cutoff_time, hours)

722 

723 

def _get_query_performance_postgresql(db: Session, cutoff_time: datetime, hours: int) -> dict:
    """Compute duration statistics in a single PostgreSQL aggregate query.

    Percentiles use ``percentile_cont`` (continuous, linearly interpolated) over
    traces with ``start_time >= cutoff_time`` and a non-NULL ``duration_ms``.

    Args:
        db: Database session
        cutoff_time: Start time for analysis
        hours: Time window in hours (echoed back in the result)

    Returns:
        dict: Performance analytics computed via SQL. Values are rounded to two
        decimals, and a NULL or zero aggregate is reported as ``0``.
    """
    stats_sql = text(
        """
        SELECT
            COUNT(*) as total_traces,
            percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) as p50,
            percentile_cont(0.75) WITHIN GROUP (ORDER BY duration_ms) as p75,
            percentile_cont(0.90) WITHIN GROUP (ORDER BY duration_ms) as p90,
            percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95,
            percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) as p99,
            AVG(duration_ms) as avg_duration,
            MIN(duration_ms) as min_duration,
            MAX(duration_ms) as max_duration
        FROM observability_traces
        WHERE start_time >= :cutoff_time AND duration_ms IS NOT NULL
        """
    )

    row = db.execute(stats_sql, {"cutoff_time": cutoff_time}).fetchone()

    if not row or row.total_traces == 0:
        return {
            "time_window_hours": hours,
            "total_traces": 0,
            "percentiles": {},
            "avg_duration_ms": 0,
            "min_duration_ms": 0,
            "max_duration_ms": 0,
        }

    def _ms(value):
        """Round an aggregate to two decimals; NULL or zero becomes 0."""
        return round(float(value), 2) if value else 0

    percentile_names = ("p50", "p75", "p90", "p95", "p99")
    return {
        "time_window_hours": hours,
        "total_traces": row.total_traces,
        "percentiles": {name: _ms(getattr(row, name)) for name in percentile_names},
        "avg_duration_ms": _ms(row.avg_duration),
        "min_duration_ms": _ms(row.min_duration),
        "max_duration_ms": _ms(row.max_duration),
    }

778 

779 

def _get_query_performance_python(db: Session, cutoff_time: datetime, hours: int) -> dict:
    """Compute duration statistics in Python (fallback for non-PostgreSQL dialects such as SQLite).

    All non-NULL durations of traces with ``start_time >= cutoff_time`` are
    loaded and sorted. Percentiles use the same linear interpolation as
    PostgreSQL's ``percentile_cont``.

    Args:
        db: Database session
        cutoff_time: Start time for analysis
        hours: Time window in hours (echoed back in the result)

    Returns:
        dict: Performance analytics computed in Python
    """
    # First-Party
    from mcpgateway.db import ObservabilityTrace

    rows = db.query(ObservabilityTrace.duration_ms).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.duration_ms.isnot(None)).all()
    ordered = sorted(row[0] for row in rows if row[0] is not None)

    if not ordered:
        return {
            "time_window_hours": hours,
            "total_traces": 0,
            "percentiles": {},
            "avg_duration_ms": 0,
            "min_duration_ms": 0,
            "max_duration_ms": 0,
        }

    count = len(ordered)

    def interpolate(fraction):
        """Return the *fraction* percentile of ``ordered`` (non-empty, sorted) by linear interpolation."""
        position = (count - 1) * fraction
        lower = int(position)
        weight = position - lower
        base = ordered[lower]
        if lower + 1 >= count:
            return base
        return base + (weight * (ordered[lower + 1] - base))

    quantiles = (("p50", 0.50), ("p75", 0.75), ("p90", 0.90), ("p95", 0.95), ("p99", 0.99))
    return {
        "time_window_hours": hours,
        "total_traces": count,
        "percentiles": {label: round(interpolate(q), 2) for label, q in quantiles},
        "avg_duration_ms": round(sum(ordered) / len(ordered), 2),
        "min_duration_ms": round(ordered[0], 2),
        "max_duration_ms": round(ordered[-1], 2),
    }