Coverage for mcpgateway / services / metrics_buffer_service.py: 92%
328 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Buffered metrics service for batching metric writes to the database.
4This service accumulates metrics in memory and flushes them to the database
5periodically, reducing DB write pressure under high load.
7Copyright 2025
8SPDX-License-Identifier: Apache-2.0
9"""
11# Standard
12import asyncio
13from collections import deque
14from dataclasses import dataclass
15from datetime import datetime, timezone
16import logging
17import threading
18import time
19from typing import Deque, Optional
21# First-Party
22from mcpgateway.config import settings
23from mcpgateway.db import A2AAgentMetric, fresh_db_session, PromptMetric, ResourceMetric, ServerMetric, ToolMetric
25logger = logging.getLogger(__name__)
@dataclass
class BufferedToolMetric:
    """In-memory record of one tool invocation, held until the next flush."""

    tool_id: str  # identifier of the tool that was invoked
    timestamp: datetime  # moment the measurement was recorded
    response_time: float  # duration of the invocation, in seconds
    is_success: bool  # True when the invocation succeeded
    error_message: Optional[str] = None  # failure detail; None when not supplied
@dataclass
class BufferedResourceMetric:
    """In-memory record of one resource access, held until the next flush."""

    resource_id: str  # identifier of the resource that was accessed
    timestamp: datetime  # moment the measurement was recorded
    response_time: float  # duration of the operation, in seconds
    is_success: bool  # True when the operation succeeded
    error_message: Optional[str] = None  # failure detail; None when not supplied
@dataclass
class BufferedPromptMetric:
    """In-memory record of one prompt operation, held until the next flush."""

    prompt_id: str  # identifier of the prompt that was used
    timestamp: datetime  # moment the measurement was recorded
    response_time: float  # duration of the operation, in seconds
    is_success: bool  # True when the operation succeeded
    error_message: Optional[str] = None  # failure detail; None when not supplied
@dataclass
class BufferedServerMetric:
    """In-memory record of one server interaction, held until the next flush."""

    server_id: str  # identifier of the server the measurement belongs to
    timestamp: datetime  # moment the measurement was recorded
    response_time: float  # duration of the operation, in seconds
    is_success: bool  # True when the operation succeeded
    error_message: Optional[str] = None  # failure detail; None when not supplied
@dataclass
class BufferedA2AAgentMetric:
    """In-memory record of one A2A agent interaction, held until the next flush."""

    a2a_agent_id: str  # identifier of the A2A agent that was called
    timestamp: datetime  # moment the measurement was recorded
    response_time: float  # duration of the interaction, in seconds
    is_success: bool  # True when the interaction succeeded
    interaction_type: str = "invoke"  # kind of interaction; "invoke" unless stated
    error_message: Optional[str] = None  # failure detail; None when not supplied
84class MetricsBufferService:
85 """Service for buffering and batching metrics writes to the database.
87 This service provides:
88 - Thread-safe buffering of tool, resource, prompt, server, and A2A agent metrics
89 - Periodic flushing to database (configurable interval)
90 - Graceful shutdown with final flush
92 Configuration (via environment variables):
93 - METRICS_BUFFER_ENABLED: Enable buffered metrics (default: True)
94 - METRICS_BUFFER_FLUSH_INTERVAL: Seconds between flushes (default: 60)
95 - METRICS_BUFFER_MAX_SIZE: Max entries before forced flush (default: 1000)
96 """
98 def __init__(
99 self,
100 flush_interval: Optional[int] = None,
101 max_buffer_size: Optional[int] = None,
102 enabled: Optional[bool] = None,
103 ):
104 """Initialize the metrics buffer service.
106 Args:
107 flush_interval: Seconds between automatic flushes (default: from settings or 60)
108 max_buffer_size: Maximum buffer entries before forced flush (default: from settings or 1000)
109 enabled: Whether buffering is enabled (default: from settings or True)
110 """
111 self.flush_interval = flush_interval or getattr(settings, "metrics_buffer_flush_interval", 60)
112 self.max_buffer_size = max_buffer_size or getattr(settings, "metrics_buffer_max_size", 1000)
113 self.enabled = enabled if enabled is not None else getattr(settings, "metrics_buffer_enabled", True)
114 self.recording_enabled = getattr(settings, "db_metrics_recording_enabled", True)
116 # Thread-safe buffers using deque with locks
117 self._tool_metrics: Deque[BufferedToolMetric] = deque()
118 self._resource_metrics: Deque[BufferedResourceMetric] = deque()
119 self._prompt_metrics: Deque[BufferedPromptMetric] = deque()
120 self._server_metrics: Deque[BufferedServerMetric] = deque()
121 self._a2a_agent_metrics: Deque[BufferedA2AAgentMetric] = deque()
122 self._lock = threading.Lock()
124 # Background flush task
125 self._flush_task: Optional[asyncio.Task] = None
126 self._shutdown_event = asyncio.Event()
128 # Stats for monitoring
129 self._total_buffered = 0
130 self._total_flushed = 0
131 self._flush_count = 0
133 logger.info(
134 f"MetricsBufferService initialized: recording_enabled={self.recording_enabled}, "
135 f"buffer_enabled={self.enabled}, flush_interval={self.flush_interval}s, max_buffer_size={self.max_buffer_size}"
136 )
138 async def start(self) -> None:
139 """Start the background flush task."""
140 if not self.recording_enabled:
141 logger.info("MetricsBufferService: recording disabled, skipping flush loop")
142 return
143 if not self.enabled:
144 logger.info("MetricsBufferService disabled, skipping start")
145 return
147 current_loop = asyncio.get_running_loop()
148 if not self._flush_task_is_active_for_loop(current_loop):
149 self._shutdown_event.clear()
150 self._flush_task = asyncio.create_task(self._flush_loop())
151 logger.info("MetricsBufferService flush task started")
153 def _flush_task_is_active_for_loop(self, loop: asyncio.AbstractEventLoop) -> bool:
154 """Return whether the current flush task is usable for the active loop.
156 Args:
157 loop: The currently running event loop for this worker process.
159 Returns:
160 True when the cached flush task belongs to the current live loop.
161 """
162 task = self._flush_task
163 if task is None or task.done() or task.cancelled():
164 return False
166 try:
167 task_loop = task.get_loop()
168 except (AttributeError, RuntimeError):
169 return False
171 return task_loop is loop and task_loop.is_running()
173 def _ensure_flush_task_started(self) -> None:
174 """Best-effort lazy-start for the background flush task.
176 In preloaded multi-worker deployments, the singleton may exist in a
177 worker before the worker's startup hook has started the flush loop. A
178 first buffered metric should recover from that state instead of sitting
179 in memory forever.
180 """
181 if not self.recording_enabled or not self.enabled:
182 return
184 try:
185 loop = asyncio.get_running_loop()
186 except RuntimeError:
187 return
189 if self._flush_task_is_active_for_loop(loop):
190 return
192 if self._shutdown_event.is_set():
193 self._shutdown_event = asyncio.Event()
195 self._flush_task = loop.create_task(self._flush_loop())
196 logger.info("MetricsBufferService flush task started lazily")
198 async def shutdown(self) -> None:
199 """Shutdown service with final flush."""
200 logger.info("MetricsBufferService shutting down...")
202 # Signal shutdown
203 self._shutdown_event.set()
205 # Cancel the flush task
206 if self._flush_task:
207 self._flush_task.cancel()
208 try:
209 await self._flush_task
210 except asyncio.CancelledError:
211 pass
213 # Final flush to persist any remaining metrics
214 await self._flush_all()
216 logger.info(f"MetricsBufferService shutdown complete: " f"total_buffered={self._total_buffered}, total_flushed={self._total_flushed}, " f"flush_count={self._flush_count}")
218 def record_tool_metric(
219 self,
220 tool_id: str,
221 start_time: float,
222 success: bool,
223 error_message: Optional[str] = None,
224 ) -> None:
225 """Buffer a tool metric for later flush.
227 Args:
228 tool_id: The UUID string of the tool.
229 start_time: The monotonic start time of the invocation.
230 success: True if the invocation succeeded.
231 error_message: Error message if failed.
232 """
233 if not self.recording_enabled:
234 return # Execution metrics recording disabled
235 if not self.enabled:
236 # Fall back to immediate write
237 self._write_tool_metric_immediately(tool_id, start_time, success, error_message)
238 return
240 metric = BufferedToolMetric(
241 tool_id=tool_id,
242 timestamp=datetime.now(timezone.utc),
243 response_time=time.monotonic() - start_time,
244 is_success=success,
245 error_message=error_message,
246 )
248 self._ensure_flush_task_started()
249 with self._lock:
250 self._tool_metrics.append(metric)
251 self._total_buffered += 1
253 def record_tool_metric_with_duration(
254 self,
255 tool_id: str,
256 response_time: float,
257 success: bool,
258 error_message: Optional[str] = None,
259 ) -> None:
260 """Buffer a tool metric with pre-calculated response time.
262 Args:
263 tool_id: UUID of the tool.
264 response_time: Pre-calculated response time in seconds.
265 success: Whether the operation succeeded.
266 error_message: Optional error message if failed.
267 """
268 if not self.recording_enabled:
269 return # Execution metrics recording disabled
270 if not self.enabled:
271 self._write_tool_metric_with_duration_immediately(tool_id, response_time, success, error_message)
272 return
274 metric = BufferedToolMetric(
275 tool_id=tool_id,
276 timestamp=datetime.now(timezone.utc),
277 response_time=response_time,
278 is_success=success,
279 error_message=error_message,
280 )
282 self._ensure_flush_task_started()
283 with self._lock:
284 self._tool_metrics.append(metric)
285 self._total_buffered += 1
287 def record_resource_metric(
288 self,
289 resource_id: str,
290 start_time: float,
291 success: bool,
292 error_message: Optional[str] = None,
293 ) -> None:
294 """Buffer a resource metric for later flush.
296 Args:
297 resource_id: UUID of the resource.
298 start_time: Monotonic start time for response_time calculation.
299 success: Whether the operation succeeded.
300 error_message: Optional error message if failed.
301 """
302 if not self.recording_enabled:
303 return # Execution metrics recording disabled
304 if not self.enabled:
305 self._write_resource_metric_immediately(resource_id, start_time, success, error_message)
306 return
308 metric = BufferedResourceMetric(
309 resource_id=resource_id,
310 timestamp=datetime.now(timezone.utc),
311 response_time=time.monotonic() - start_time,
312 is_success=success,
313 error_message=error_message,
314 )
316 self._ensure_flush_task_started()
317 with self._lock:
318 self._resource_metrics.append(metric)
319 self._total_buffered += 1
321 def record_prompt_metric(
322 self,
323 prompt_id: str,
324 start_time: float,
325 success: bool,
326 error_message: Optional[str] = None,
327 ) -> None:
328 """Buffer a prompt metric for later flush.
330 Args:
331 prompt_id: UUID of the prompt.
332 start_time: Monotonic start time for response_time calculation.
333 success: Whether the operation succeeded.
334 error_message: Optional error message if failed.
335 """
336 if not self.recording_enabled:
337 return # Execution metrics recording disabled
338 if not self.enabled:
339 self._write_prompt_metric_immediately(prompt_id, start_time, success, error_message)
340 return
342 metric = BufferedPromptMetric(
343 prompt_id=prompt_id,
344 timestamp=datetime.now(timezone.utc),
345 response_time=time.monotonic() - start_time,
346 is_success=success,
347 error_message=error_message,
348 )
350 self._ensure_flush_task_started()
351 with self._lock:
352 self._prompt_metrics.append(metric)
353 self._total_buffered += 1
355 def record_server_metric(
356 self,
357 server_id: str,
358 start_time: float,
359 success: bool,
360 error_message: Optional[str] = None,
361 ) -> None:
362 """Buffer a server metric for later flush.
364 Args:
365 server_id: UUID of the server.
366 start_time: Monotonic start time for response_time calculation.
367 success: Whether the operation succeeded.
368 error_message: Optional error message if failed.
369 """
370 if not self.recording_enabled:
371 return # Execution metrics recording disabled
372 if not self.enabled:
373 self._write_server_metric_immediately(server_id, start_time, success, error_message)
374 return
376 metric = BufferedServerMetric(
377 server_id=server_id,
378 timestamp=datetime.now(timezone.utc),
379 response_time=time.monotonic() - start_time,
380 is_success=success,
381 error_message=error_message,
382 )
384 self._ensure_flush_task_started()
385 with self._lock:
386 self._server_metrics.append(metric)
387 self._total_buffered += 1
389 def record_server_metric_with_duration(
390 self,
391 server_id: str,
392 response_time: float,
393 success: bool,
394 error_message: Optional[str] = None,
395 ) -> None:
396 """Buffer a server metric with pre-calculated response time.
398 Args:
399 server_id: UUID of the server.
400 response_time: Pre-calculated response time in seconds.
401 success: Whether the operation succeeded.
402 error_message: Optional error message if failed.
403 """
404 if not self.recording_enabled:
405 return # Execution metrics recording disabled
406 if not self.enabled:
407 self._write_server_metric_with_duration_immediately(server_id, response_time, success, error_message)
408 return
410 metric = BufferedServerMetric(
411 server_id=server_id,
412 timestamp=datetime.now(timezone.utc),
413 response_time=response_time,
414 is_success=success,
415 error_message=error_message,
416 )
418 self._ensure_flush_task_started()
419 with self._lock:
420 self._server_metrics.append(metric)
421 self._total_buffered += 1
423 def record_a2a_agent_metric(
424 self,
425 a2a_agent_id: str,
426 start_time: float,
427 success: bool,
428 interaction_type: str = "invoke",
429 error_message: Optional[str] = None,
430 ) -> None:
431 """Buffer an A2A agent metric for later flush.
433 Args:
434 a2a_agent_id: UUID of the A2A agent.
435 start_time: Monotonic start time for response_time calculation.
436 success: Whether the operation succeeded.
437 interaction_type: Type of interaction (e.g., "invoke").
438 error_message: Optional error message if failed.
439 """
440 if not self.recording_enabled:
441 return # Execution metrics recording disabled
442 if not self.enabled:
443 self._write_a2a_agent_metric_immediately(a2a_agent_id, start_time, success, interaction_type, error_message)
444 return
446 metric = BufferedA2AAgentMetric(
447 a2a_agent_id=a2a_agent_id,
448 timestamp=datetime.now(timezone.utc),
449 response_time=time.monotonic() - start_time,
450 is_success=success,
451 interaction_type=interaction_type,
452 error_message=error_message,
453 )
455 self._ensure_flush_task_started()
456 with self._lock:
457 self._a2a_agent_metrics.append(metric)
458 self._total_buffered += 1
460 def record_a2a_agent_metric_with_duration(
461 self,
462 a2a_agent_id: str,
463 response_time: float,
464 success: bool,
465 interaction_type: str = "invoke",
466 error_message: Optional[str] = None,
467 ) -> None:
468 """Buffer an A2A agent metric with pre-calculated response time.
470 Args:
471 a2a_agent_id: UUID of the A2A agent.
472 response_time: Pre-calculated response time in seconds.
473 success: Whether the operation succeeded.
474 interaction_type: Type of interaction (e.g., "invoke").
475 error_message: Optional error message if failed.
476 """
477 if not self.recording_enabled:
478 return # Execution metrics recording disabled
479 if not self.enabled:
480 self._write_a2a_agent_metric_with_duration_immediately(a2a_agent_id, response_time, success, interaction_type, error_message)
481 return
483 metric = BufferedA2AAgentMetric(
484 a2a_agent_id=a2a_agent_id,
485 timestamp=datetime.now(timezone.utc),
486 response_time=response_time,
487 is_success=success,
488 interaction_type=interaction_type,
489 error_message=error_message,
490 )
492 self._ensure_flush_task_started()
493 with self._lock:
494 self._a2a_agent_metrics.append(metric)
495 self._total_buffered += 1
497 async def _flush_loop(self) -> None:
498 """Background task that periodically flushes buffered metrics.
500 Raises:
501 asyncio.CancelledError: When the flush loop is cancelled.
502 """
503 logger.info(f"Metrics flush loop started (interval={self.flush_interval}s)")
505 while not self._shutdown_event.is_set():
506 try:
507 # Wait for flush interval or shutdown
508 try:
509 await asyncio.wait_for(
510 self._shutdown_event.wait(),
511 timeout=self.flush_interval,
512 )
513 # Shutdown signaled
514 break
515 except asyncio.TimeoutError:
516 # Normal timeout, proceed to flush
517 pass
519 await self._flush_all()
521 except asyncio.CancelledError:
522 logger.debug("Flush loop cancelled")
523 raise
524 except Exception as e:
525 logger.error(f"Error in metrics flush loop: {e}", exc_info=True)
526 # Continue the loop despite errors
527 await asyncio.sleep(5)
529 async def _flush_all(self) -> None:
530 """Flush all buffered metrics to the database."""
531 # Swap out buffers atomically
532 with self._lock:
533 tool_metrics = list(self._tool_metrics)
534 resource_metrics = list(self._resource_metrics)
535 prompt_metrics = list(self._prompt_metrics)
536 server_metrics = list(self._server_metrics)
537 a2a_agent_metrics = list(self._a2a_agent_metrics)
538 self._tool_metrics.clear()
539 self._resource_metrics.clear()
540 self._prompt_metrics.clear()
541 self._server_metrics.clear()
542 self._a2a_agent_metrics.clear()
544 total = len(tool_metrics) + len(resource_metrics) + len(prompt_metrics) + len(server_metrics) + len(a2a_agent_metrics)
545 if total == 0:
546 return
548 logger.debug(
549 f"Flushing {total} metrics: "
550 f"tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
551 f"servers={len(server_metrics)}, a2a_agents={len(a2a_agent_metrics)}"
552 )
554 # Flush in thread to avoid blocking event loop
555 await asyncio.to_thread(
556 self._flush_to_db,
557 tool_metrics,
558 resource_metrics,
559 prompt_metrics,
560 server_metrics,
561 a2a_agent_metrics,
562 )
564 self._total_flushed += total
565 self._flush_count += 1
567 logger.info(
568 f"Metrics flush #{self._flush_count}: wrote {total} records "
569 f"(tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
570 f"servers={len(server_metrics)}, a2a={len(a2a_agent_metrics)})"
571 )
573 def _flush_to_db(
574 self,
575 tool_metrics: list[BufferedToolMetric],
576 resource_metrics: list[BufferedResourceMetric],
577 prompt_metrics: list[BufferedPromptMetric],
578 server_metrics: list[BufferedServerMetric],
579 a2a_agent_metrics: list[BufferedA2AAgentMetric],
580 ) -> None:
581 """Write buffered metrics to database (runs in thread).
583 Args:
584 tool_metrics: List of buffered tool metrics to write.
585 resource_metrics: List of buffered resource metrics to write.
586 prompt_metrics: List of buffered prompt metrics to write.
587 server_metrics: List of buffered server metrics to write.
588 a2a_agent_metrics: List of buffered A2A agent metrics to write.
589 """
590 try:
591 with fresh_db_session() as db:
592 # Bulk insert tool metrics
593 if tool_metrics:
594 db.bulk_insert_mappings(
595 ToolMetric,
596 [
597 {
598 "tool_id": m.tool_id,
599 "timestamp": m.timestamp,
600 "response_time": m.response_time,
601 "is_success": m.is_success,
602 "error_message": m.error_message,
603 }
604 for m in tool_metrics
605 ],
606 )
608 # Bulk insert resource metrics
609 if resource_metrics:
610 db.bulk_insert_mappings(
611 ResourceMetric,
612 [
613 {
614 "resource_id": m.resource_id,
615 "timestamp": m.timestamp,
616 "response_time": m.response_time,
617 "is_success": m.is_success,
618 "error_message": m.error_message,
619 }
620 for m in resource_metrics
621 ],
622 )
624 # Bulk insert prompt metrics
625 if prompt_metrics:
626 db.bulk_insert_mappings(
627 PromptMetric,
628 [
629 {
630 "prompt_id": m.prompt_id,
631 "timestamp": m.timestamp,
632 "response_time": m.response_time,
633 "is_success": m.is_success,
634 "error_message": m.error_message,
635 }
636 for m in prompt_metrics
637 ],
638 )
640 # Bulk insert A2A agent metrics
641 if a2a_agent_metrics:
642 db.bulk_insert_mappings(
643 A2AAgentMetric,
644 [
645 {
646 "a2a_agent_id": m.a2a_agent_id,
647 "timestamp": m.timestamp,
648 "response_time": m.response_time,
649 "is_success": m.is_success,
650 "interaction_type": m.interaction_type,
651 "error_message": m.error_message,
652 }
653 for m in a2a_agent_metrics
654 ],
655 )
657 db.commit()
659 except Exception as e:
660 logger.error(f"Failed to flush metrics to database: {e}", exc_info=True)
661 # Metrics are lost on failure - acceptable trade-off for performance
662 # Could implement retry queue if needed
664 # Flush server metrics in a separate transaction so that an invalid
665 # server_id (FK violation) does not roll back tool/resource/prompt/a2a
666 # metrics. server_id can originate from untrusted headers (X-Server-ID)
667 # in admin API paths, so it may reference a nonexistent server.
668 if server_metrics:
669 try:
670 with fresh_db_session() as db:
671 db.bulk_insert_mappings(
672 ServerMetric,
673 [
674 {
675 "server_id": m.server_id,
676 "timestamp": m.timestamp,
677 "response_time": m.response_time,
678 "is_success": m.is_success,
679 "error_message": m.error_message,
680 }
681 for m in server_metrics
682 ],
683 )
684 db.commit()
685 except Exception as e:
686 logger.error(f"Failed to flush server metrics to database: {e}", exc_info=True)
688 def _write_tool_metric_immediately(
689 self,
690 tool_id: str,
691 start_time: float,
692 success: bool,
693 error_message: Optional[str],
694 ) -> None:
695 """Write a single tool metric immediately (fallback when buffering disabled).
697 Args:
698 tool_id: UUID of the tool.
699 start_time: Monotonic start time for response_time calculation.
700 success: Whether the operation succeeded.
701 error_message: Optional error message if failed.
702 """
703 try:
704 with fresh_db_session() as db:
705 metric = ToolMetric(
706 tool_id=tool_id,
707 timestamp=datetime.now(timezone.utc),
708 response_time=time.monotonic() - start_time,
709 is_success=success,
710 error_message=error_message,
711 )
712 db.add(metric)
713 db.commit()
714 except Exception as e:
715 logger.error(f"Failed to write tool metric: {e}")
717 def _write_tool_metric_with_duration_immediately(
718 self,
719 tool_id: str,
720 response_time: float,
721 success: bool,
722 error_message: Optional[str],
723 ) -> None:
724 """Write a single tool metric with pre-calculated duration immediately.
726 Args:
727 tool_id: UUID of the tool.
728 response_time: Pre-calculated response time in seconds.
729 success: Whether the operation succeeded.
730 error_message: Optional error message if failed.
731 """
732 try:
733 with fresh_db_session() as db:
734 metric = ToolMetric(
735 tool_id=tool_id,
736 timestamp=datetime.now(timezone.utc),
737 response_time=response_time,
738 is_success=success,
739 error_message=error_message,
740 )
741 db.add(metric)
742 db.commit()
743 except Exception as e:
744 logger.error(f"Failed to write tool metric: {e}")
746 def _write_resource_metric_immediately(
747 self,
748 resource_id: str,
749 start_time: float,
750 success: bool,
751 error_message: Optional[str],
752 ) -> None:
753 """Write a single resource metric immediately.
755 Args:
756 resource_id: UUID of the resource.
757 start_time: Monotonic start time for response_time calculation.
758 success: Whether the operation succeeded.
759 error_message: Optional error message if failed.
760 """
761 try:
762 with fresh_db_session() as db:
763 metric = ResourceMetric(
764 resource_id=resource_id,
765 timestamp=datetime.now(timezone.utc),
766 response_time=time.monotonic() - start_time,
767 is_success=success,
768 error_message=error_message,
769 )
770 db.add(metric)
771 db.commit()
772 except Exception as e:
773 logger.error(f"Failed to write resource metric: {e}")
775 def _write_prompt_metric_immediately(
776 self,
777 prompt_id: str,
778 start_time: float,
779 success: bool,
780 error_message: Optional[str],
781 ) -> None:
782 """Write a single prompt metric immediately.
784 Args:
785 prompt_id: UUID of the prompt.
786 start_time: Monotonic start time for response_time calculation.
787 success: Whether the operation succeeded.
788 error_message: Optional error message if failed.
789 """
790 try:
791 with fresh_db_session() as db:
792 metric = PromptMetric(
793 prompt_id=prompt_id,
794 timestamp=datetime.now(timezone.utc),
795 response_time=time.monotonic() - start_time,
796 is_success=success,
797 error_message=error_message,
798 )
799 db.add(metric)
800 db.commit()
801 except Exception as e:
802 logger.error(f"Failed to write prompt metric: {e}")
804 def _write_server_metric_immediately(
805 self,
806 server_id: str,
807 start_time: float,
808 success: bool,
809 error_message: Optional[str],
810 ) -> None:
811 """Write a single server metric immediately.
813 Args:
814 server_id: UUID of the server.
815 start_time: Monotonic start time for response_time calculation.
816 success: Whether the operation succeeded.
817 error_message: Optional error message if failed.
818 """
819 try:
820 with fresh_db_session() as db:
821 metric = ServerMetric(
822 server_id=server_id,
823 timestamp=datetime.now(timezone.utc),
824 response_time=time.monotonic() - start_time,
825 is_success=success,
826 error_message=error_message,
827 )
828 db.add(metric)
829 db.commit()
830 except Exception as e:
831 logger.error(f"Failed to write server metric: {e}")
833 def _write_server_metric_with_duration_immediately(
834 self,
835 server_id: str,
836 response_time: float,
837 success: bool,
838 error_message: Optional[str],
839 ) -> None:
840 """Write a single server metric with pre-calculated duration immediately.
842 Args:
843 server_id: UUID of the server.
844 response_time: Pre-calculated response time in seconds.
845 success: Whether the operation succeeded.
846 error_message: Optional error message if failed.
847 """
848 try:
849 with fresh_db_session() as db:
850 metric = ServerMetric(
851 server_id=server_id,
852 timestamp=datetime.now(timezone.utc),
853 response_time=response_time,
854 is_success=success,
855 error_message=error_message,
856 )
857 db.add(metric)
858 db.commit()
859 except Exception as e:
860 logger.error(f"Failed to write server metric: {e}")
862 def _write_a2a_agent_metric_immediately(
863 self,
864 a2a_agent_id: str,
865 start_time: float,
866 success: bool,
867 interaction_type: str,
868 error_message: Optional[str],
869 ) -> None:
870 """Write a single A2A agent metric immediately.
872 Args:
873 a2a_agent_id: UUID of the A2A agent.
874 start_time: Monotonic start time for response_time calculation.
875 success: Whether the operation succeeded.
876 interaction_type: Type of interaction (e.g., "invoke").
877 error_message: Optional error message if failed.
878 """
879 try:
880 with fresh_db_session() as db:
881 metric = A2AAgentMetric(
882 a2a_agent_id=a2a_agent_id,
883 timestamp=datetime.now(timezone.utc),
884 response_time=time.monotonic() - start_time,
885 is_success=success,
886 interaction_type=interaction_type,
887 error_message=error_message,
888 )
889 db.add(metric)
890 db.commit()
891 except Exception as e:
892 logger.error(f"Failed to write A2A agent metric: {e}")
894 def _write_a2a_agent_metric_with_duration_immediately(
895 self,
896 a2a_agent_id: str,
897 response_time: float,
898 success: bool,
899 interaction_type: str,
900 error_message: Optional[str],
901 ) -> None:
902 """Write a single A2A agent metric with pre-calculated duration immediately.
904 Args:
905 a2a_agent_id: UUID of the A2A agent.
906 response_time: Pre-calculated response time in seconds.
907 success: Whether the operation succeeded.
908 interaction_type: Type of interaction (e.g., "invoke").
909 error_message: Optional error message if failed.
910 """
911 try:
912 with fresh_db_session() as db:
913 metric = A2AAgentMetric(
914 a2a_agent_id=a2a_agent_id,
915 timestamp=datetime.now(timezone.utc),
916 response_time=response_time,
917 is_success=success,
918 interaction_type=interaction_type,
919 error_message=error_message,
920 )
921 db.add(metric)
922 db.commit()
923 except Exception as e:
924 logger.error(f"Failed to write A2A agent metric: {e}")
926 def get_stats(self) -> dict:
927 """Get buffer statistics for monitoring.
929 Returns:
930 dict: Buffer statistics including enabled state, sizes, and counts.
931 """
932 with self._lock:
933 current_size = len(self._tool_metrics) + len(self._resource_metrics) + len(self._prompt_metrics) + len(self._server_metrics) + len(self._a2a_agent_metrics)
935 return {
936 "recording_enabled": self.recording_enabled,
937 "enabled": self.enabled,
938 "flush_interval": self.flush_interval,
939 "max_buffer_size": self.max_buffer_size,
940 "current_buffer_size": current_size,
941 "total_buffered": self._total_buffered,
942 "total_flushed": self._total_flushed,
943 "flush_count": self._flush_count,
944 }
947# Singleton instance
948_metrics_buffer_service: Optional[MetricsBufferService] = None
def get_metrics_buffer_service() -> MetricsBufferService:
    """Return the module-wide MetricsBufferService, building it on first call.

    Returns:
        MetricsBufferService: The shared metrics buffer service instance.
    """
    global _metrics_buffer_service  # pylint: disable=global-statement
    service = _metrics_buffer_service
    if service is None:
        # First call: create the instance and cache it in the module global.
        service = _metrics_buffer_service = MetricsBufferService()
    return service