Coverage for mcpgateway / services / metrics_buffer_service.py: 92%

328 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Buffered metrics service for batching metric writes to the database. 

3 

4This service accumulates metrics in memory and flushes them to the database 

5periodically, reducing DB write pressure under high load. 

6 

7Copyright 2025 

8SPDX-License-Identifier: Apache-2.0 

9""" 

10 

11# Standard 

12import asyncio 

13from collections import deque 

14from dataclasses import dataclass 

15from datetime import datetime, timezone 

16import logging 

17import threading 

18import time 

19from typing import Deque, Optional 

20 

21# First-Party 

22from mcpgateway.config import settings 

23from mcpgateway.db import A2AAgentMetric, fresh_db_session, PromptMetric, ResourceMetric, ServerMetric, ToolMetric 

24 

25logger = logging.getLogger(__name__) 

26 

27 

@dataclass
class BufferedToolMetric:
    """In-memory record of one tool invocation awaiting a batched DB write."""

    # Identifier of the tool the measurement belongs to.
    tool_id: str
    # Moment the metric was captured (the recorders in this file use UTC "now").
    timestamp: datetime
    # Elapsed time of the invocation, in seconds.
    response_time: float
    # Whether the invocation succeeded.
    is_success: bool
    # Failure description; stays None when no message was supplied.
    error_message: Optional[str] = None

37 

38 

@dataclass
class BufferedResourceMetric:
    """In-memory record of one resource access awaiting a batched DB write."""

    # Identifier of the resource the measurement belongs to.
    resource_id: str
    # Moment the metric was captured (the recorders in this file use UTC "now").
    timestamp: datetime
    # Elapsed time of the access, in seconds.
    response_time: float
    # Whether the access succeeded.
    is_success: bool
    # Failure description; stays None when no message was supplied.
    error_message: Optional[str] = None

48 

49 

@dataclass
class BufferedPromptMetric:
    """In-memory record of one prompt execution awaiting a batched DB write."""

    # Identifier of the prompt the measurement belongs to.
    prompt_id: str
    # Moment the metric was captured (the recorders in this file use UTC "now").
    timestamp: datetime
    # Elapsed time of the execution, in seconds.
    response_time: float
    # Whether the execution succeeded.
    is_success: bool
    # Failure description; stays None when no message was supplied.
    error_message: Optional[str] = None

59 

60 

@dataclass
class BufferedServerMetric:
    """In-memory record of one server interaction awaiting a batched DB write."""

    # Identifier of the server the measurement belongs to.
    server_id: str
    # Moment the metric was captured (the recorders in this file use UTC "now").
    timestamp: datetime
    # Elapsed time of the interaction, in seconds.
    response_time: float
    # Whether the interaction succeeded.
    is_success: bool
    # Failure description; stays None when no message was supplied.
    error_message: Optional[str] = None

70 

71 

@dataclass
class BufferedA2AAgentMetric:
    """In-memory record of one A2A agent interaction awaiting a batched DB write."""

    # Identifier of the A2A agent the measurement belongs to.
    a2a_agent_id: str
    # Moment the metric was captured (the recorders in this file use UTC "now").
    timestamp: datetime
    # Elapsed time of the interaction, in seconds.
    response_time: float
    # Whether the interaction succeeded.
    is_success: bool
    # Kind of interaction performed; "invoke" unless the caller says otherwise.
    interaction_type: str = "invoke"
    # Failure description; stays None when no message was supplied.
    error_message: Optional[str] = None

82 

83 

class MetricsBufferService:
    """Service for buffering and batching metrics writes to the database.

    This service provides:
    - Lock-protected buffering of tool, resource, prompt, server, and A2A agent metrics
    - Periodic flushing to database (configurable interval)
    - Graceful shutdown with final flush
    - Immediate single-row writes when buffering is disabled

    Configuration (via environment variables):
    - METRICS_BUFFER_ENABLED: Enable buffered metrics (default: True)
    - METRICS_BUFFER_FLUSH_INTERVAL: Seconds between flushes (default: 60)
    - METRICS_BUFFER_MAX_SIZE: Configured buffer size limit (default: 1000)

    NOTE(review): ``max_buffer_size`` is stored and reported by ``get_stats()``,
    but nothing in this class triggers a flush when the limit is reached; the
    buffers are only drained by the periodic loop and by ``shutdown()``.
    """

    def __init__(
        self,
        flush_interval: Optional[int] = None,
        max_buffer_size: Optional[int] = None,
        enabled: Optional[bool] = None,
    ):
        """Initialize the metrics buffer service.

        Args:
            flush_interval: Seconds between automatic flushes (default: from settings or 60)
            max_buffer_size: Configured buffer size limit (default: from settings or 1000)
            enabled: Whether buffering is enabled (default: from settings or True)
        """
        # `or` means None and 0 both fall back to the configured value.
        self.flush_interval = flush_interval or getattr(settings, "metrics_buffer_flush_interval", 60)
        self.max_buffer_size = max_buffer_size or getattr(settings, "metrics_buffer_max_size", 1000)
        self.enabled = enabled if enabled is not None else getattr(settings, "metrics_buffer_enabled", True)
        self.recording_enabled = getattr(settings, "db_metrics_recording_enabled", True)

        # One buffer per metric kind; every access goes through self._lock.
        self._tool_metrics: Deque[BufferedToolMetric] = deque()
        self._resource_metrics: Deque[BufferedResourceMetric] = deque()
        self._prompt_metrics: Deque[BufferedPromptMetric] = deque()
        self._server_metrics: Deque[BufferedServerMetric] = deque()
        self._a2a_agent_metrics: Deque[BufferedA2AAgentMetric] = deque()
        self._lock = threading.Lock()

        # Background flush task and the event used to stop it.
        self._flush_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()

        # Stats for monitoring
        self._total_buffered = 0
        self._total_flushed = 0
        self._flush_count = 0

        logger.info(
            f"MetricsBufferService initialized: recording_enabled={self.recording_enabled}, "
            f"buffer_enabled={self.enabled}, flush_interval={self.flush_interval}s, max_buffer_size={self.max_buffer_size}"
        )

    async def start(self) -> None:
        """Start the background flush task.

        Does nothing when recording or buffering is disabled, or when a flush
        task is already running on the current event loop.
        """
        if not self.recording_enabled:
            logger.info("MetricsBufferService: recording disabled, skipping flush loop")
            return
        if not self.enabled:
            logger.info("MetricsBufferService disabled, skipping start")
            return

        current_loop = asyncio.get_running_loop()
        if self._flush_task_is_active_for_loop(current_loop):
            return

        # A fresh Event rather than clear(): an Event that was already waited on
        # in another loop stays bound to that loop and would make every wait()
        # in the new flush loop raise RuntimeError.
        self._shutdown_event = asyncio.Event()
        self._flush_task = asyncio.create_task(self._flush_loop())
        logger.info("MetricsBufferService flush task started")

    def _flush_task_is_active_for_loop(self, loop: asyncio.AbstractEventLoop) -> bool:
        """Return whether the current flush task is usable for the active loop.

        Args:
            loop: The currently running event loop for this worker process.

        Returns:
            True when the cached flush task belongs to the current live loop.
        """
        task = self._flush_task
        if task is None or task.done() or task.cancelled():
            return False

        try:
            task_loop = task.get_loop()
        except (AttributeError, RuntimeError):
            return False

        return task_loop is loop and task_loop.is_running()

    def _ensure_flush_task_started(self) -> None:
        """Best-effort lazy-start for the background flush task.

        In preloaded multi-worker deployments, the singleton may exist in a
        worker before the worker's startup hook has started the flush loop. A
        first buffered metric should recover from that state instead of sitting
        in memory forever. Without a running event loop this is a no-op.
        """
        if not self.recording_enabled or not self.enabled:
            return

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return

        if self._flush_task_is_active_for_loop(loop):
            return

        # Always a fresh Event: the previous one may be set (after shutdown) or
        # bound to a different event loop.
        self._shutdown_event = asyncio.Event()
        self._flush_task = loop.create_task(self._flush_loop())
        logger.info("MetricsBufferService flush task started lazily")

    async def shutdown(self) -> None:
        """Shutdown service with final flush.

        The final flush runs even if the flush task cannot be cancelled or
        awaited cleanly (for example because it belongs to another event loop).
        """
        logger.info("MetricsBufferService shutting down...")

        # Signal shutdown
        self._shutdown_event.set()

        task = self._flush_task
        if task:
            try:
                task.cancel()
                await task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                # E.g. task attached to a different or closed loop; buffered
                # metrics must still be persisted below.
                logger.warning(f"MetricsBufferService flush task did not stop cleanly: {e}")

        # Final flush to persist any remaining metrics
        await self._flush_all()

        logger.info(f"MetricsBufferService shutdown complete: total_buffered={self._total_buffered}, total_flushed={self._total_flushed}, flush_count={self._flush_count}")

    def _enqueue(self, buffer: Deque, metric: object) -> None:
        """Append one metric to a buffer and count it.

        Args:
            buffer: The per-kind deque the metric belongs to.
            metric: The buffered metric entry to store.
        """
        self._ensure_flush_task_started()
        with self._lock:
            buffer.append(metric)
            self._total_buffered += 1

    def record_tool_metric(
        self,
        tool_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a tool metric for later flush.

        Args:
            tool_id: The UUID string of the tool.
            start_time: The monotonic start time of the invocation.
            success: True if the invocation succeeded.
            error_message: Error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            # Fall back to immediate write
            self._write_tool_metric_immediately(tool_id, start_time, success, error_message)
            return

        self._enqueue(
            self._tool_metrics,
            BufferedToolMetric(
                tool_id=tool_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_tool_metric_with_duration(
        self,
        tool_id: str,
        response_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a tool metric with pre-calculated response time.

        Args:
            tool_id: UUID of the tool.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_tool_metric_with_duration_immediately(tool_id, response_time, success, error_message)
            return

        self._enqueue(
            self._tool_metrics,
            BufferedToolMetric(
                tool_id=tool_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_resource_metric(
        self,
        resource_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a resource metric for later flush.

        Args:
            resource_id: UUID of the resource.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_resource_metric_immediately(resource_id, start_time, success, error_message)
            return

        self._enqueue(
            self._resource_metrics,
            BufferedResourceMetric(
                resource_id=resource_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_prompt_metric(
        self,
        prompt_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a prompt metric for later flush.

        Args:
            prompt_id: UUID of the prompt.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_prompt_metric_immediately(prompt_id, start_time, success, error_message)
            return

        self._enqueue(
            self._prompt_metrics,
            BufferedPromptMetric(
                prompt_id=prompt_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_server_metric(
        self,
        server_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a server metric for later flush.

        Args:
            server_id: UUID of the server.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_server_metric_immediately(server_id, start_time, success, error_message)
            return

        self._enqueue(
            self._server_metrics,
            BufferedServerMetric(
                server_id=server_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_server_metric_with_duration(
        self,
        server_id: str,
        response_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a server metric with pre-calculated response time.

        Args:
            server_id: UUID of the server.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_server_metric_with_duration_immediately(server_id, response_time, success, error_message)
            return

        self._enqueue(
            self._server_metrics,
            BufferedServerMetric(
                server_id=server_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_a2a_agent_metric(
        self,
        a2a_agent_id: str,
        start_time: float,
        success: bool,
        interaction_type: str = "invoke",
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer an A2A agent metric for later flush.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_a2a_agent_metric_immediately(a2a_agent_id, start_time, success, interaction_type, error_message)
            return

        self._enqueue(
            self._a2a_agent_metrics,
            BufferedA2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def record_a2a_agent_metric_with_duration(
        self,
        a2a_agent_id: str,
        response_time: float,
        success: bool,
        interaction_type: str = "invoke",
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer an A2A agent metric with pre-calculated response time.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_a2a_agent_metric_with_duration_immediately(a2a_agent_id, response_time, success, interaction_type, error_message)
            return

        self._enqueue(
            self._a2a_agent_metrics,
            BufferedA2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    async def _flush_loop(self) -> None:
        """Background task that periodically flushes buffered metrics.

        Raises:
            asyncio.CancelledError: When the flush loop is cancelled.
        """
        logger.info(f"Metrics flush loop started (interval={self.flush_interval}s)")

        # self._shutdown_event is re-read on every pass on purpose: a restart
        # may have replaced the Event object.
        while not self._shutdown_event.is_set():
            try:
                try:
                    await asyncio.wait_for(
                        self._shutdown_event.wait(),
                        timeout=self.flush_interval,
                    )
                except asyncio.TimeoutError:
                    # Interval elapsed without a shutdown signal: time to flush.
                    pass
                else:
                    # Shutdown signaled
                    break

                await self._flush_all()

            except asyncio.CancelledError:
                logger.debug("Flush loop cancelled")
                raise
            except Exception as e:
                logger.error(f"Error in metrics flush loop: {e}", exc_info=True)
                # Back off briefly, then keep the loop alive.
                await asyncio.sleep(5)

    async def _flush_all(self) -> None:
        """Flush all buffered metrics to the database.

        The buffers are copied and cleared under the lock, then written in a
        worker thread. ``total_flushed`` only counts rows that
        ``_flush_to_db`` reports as committed.
        """
        # Snapshot and empty every buffer in one critical section.
        with self._lock:
            tool_metrics = list(self._tool_metrics)
            resource_metrics = list(self._resource_metrics)
            prompt_metrics = list(self._prompt_metrics)
            server_metrics = list(self._server_metrics)
            a2a_agent_metrics = list(self._a2a_agent_metrics)
            self._tool_metrics.clear()
            self._resource_metrics.clear()
            self._prompt_metrics.clear()
            self._server_metrics.clear()
            self._a2a_agent_metrics.clear()

        total = len(tool_metrics) + len(resource_metrics) + len(prompt_metrics) + len(server_metrics) + len(a2a_agent_metrics)
        if total == 0:
            return

        logger.debug(
            f"Flushing {total} metrics: "
            f"tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
            f"servers={len(server_metrics)}, a2a_agents={len(a2a_agent_metrics)}"
        )

        # Flush in thread to avoid blocking event loop
        result = await asyncio.to_thread(
            self._flush_to_db,
            tool_metrics,
            resource_metrics,
            prompt_metrics,
            server_metrics,
            a2a_agent_metrics,
        )
        # An override that still returns None is treated as "everything written".
        written = result if isinstance(result, int) else total

        self._total_flushed += written
        self._flush_count += 1

        if written == total:
            logger.info(
                f"Metrics flush #{self._flush_count}: wrote {total} records "
                f"(tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
                f"servers={len(server_metrics)}, a2a={len(a2a_agent_metrics)})"
            )
        else:
            logger.warning(f"Metrics flush #{self._flush_count}: wrote {written} of {total} records; the rest were dropped after a database error")

    @staticmethod
    def _as_rows(metrics: list, id_field: str, include_interaction_type: bool = False) -> list:
        """Convert buffered metric entries into bulk-insert mappings.

        Args:
            metrics: Buffered metric entries of a single kind.
            id_field: Name of the owner-id attribute/column (e.g. "tool_id").
            include_interaction_type: Also emit the "interaction_type" column.

        Returns:
            list: One dict per metric, keyed by column name.
        """
        rows = []
        for metric in metrics:
            row = {
                id_field: getattr(metric, id_field),
                "timestamp": metric.timestamp,
                "response_time": metric.response_time,
                "is_success": metric.is_success,
            }
            if include_interaction_type:
                row["interaction_type"] = metric.interaction_type
            row["error_message"] = metric.error_message
            rows.append(row)
        return rows

    def _flush_to_db(
        self,
        tool_metrics: "list[BufferedToolMetric]",
        resource_metrics: "list[BufferedResourceMetric]",
        prompt_metrics: "list[BufferedPromptMetric]",
        server_metrics: "list[BufferedServerMetric]",
        a2a_agent_metrics: "list[BufferedA2AAgentMetric]",
    ) -> int:
        """Write buffered metrics to database (runs in thread).

        Database errors are logged and swallowed; the affected metrics are
        dropped (accepted trade-off, there is no retry queue).

        Args:
            tool_metrics: List of buffered tool metrics to write.
            resource_metrics: List of buffered resource metrics to write.
            prompt_metrics: List of buffered prompt metrics to write.
            server_metrics: List of buffered server metrics to write.
            a2a_agent_metrics: List of buffered A2A agent metrics to write.

        Returns:
            int: Number of metric rows whose transaction committed.
        """
        written = 0

        # Transaction 1: tool, resource, prompt and A2A agent metrics.
        try:
            with fresh_db_session() as db:
                if tool_metrics:
                    db.bulk_insert_mappings(ToolMetric, self._as_rows(tool_metrics, "tool_id"))
                if resource_metrics:
                    db.bulk_insert_mappings(ResourceMetric, self._as_rows(resource_metrics, "resource_id"))
                if prompt_metrics:
                    db.bulk_insert_mappings(PromptMetric, self._as_rows(prompt_metrics, "prompt_id"))
                if a2a_agent_metrics:
                    db.bulk_insert_mappings(
                        A2AAgentMetric,
                        self._as_rows(a2a_agent_metrics, "a2a_agent_id", include_interaction_type=True),
                    )
                db.commit()
            written += len(tool_metrics) + len(resource_metrics) + len(prompt_metrics) + len(a2a_agent_metrics)
        except Exception as e:
            logger.error(f"Failed to flush metrics to database: {e}", exc_info=True)

        # Transaction 2: server metrics, kept separate so that an invalid
        # server_id (FK violation) does not roll back the other kinds.
        # server_id can originate from untrusted headers (X-Server-ID) in admin
        # API paths, so it may reference a nonexistent server.
        if server_metrics:
            try:
                with fresh_db_session() as db:
                    db.bulk_insert_mappings(ServerMetric, self._as_rows(server_metrics, "server_id"))
                    db.commit()
                written += len(server_metrics)
            except Exception as e:
                logger.error(f"Failed to flush server metrics to database: {e}", exc_info=True)

        return written

    def _write_metric_now(self, label: str, build_metric) -> None:
        """Persist a single metric row in its own session, logging any failure.

        Args:
            label: Metric kind used in the error log message (e.g. "tool").
            build_metric: Zero-argument callable returning the ORM object. It
                is invoked inside the session and the error handler, so a
                failure while building the row is logged rather than raised.
        """
        try:
            with fresh_db_session() as db:
                db.add(build_metric())
                db.commit()
        except Exception as e:
            logger.error(f"Failed to write {label} metric: {e}")

    def _write_tool_metric_immediately(
        self,
        tool_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single tool metric immediately (fallback when buffering disabled).

        Args:
            tool_id: UUID of the tool.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_now(
            "tool",
            lambda: ToolMetric(
                tool_id=tool_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_tool_metric_with_duration_immediately(
        self,
        tool_id: str,
        response_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single tool metric with pre-calculated duration immediately.

        Args:
            tool_id: UUID of the tool.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_now(
            "tool",
            lambda: ToolMetric(
                tool_id=tool_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_resource_metric_immediately(
        self,
        resource_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single resource metric immediately.

        Args:
            resource_id: UUID of the resource.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_now(
            "resource",
            lambda: ResourceMetric(
                resource_id=resource_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_prompt_metric_immediately(
        self,
        prompt_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single prompt metric immediately.

        Args:
            prompt_id: UUID of the prompt.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_now(
            "prompt",
            lambda: PromptMetric(
                prompt_id=prompt_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_server_metric_immediately(
        self,
        server_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single server metric immediately.

        Args:
            server_id: UUID of the server.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_now(
            "server",
            lambda: ServerMetric(
                server_id=server_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_server_metric_with_duration_immediately(
        self,
        server_id: str,
        response_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single server metric with pre-calculated duration immediately.

        Args:
            server_id: UUID of the server.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_now(
            "server",
            lambda: ServerMetric(
                server_id=server_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_a2a_agent_metric_immediately(
        self,
        a2a_agent_id: str,
        start_time: float,
        success: bool,
        interaction_type: str,
        error_message: Optional[str],
    ) -> None:
        """Write a single A2A agent metric immediately.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        self._write_metric_now(
            "A2A agent",
            lambda: A2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def _write_a2a_agent_metric_with_duration_immediately(
        self,
        a2a_agent_id: str,
        response_time: float,
        success: bool,
        interaction_type: str,
        error_message: Optional[str],
    ) -> None:
        """Write a single A2A agent metric with pre-calculated duration immediately.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        self._write_metric_now(
            "A2A agent",
            lambda: A2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def get_stats(self) -> dict:
        """Get buffer statistics for monitoring.

        Returns:
            dict: Buffer statistics including enabled state, sizes, and counts.
        """
        with self._lock:
            current_size = len(self._tool_metrics) + len(self._resource_metrics) + len(self._prompt_metrics) + len(self._server_metrics) + len(self._a2a_agent_metrics)

        return {
            "recording_enabled": self.recording_enabled,
            "enabled": self.enabled,
            "flush_interval": self.flush_interval,
            "max_buffer_size": self.max_buffer_size,
            "current_buffer_size": current_size,
            "total_buffered": self._total_buffered,
            "total_flushed": self._total_flushed,
            "flush_count": self._flush_count,
        }

945 

946 

# Module-level singleton; populated on the first get_metrics_buffer_service() call.
_metrics_buffer_service: Optional[MetricsBufferService] = None


def get_metrics_buffer_service() -> MetricsBufferService:
    """Return the shared MetricsBufferService, constructing it on first use.

    Returns:
        MetricsBufferService: The singleton metrics buffer service instance.
    """
    global _metrics_buffer_service  # pylint: disable=global-statement
    service = _metrics_buffer_service
    if service is None:
        # First call: build the service with settings-derived defaults.
        service = MetricsBufferService()
        _metrics_buffer_service = service
    return service

960 return _metrics_buffer_service