Coverage for mcpgateway / services / metrics_buffer_service.py: 99%

256 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Buffered metrics service for batching metric writes to the database. 

3 

4This service accumulates metrics in memory and flushes them to the database 

5periodically, reducing DB write pressure under high load. 

6 

7Copyright 2025 

8SPDX-License-Identifier: Apache-2.0 

9""" 

10 

11# Standard 

12import asyncio 

13from collections import deque 

14from dataclasses import dataclass 

15from datetime import datetime, timezone 

16import logging 

17import threading 

18import time 

19from typing import Deque, Optional 

20 

21# First-Party 

22from mcpgateway.config import settings 

23from mcpgateway.db import A2AAgentMetric, fresh_db_session, PromptMetric, ResourceMetric, ServerMetric, ToolMetric 

24 

25logger = logging.getLogger(__name__) 

26 

27 

@dataclass
class BufferedToolMetric:
    """In-memory record of one tool invocation awaiting a batched database write.

    Attributes:
        tool_id: Identifier of the tool the metric belongs to.
        timestamp: When the metric was captured.
        response_time: Duration of the invocation.
        is_success: True when the invocation succeeded.
        error_message: Failure detail, or None when not supplied.
    """

    tool_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None

37 

38 

@dataclass
class BufferedResourceMetric:
    """In-memory record of one resource access awaiting a batched database write.

    Attributes:
        resource_id: Identifier of the resource the metric belongs to.
        timestamp: When the metric was captured.
        response_time: Duration of the operation.
        is_success: True when the operation succeeded.
        error_message: Failure detail, or None when not supplied.
    """

    resource_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None

48 

49 

@dataclass
class BufferedPromptMetric:
    """In-memory record of one prompt operation awaiting a batched database write.

    Attributes:
        prompt_id: Identifier of the prompt the metric belongs to.
        timestamp: When the metric was captured.
        response_time: Duration of the operation.
        is_success: True when the operation succeeded.
        error_message: Failure detail, or None when not supplied.
    """

    prompt_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None

59 

60 

@dataclass
class BufferedServerMetric:
    """In-memory record of one server operation awaiting a batched database write.

    Attributes:
        server_id: Identifier of the server the metric belongs to.
        timestamp: When the metric was captured.
        response_time: Duration of the operation.
        is_success: True when the operation succeeded.
        error_message: Failure detail, or None when not supplied.
    """

    server_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None

70 

71 

@dataclass
class BufferedA2AAgentMetric:
    """In-memory record of one A2A agent interaction awaiting a batched database write.

    Attributes:
        a2a_agent_id: Identifier of the A2A agent the metric belongs to.
        timestamp: When the metric was captured.
        response_time: Duration of the interaction.
        is_success: True when the interaction succeeded.
        interaction_type: Kind of interaction; defaults to "invoke".
        error_message: Failure detail, or None when not supplied.
    """

    a2a_agent_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    interaction_type: str = "invoke"
    error_message: Optional[str] = None

82 

83 

class MetricsBufferService:
    """Service for buffering and batching metrics writes to the database.

    This service provides:
    - Lock-guarded buffering of tool, resource, prompt, server, and A2A agent metrics
    - Periodic flushing to database from a background asyncio task (configurable interval)
    - Shutdown with a final flush of whatever is still buffered
    - Immediate single-row writes as a fallback when buffering is disabled

    Configuration (via environment variables):
    - METRICS_BUFFER_ENABLED: Enable buffered metrics (default: True)
    - METRICS_BUFFER_FLUSH_INTERVAL: Seconds between flushes (default: 60)
    - METRICS_BUFFER_MAX_SIZE: Stored and reported by get_stats() (default: 1000).
      NOTE(review): nothing in this class triggers a flush when the buffers
      reach this size; flushing happens only on the timer and at shutdown.
    """

    def __init__(
        self,
        flush_interval: Optional[int] = None,
        max_buffer_size: Optional[int] = None,
        enabled: Optional[bool] = None,
    ):
        """Initialize the metrics buffer service.

        Args:
            flush_interval: Seconds between automatic flushes (default: from settings or 60).
                A falsy value (None or 0) falls back to the settings value.
            max_buffer_size: Buffer size limit reported in stats (default: from settings or 1000).
                A falsy value (None or 0) falls back to the settings value.
            enabled: Whether buffering is enabled (default: from settings or True)
        """
        self.flush_interval = flush_interval or getattr(settings, "metrics_buffer_flush_interval", 60)
        self.max_buffer_size = max_buffer_size or getattr(settings, "metrics_buffer_max_size", 1000)
        self.enabled = enabled if enabled is not None else getattr(settings, "metrics_buffer_enabled", True)
        self.recording_enabled = getattr(settings, "db_metrics_recording_enabled", True)

        # One deque per metric kind; every access goes through self._lock
        self._tool_metrics: Deque[BufferedToolMetric] = deque()
        self._resource_metrics: Deque[BufferedResourceMetric] = deque()
        self._prompt_metrics: Deque[BufferedPromptMetric] = deque()
        self._server_metrics: Deque[BufferedServerMetric] = deque()
        self._a2a_agent_metrics: Deque[BufferedA2AAgentMetric] = deque()
        self._lock = threading.Lock()

        # Background flush task
        self._flush_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()

        # Stats for monitoring
        self._total_buffered = 0
        self._total_flushed = 0
        self._flush_count = 0

        logger.info(
            f"MetricsBufferService initialized: recording_enabled={self.recording_enabled}, "
            f"buffer_enabled={self.enabled}, flush_interval={self.flush_interval}s, max_buffer_size={self.max_buffer_size}"
        )

    async def start(self) -> None:
        """Start the background flush task.

        Does nothing when recording or buffering is disabled, or when a flush
        task is already running.
        """
        if not self.recording_enabled:
            logger.info("MetricsBufferService: recording disabled, skipping flush loop")
            return
        if not self.enabled:
            logger.info("MetricsBufferService disabled, skipping start")
            return

        if self._flush_task is None or self._flush_task.done():
            self._shutdown_event.clear()
            self._flush_task = asyncio.create_task(self._flush_loop())
            logger.info("MetricsBufferService flush task started")

    async def shutdown(self) -> None:
        """Shutdown service with final flush."""
        logger.info("MetricsBufferService shutting down...")

        # Signal shutdown
        self._shutdown_event.set()

        # Cancel the flush task
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass

        # Final flush to persist any remaining metrics
        await self._flush_all()

        logger.info(f"MetricsBufferService shutdown complete: " f"total_buffered={self._total_buffered}, total_flushed={self._total_flushed}, " f"flush_count={self._flush_count}")

    def _enqueue(self, buffer: Deque, metric: object) -> None:
        """Append one metric to a buffer and bump the buffered counter under the lock.

        Args:
            buffer: The deque that collects this kind of metric.
            metric: The buffered metric entry to append.
        """
        with self._lock:
            buffer.append(metric)
            self._total_buffered += 1

    def record_tool_metric(
        self,
        tool_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a tool metric for later flush.

        Args:
            tool_id: The UUID string of the tool.
            start_time: The monotonic start time of the invocation.
            success: True if the invocation succeeded.
            error_message: Error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            # Fall back to immediate write
            self._write_tool_metric_immediately(tool_id, start_time, success, error_message)
            return

        self._enqueue(
            self._tool_metrics,
            BufferedToolMetric(
                tool_id=tool_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_resource_metric(
        self,
        resource_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a resource metric for later flush.

        Args:
            resource_id: UUID of the resource.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_resource_metric_immediately(resource_id, start_time, success, error_message)
            return

        self._enqueue(
            self._resource_metrics,
            BufferedResourceMetric(
                resource_id=resource_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_prompt_metric(
        self,
        prompt_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a prompt metric for later flush.

        Args:
            prompt_id: UUID of the prompt.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_prompt_metric_immediately(prompt_id, start_time, success, error_message)
            return

        self._enqueue(
            self._prompt_metrics,
            BufferedPromptMetric(
                prompt_id=prompt_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_server_metric(
        self,
        server_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a server metric for later flush.

        Args:
            server_id: UUID of the server.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_server_metric_immediately(server_id, start_time, success, error_message)
            return

        self._enqueue(
            self._server_metrics,
            BufferedServerMetric(
                server_id=server_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_a2a_agent_metric(
        self,
        a2a_agent_id: str,
        start_time: float,
        success: bool,
        interaction_type: str = "invoke",
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer an A2A agent metric for later flush.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_a2a_agent_metric_immediately(a2a_agent_id, start_time, success, interaction_type, error_message)
            return

        self._enqueue(
            self._a2a_agent_metrics,
            BufferedA2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def record_a2a_agent_metric_with_duration(
        self,
        a2a_agent_id: str,
        response_time: float,
        success: bool,
        interaction_type: str = "invoke",
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer an A2A agent metric with pre-calculated response time.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_a2a_agent_metric_with_duration_immediately(a2a_agent_id, response_time, success, interaction_type, error_message)
            return

        self._enqueue(
            self._a2a_agent_metrics,
            BufferedA2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    async def _flush_loop(self) -> None:
        """Background task that periodically flushes buffered metrics.

        Raises:
            asyncio.CancelledError: When the flush loop is cancelled.
        """
        logger.info(f"Metrics flush loop started (interval={self.flush_interval}s)")

        while not self._shutdown_event.is_set():
            try:
                # Wait for flush interval or shutdown
                try:
                    await asyncio.wait_for(
                        self._shutdown_event.wait(),
                        timeout=self.flush_interval,
                    )
                    # Shutdown signaled
                    break
                except asyncio.TimeoutError:
                    # Normal timeout, proceed to flush
                    pass

                await self._flush_all()

            except asyncio.CancelledError:
                logger.debug("Flush loop cancelled")
                raise
            except Exception as e:
                logger.error(f"Error in metrics flush loop: {e}", exc_info=True)
                # Continue the loop despite errors
                await asyncio.sleep(5)

    async def _flush_all(self) -> None:
        """Flush all buffered metrics to the database.

        The buffers are drained under the lock, then written from a worker
        thread. Flush statistics are only advanced when the write did not
        report failure, so ``total_flushed`` counts rows that were actually
        committed rather than rows that were merely attempted.
        """
        # Drain every buffer in one critical section
        with self._lock:
            tool_metrics = list(self._tool_metrics)
            resource_metrics = list(self._resource_metrics)
            prompt_metrics = list(self._prompt_metrics)
            server_metrics = list(self._server_metrics)
            a2a_agent_metrics = list(self._a2a_agent_metrics)
            self._tool_metrics.clear()
            self._resource_metrics.clear()
            self._prompt_metrics.clear()
            self._server_metrics.clear()
            self._a2a_agent_metrics.clear()

        total = len(tool_metrics) + len(resource_metrics) + len(prompt_metrics) + len(server_metrics) + len(a2a_agent_metrics)
        if total == 0:
            return

        logger.debug(
            f"Flushing {total} metrics: "
            f"tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
            f"servers={len(server_metrics)}, a2a_agents={len(a2a_agent_metrics)}"
        )

        # Flush in thread to avoid blocking event loop
        persisted = await asyncio.to_thread(
            self._flush_to_db,
            tool_metrics,
            resource_metrics,
            prompt_metrics,
            server_metrics,
            a2a_agent_metrics,
        )

        # Only an explicit False means failure; an override that still returns
        # None (the previous contract) keeps being treated as success.
        if persisted is False:
            logger.warning(f"Metrics flush failed: {total} buffered records were dropped")
            return

        self._total_flushed += total
        self._flush_count += 1

        logger.info(
            f"Metrics flush #{self._flush_count}: wrote {total} records "
            f"(tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
            f"servers={len(server_metrics)}, a2a={len(a2a_agent_metrics)})"
        )

    @staticmethod
    def _metric_row(metric: object, id_field: str, extra_fields: tuple = ()) -> dict:
        """Convert one buffered metric into a column mapping for bulk insert.

        Args:
            metric: Buffered metric entry to convert.
            id_field: Name of the entity-id attribute/column (e.g. "tool_id").
            extra_fields: Additional attribute names to copy verbatim.

        Returns:
            dict: Column name to value mapping for the metric.
        """
        row = {
            id_field: getattr(metric, id_field),
            "timestamp": metric.timestamp,
            "response_time": metric.response_time,
            "is_success": metric.is_success,
            "error_message": metric.error_message,
        }
        for name in extra_fields:
            row[name] = getattr(metric, name)
        return row

    def _flush_to_db(
        self,
        tool_metrics: list[BufferedToolMetric],
        resource_metrics: list[BufferedResourceMetric],
        prompt_metrics: list[BufferedPromptMetric],
        server_metrics: list[BufferedServerMetric],
        a2a_agent_metrics: list[BufferedA2AAgentMetric],
    ) -> bool:
        """Write buffered metrics to database (runs in thread).

        All non-empty batches are bulk-inserted in one session and committed
        together. Any exception is logged and swallowed; the metrics of a
        failed flush are lost (accepted trade-off for performance).

        Args:
            tool_metrics: List of buffered tool metrics to write.
            resource_metrics: List of buffered resource metrics to write.
            prompt_metrics: List of buffered prompt metrics to write.
            server_metrics: List of buffered server metrics to write.
            a2a_agent_metrics: List of buffered A2A agent metrics to write.

        Returns:
            bool: True when the commit completed, False when the write failed.
        """
        # (ORM model, id column, batch, extra columns) in the original insert order
        batches = (
            (ToolMetric, "tool_id", tool_metrics, ()),
            (ResourceMetric, "resource_id", resource_metrics, ()),
            (PromptMetric, "prompt_id", prompt_metrics, ()),
            (ServerMetric, "server_id", server_metrics, ()),
            (A2AAgentMetric, "a2a_agent_id", a2a_agent_metrics, ("interaction_type",)),
        )

        try:
            with fresh_db_session() as db:
                for model, id_field, batch, extra_fields in batches:
                    if batch:
                        db.bulk_insert_mappings(
                            model,
                            [self._metric_row(m, id_field, extra_fields) for m in batch],
                        )

                db.commit()
                return True

        except Exception as e:
            logger.error(f"Failed to flush metrics to database: {e}", exc_info=True)
            # Metrics are lost on failure - acceptable trade-off for performance
            # Could implement retry queue if needed

        return False

    def _write_immediately(self, label: str, make_row) -> None:
        """Persist a single metric row right away, logging any failure.

        Args:
            label: Metric kind used in the failure log message (e.g. "tool").
            make_row: Zero-argument callable returning the ORM object to add.
                It is invoked inside the session so timestamps and elapsed
                time are taken at the same point as before the refactor.
        """
        try:
            with fresh_db_session() as db:
                db.add(make_row())
                db.commit()
        except Exception as e:
            logger.error(f"Failed to write {label} metric: {e}")

    def _write_tool_metric_immediately(
        self,
        tool_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single tool metric immediately (fallback when buffering disabled).

        Args:
            tool_id: UUID of the tool.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_immediately(
            "tool",
            lambda: ToolMetric(
                tool_id=tool_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_resource_metric_immediately(
        self,
        resource_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single resource metric immediately.

        Args:
            resource_id: UUID of the resource.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_immediately(
            "resource",
            lambda: ResourceMetric(
                resource_id=resource_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_prompt_metric_immediately(
        self,
        prompt_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single prompt metric immediately.

        Args:
            prompt_id: UUID of the prompt.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_immediately(
            "prompt",
            lambda: PromptMetric(
                prompt_id=prompt_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_server_metric_immediately(
        self,
        server_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single server metric immediately.

        Args:
            server_id: UUID of the server.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_immediately(
            "server",
            lambda: ServerMetric(
                server_id=server_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def _write_a2a_agent_metric_immediately(
        self,
        a2a_agent_id: str,
        start_time: float,
        success: bool,
        interaction_type: str,
        error_message: Optional[str],
    ) -> None:
        """Write a single A2A agent metric immediately.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        self._write_immediately(
            "A2A agent",
            lambda: A2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def _write_a2a_agent_metric_with_duration_immediately(
        self,
        a2a_agent_id: str,
        response_time: float,
        success: bool,
        interaction_type: str,
        error_message: Optional[str],
    ) -> None:
        """Write a single A2A agent metric with pre-calculated duration immediately.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        self._write_immediately(
            "A2A agent",
            lambda: A2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def get_stats(self) -> dict:
        """Get buffer statistics for monitoring.

        Returns:
            dict: Buffer statistics including enabled state, sizes, and counts.
        """
        # Read the size and the buffered counter together so they describe
        # the same instant (both are mutated under this lock).
        with self._lock:
            current_size = len(self._tool_metrics) + len(self._resource_metrics) + len(self._prompt_metrics) + len(self._server_metrics) + len(self._a2a_agent_metrics)
            total_buffered = self._total_buffered

        return {
            "recording_enabled": self.recording_enabled,
            "enabled": self.enabled,
            "flush_interval": self.flush_interval,
            "max_buffer_size": self.max_buffer_size,
            "current_buffer_size": current_size,
            "total_buffered": total_buffered,
            "total_flushed": self._total_flushed,
            "flush_count": self._flush_count,
        }

759 

760 

# Process-wide service instance, created on first request
_metrics_buffer_service: Optional[MetricsBufferService] = None


def get_metrics_buffer_service() -> MetricsBufferService:
    """Return the shared MetricsBufferService, building it on the first call.

    Returns:
        MetricsBufferService: The singleton metrics buffer service instance.
    """
    global _metrics_buffer_service  # pylint: disable=global-statement
    service = _metrics_buffer_service
    if service is None:
        # First caller constructs the instance with settings-derived defaults
        service = MetricsBufferService()
        _metrics_buffer_service = service
    return service
774 return _metrics_buffer_service