Coverage for mcpgateway / services / metrics_buffer_service.py: 99%
256 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Buffered metrics service for batching metric writes to the database.
4This service accumulates metrics in memory and flushes them to the database
5periodically, reducing DB write pressure under high load.
7Copyright 2025
8SPDX-License-Identifier: Apache-2.0
9"""
11# Standard
12import asyncio
13from collections import deque
14from dataclasses import dataclass
15from datetime import datetime, timezone
16import logging
17import threading
18import time
19from typing import Deque, Optional
21# First-Party
22from mcpgateway.config import settings
23from mcpgateway.db import A2AAgentMetric, fresh_db_session, PromptMetric, ResourceMetric, ServerMetric, ToolMetric
25logger = logging.getLogger(__name__)
@dataclass
class BufferedToolMetric:
    """One tool invocation measurement held in memory until it is flushed.

    Attributes:
        tool_id: Identifier of the tool the measurement belongs to.
        timestamp: When the measurement was recorded.
        response_time: Elapsed time of the invocation.
        is_success: True when the invocation succeeded.
        error_message: Failure description, or None when there is none.
    """

    tool_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None
@dataclass
class BufferedResourceMetric:
    """One resource access measurement held in memory until it is flushed.

    Attributes:
        resource_id: Identifier of the resource the measurement belongs to.
        timestamp: When the measurement was recorded.
        response_time: Elapsed time of the operation.
        is_success: True when the operation succeeded.
        error_message: Failure description, or None when there is none.
    """

    resource_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None
@dataclass
class BufferedPromptMetric:
    """One prompt usage measurement held in memory until it is flushed.

    Attributes:
        prompt_id: Identifier of the prompt the measurement belongs to.
        timestamp: When the measurement was recorded.
        response_time: Elapsed time of the operation.
        is_success: True when the operation succeeded.
        error_message: Failure description, or None when there is none.
    """

    prompt_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None
@dataclass
class BufferedServerMetric:
    """One server interaction measurement held in memory until it is flushed.

    Attributes:
        server_id: Identifier of the server the measurement belongs to.
        timestamp: When the measurement was recorded.
        response_time: Elapsed time of the operation.
        is_success: True when the operation succeeded.
        error_message: Failure description, or None when there is none.
    """

    server_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None
@dataclass
class BufferedA2AAgentMetric:
    """One A2A agent interaction measurement held in memory until it is flushed.

    Attributes:
        a2a_agent_id: Identifier of the A2A agent the measurement belongs to.
        timestamp: When the measurement was recorded.
        response_time: Elapsed time of the interaction.
        is_success: True when the interaction succeeded.
        interaction_type: Kind of interaction; defaults to "invoke".
        error_message: Failure description, or None when there is none.
    """

    a2a_agent_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    interaction_type: str = "invoke"
    error_message: Optional[str] = None
class MetricsBufferService:
    """Service for buffering and batching metrics writes to the database.

    This service provides:
    - Thread-safe buffering of tool, resource, prompt, server, and A2A agent metrics
    - Periodic flushing to database (configurable interval)
    - Graceful shutdown with final flush

    Configuration (via environment variables):
    - METRICS_BUFFER_ENABLED: Enable buffered metrics (default: True)
    - METRICS_BUFFER_FLUSH_INTERVAL: Seconds between flushes (default: 60)
    - METRICS_BUFFER_MAX_SIZE: Configured maximum buffer size (default: 1000)

    Note:
        ``max_buffer_size`` is stored, logged and reported by ``get_stats()``,
        but nothing in this class triggers a flush when that many entries are
        buffered: flushes happen on the timer and at shutdown only.
    """

    def __init__(
        self,
        flush_interval: Optional[int] = None,
        max_buffer_size: Optional[int] = None,
        enabled: Optional[bool] = None,
    ):
        """Initialize the metrics buffer service.

        Args:
            flush_interval: Seconds between automatic flushes (default: from settings or 60)
            max_buffer_size: Configured maximum buffer size (default: from settings or 1000)
            enabled: Whether buffering is enabled (default: from settings or True)
        """
        # A falsy argument (None or 0) falls back to the settings value.
        self.flush_interval = flush_interval or getattr(settings, "metrics_buffer_flush_interval", 60)
        self.max_buffer_size = max_buffer_size or getattr(settings, "metrics_buffer_max_size", 1000)
        # Compared against None so that an explicit ``enabled=False`` is honoured.
        self.enabled = enabled if enabled is not None else getattr(settings, "metrics_buffer_enabled", True)
        self.recording_enabled = getattr(settings, "db_metrics_recording_enabled", True)

        # One buffer per metric kind; appends and drains happen under ``_lock``.
        self._tool_metrics: Deque[BufferedToolMetric] = deque()
        self._resource_metrics: Deque[BufferedResourceMetric] = deque()
        self._prompt_metrics: Deque[BufferedPromptMetric] = deque()
        self._server_metrics: Deque[BufferedServerMetric] = deque()
        self._a2a_agent_metrics: Deque[BufferedA2AAgentMetric] = deque()
        self._lock = threading.Lock()

        # Background flush task
        self._flush_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()

        # Stats for monitoring
        self._total_buffered = 0
        self._total_flushed = 0
        self._flush_count = 0

        logger.info(
            f"MetricsBufferService initialized: recording_enabled={self.recording_enabled}, "
            f"buffer_enabled={self.enabled}, flush_interval={self.flush_interval}s, max_buffer_size={self.max_buffer_size}"
        )

    async def start(self) -> None:
        """Start the background flush task.

        Does nothing when recording or buffering is disabled, or when a flush
        task is already running.
        """
        if not self.recording_enabled:
            logger.info("MetricsBufferService: recording disabled, skipping flush loop")
            return
        if not self.enabled:
            logger.info("MetricsBufferService disabled, skipping start")
            return

        if self._flush_task is None or self._flush_task.done():
            self._shutdown_event.clear()
            self._flush_task = asyncio.create_task(self._flush_loop())
            logger.info("MetricsBufferService flush task started")

    async def shutdown(self) -> None:
        """Shutdown service with final flush."""
        logger.info("MetricsBufferService shutting down...")

        # Signal shutdown
        self._shutdown_event.set()

        # Cancel the flush task and wait until it has actually stopped
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass

        # Final flush to persist any remaining metrics
        await self._flush_all()

        logger.info(
            f"MetricsBufferService shutdown complete: "
            f"total_buffered={self._total_buffered}, total_flushed={self._total_flushed}, "
            f"flush_count={self._flush_count}"
        )

    def _enqueue(self, buffer: Deque, metric: object) -> None:
        """Append one metric to a buffer and count it, under the lock.

        Args:
            buffer: The per-kind deque the metric belongs to.
            metric: The buffered metric entry to append.
        """
        with self._lock:
            buffer.append(metric)
            self._total_buffered += 1

    def record_tool_metric(
        self,
        tool_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a tool metric for later flush.

        Args:
            tool_id: The UUID string of the tool.
            start_time: The monotonic start time of the invocation.
            success: True if the invocation succeeded.
            error_message: Error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            # Fall back to immediate write
            self._write_tool_metric_immediately(tool_id, start_time, success, error_message)
            return

        self._enqueue(
            self._tool_metrics,
            BufferedToolMetric(
                tool_id=tool_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_resource_metric(
        self,
        resource_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a resource metric for later flush.

        Args:
            resource_id: UUID of the resource.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_resource_metric_immediately(resource_id, start_time, success, error_message)
            return

        self._enqueue(
            self._resource_metrics,
            BufferedResourceMetric(
                resource_id=resource_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_prompt_metric(
        self,
        prompt_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a prompt metric for later flush.

        Args:
            prompt_id: UUID of the prompt.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_prompt_metric_immediately(prompt_id, start_time, success, error_message)
            return

        self._enqueue(
            self._prompt_metrics,
            BufferedPromptMetric(
                prompt_id=prompt_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_server_metric(
        self,
        server_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a server metric for later flush.

        Args:
            server_id: UUID of the server.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_server_metric_immediately(server_id, start_time, success, error_message)
            return

        self._enqueue(
            self._server_metrics,
            BufferedServerMetric(
                server_id=server_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_a2a_agent_metric(
        self,
        a2a_agent_id: str,
        start_time: float,
        success: bool,
        interaction_type: str = "invoke",
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer an A2A agent metric for later flush.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_a2a_agent_metric_immediately(a2a_agent_id, start_time, success, interaction_type, error_message)
            return

        self._enqueue(
            self._a2a_agent_metrics,
            BufferedA2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def record_a2a_agent_metric_with_duration(
        self,
        a2a_agent_id: str,
        response_time: float,
        success: bool,
        interaction_type: str = "invoke",
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer an A2A agent metric with pre-calculated response time.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_a2a_agent_metric_with_duration_immediately(a2a_agent_id, response_time, success, interaction_type, error_message)
            return

        self._enqueue(
            self._a2a_agent_metrics,
            BufferedA2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    async def _flush_loop(self) -> None:
        """Background task that periodically flushes buffered metrics.

        Raises:
            asyncio.CancelledError: When the flush loop is cancelled.
        """
        logger.info(f"Metrics flush loop started (interval={self.flush_interval}s)")

        while not self._shutdown_event.is_set():
            try:
                # Wait for flush interval or shutdown
                try:
                    await asyncio.wait_for(
                        self._shutdown_event.wait(),
                        timeout=self.flush_interval,
                    )
                    # Shutdown signaled
                    break
                except asyncio.TimeoutError:
                    # Normal timeout, proceed to flush
                    pass

                await self._flush_all()

            except asyncio.CancelledError:
                logger.debug("Flush loop cancelled")
                raise
            except Exception as e:
                logger.error(f"Error in metrics flush loop: {e}", exc_info=True)
                # Continue the loop despite errors
                await asyncio.sleep(5)

    async def _flush_all(self) -> None:
        """Flush all buffered metrics to the database.

        The buffers are drained under the lock and written in a worker thread.
        The flush counters are only advanced when the write succeeded; a failed
        batch is dropped and is not counted as flushed.
        """
        # Snapshot and empty every buffer in one critical section
        with self._lock:
            tool_metrics = list(self._tool_metrics)
            resource_metrics = list(self._resource_metrics)
            prompt_metrics = list(self._prompt_metrics)
            server_metrics = list(self._server_metrics)
            a2a_agent_metrics = list(self._a2a_agent_metrics)
            self._tool_metrics.clear()
            self._resource_metrics.clear()
            self._prompt_metrics.clear()
            self._server_metrics.clear()
            self._a2a_agent_metrics.clear()

        total = len(tool_metrics) + len(resource_metrics) + len(prompt_metrics) + len(server_metrics) + len(a2a_agent_metrics)
        if total == 0:
            return

        logger.debug(
            f"Flushing {total} metrics: "
            f"tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
            f"servers={len(server_metrics)}, a2a_agents={len(a2a_agent_metrics)}"
        )

        # Flush in thread to avoid blocking event loop
        persisted = await asyncio.to_thread(
            self._flush_to_db,
            tool_metrics,
            resource_metrics,
            prompt_metrics,
            server_metrics,
            a2a_agent_metrics,
        )

        # Only an explicit False means failure, so an override of _flush_to_db
        # that still returns None keeps being treated as a successful write.
        if persisted is False:
            logger.warning(f"Metrics flush failed: dropped {total} buffered records")
            return

        self._total_flushed += total
        self._flush_count += 1

        logger.info(
            f"Metrics flush #{self._flush_count}: wrote {total} records "
            f"(tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
            f"servers={len(server_metrics)}, a2a={len(a2a_agent_metrics)})"
        )

    def _flush_to_db(
        self,
        tool_metrics: "list[BufferedToolMetric]",
        resource_metrics: "list[BufferedResourceMetric]",
        prompt_metrics: "list[BufferedPromptMetric]",
        server_metrics: "list[BufferedServerMetric]",
        a2a_agent_metrics: "list[BufferedA2AAgentMetric]",
    ) -> bool:
        """Write buffered metrics to database (runs in thread).

        All non-empty batches are bulk-inserted in one session and committed
        together. Any exception is logged and swallowed.

        Args:
            tool_metrics: List of buffered tool metrics to write.
            resource_metrics: List of buffered resource metrics to write.
            prompt_metrics: List of buffered prompt metrics to write.
            server_metrics: List of buffered server metrics to write.
            a2a_agent_metrics: List of buffered A2A agent metrics to write.

        Returns:
            bool: True when the commit succeeded, False when the write failed.
        """
        shared_columns = ("timestamp", "response_time", "is_success", "error_message")
        # (ORM model, batch, columns copied from each buffered entry)
        batches = (
            (ToolMetric, tool_metrics, ("tool_id",) + shared_columns),
            (ResourceMetric, resource_metrics, ("resource_id",) + shared_columns),
            (PromptMetric, prompt_metrics, ("prompt_id",) + shared_columns),
            (ServerMetric, server_metrics, ("server_id",) + shared_columns),
            (
                A2AAgentMetric,
                a2a_agent_metrics,
                ("a2a_agent_id", "timestamp", "response_time", "is_success", "interaction_type", "error_message"),
            ),
        )

        try:
            with fresh_db_session() as db:
                for model, batch, columns in batches:
                    if batch:
                        db.bulk_insert_mappings(
                            model,
                            [{column: getattr(entry, column) for column in columns} for entry in batch],
                        )

                db.commit()

        except Exception as e:
            logger.error(f"Failed to flush metrics to database: {e}", exc_info=True)
            # Metrics are lost on failure - acceptable trade-off for performance
            # Could implement retry queue if needed
            return False
        return True

    def _write_metric_immediately(self, model: type, label: str, **values: object) -> None:
        """Insert a single metric row and commit it, logging any failure.

        Args:
            model: ORM class to instantiate for the row.
            label: Metric kind used in the error log message (e.g. "tool").
            **values: Column values passed to the model constructor.
        """
        try:
            with fresh_db_session() as db:
                db.add(model(**values))
                db.commit()
        except Exception as e:
            logger.error(f"Failed to write {label} metric: {e}")

    def _write_tool_metric_immediately(
        self,
        tool_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single tool metric immediately (fallback when buffering disabled).

        Args:
            tool_id: UUID of the tool.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            ToolMetric,
            "tool",
            tool_id=tool_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            error_message=error_message,
        )

    def _write_resource_metric_immediately(
        self,
        resource_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single resource metric immediately.

        Args:
            resource_id: UUID of the resource.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            ResourceMetric,
            "resource",
            resource_id=resource_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            error_message=error_message,
        )

    def _write_prompt_metric_immediately(
        self,
        prompt_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single prompt metric immediately.

        Args:
            prompt_id: UUID of the prompt.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            PromptMetric,
            "prompt",
            prompt_id=prompt_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            error_message=error_message,
        )

    def _write_server_metric_immediately(
        self,
        server_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single server metric immediately.

        Args:
            server_id: UUID of the server.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            ServerMetric,
            "server",
            server_id=server_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            error_message=error_message,
        )

    def _write_a2a_agent_metric_immediately(
        self,
        a2a_agent_id: str,
        start_time: float,
        success: bool,
        interaction_type: str,
        error_message: Optional[str],
    ) -> None:
        """Write a single A2A agent metric immediately.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            A2AAgentMetric,
            "A2A agent",
            a2a_agent_id=a2a_agent_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            interaction_type=interaction_type,
            error_message=error_message,
        )

    def _write_a2a_agent_metric_with_duration_immediately(
        self,
        a2a_agent_id: str,
        response_time: float,
        success: bool,
        interaction_type: str,
        error_message: Optional[str],
    ) -> None:
        """Write a single A2A agent metric with pre-calculated duration immediately.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            A2AAgentMetric,
            "A2A agent",
            a2a_agent_id=a2a_agent_id,
            timestamp=datetime.now(timezone.utc),
            response_time=response_time,
            is_success=success,
            interaction_type=interaction_type,
            error_message=error_message,
        )

    def get_stats(self) -> dict:
        """Get buffer statistics for monitoring.

        Returns:
            dict: Buffer statistics including enabled state, sizes, and counts.
        """
        with self._lock:
            current_size = len(self._tool_metrics) + len(self._resource_metrics) + len(self._prompt_metrics) + len(self._server_metrics) + len(self._a2a_agent_metrics)

        return {
            "recording_enabled": self.recording_enabled,
            "enabled": self.enabled,
            "flush_interval": self.flush_interval,
            "max_buffer_size": self.max_buffer_size,
            "current_buffer_size": current_size,
            "total_buffered": self._total_buffered,
            "total_flushed": self._total_flushed,
            "flush_count": self._flush_count,
        }
# Module-wide instance, created on the first call to get_metrics_buffer_service()
_metrics_buffer_service: Optional[MetricsBufferService] = None


def get_metrics_buffer_service() -> MetricsBufferService:
    """Return the shared MetricsBufferService, creating it on first use.

    Returns:
        MetricsBufferService: The singleton metrics buffer service instance.
    """
    global _metrics_buffer_service  # pylint: disable=global-statement
    if _metrics_buffer_service is not None:
        return _metrics_buffer_service
    _metrics_buffer_service = MetricsBufferService()
    return _metrics_buffer_service