Coverage for mcpgateway / services / metrics_buffer_service.py: 100%
261 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Buffered metrics service for batching metric writes to the database.
4This service accumulates metrics in memory and flushes them to the database
5periodically, reducing DB write pressure under high load.
7Copyright 2025
8SPDX-License-Identifier: Apache-2.0
9"""
11# Standard
12import asyncio
13from collections import deque
14from dataclasses import dataclass
15from datetime import datetime, timezone
16import logging
17import threading
18import time
19from typing import Deque, Optional
21# First-Party
22from mcpgateway.config import settings
23from mcpgateway.db import A2AAgentMetric, fresh_db_session, PromptMetric, ResourceMetric, ServerMetric, ToolMetric
# Module-level logger used by every class and function in this module.
logger = logging.getLogger(__name__)
@dataclass
class BufferedToolMetric:
    """One tool invocation measurement held in memory until the next flush.

    Attributes:
        tool_id: Identifier of the invoked tool.
        timestamp: Moment the entry was recorded.
        response_time: Elapsed invocation time in seconds.
        is_success: Whether the invocation succeeded.
        error_message: Failure description, or None when there is none.
    """

    tool_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None
@dataclass
class BufferedResourceMetric:
    """One resource access measurement held in memory until the next flush.

    Attributes:
        resource_id: Identifier of the accessed resource.
        timestamp: Moment the entry was recorded.
        response_time: Elapsed operation time in seconds.
        is_success: Whether the operation succeeded.
        error_message: Failure description, or None when there is none.
    """

    resource_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None
@dataclass
class BufferedPromptMetric:
    """One prompt rendering measurement held in memory until the next flush.

    Attributes:
        prompt_id: Identifier of the prompt.
        timestamp: Moment the entry was recorded.
        response_time: Elapsed operation time in seconds.
        is_success: Whether the operation succeeded.
        error_message: Failure description, or None when there is none.
    """

    prompt_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None
@dataclass
class BufferedServerMetric:
    """One server-level request measurement held in memory until the next flush.

    Attributes:
        server_id: Identifier of the server the request was attributed to.
        timestamp: Moment the entry was recorded.
        response_time: Elapsed operation time in seconds.
        is_success: Whether the operation succeeded.
        error_message: Failure description, or None when there is none.
    """

    server_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    error_message: Optional[str] = None
@dataclass
class BufferedA2AAgentMetric:
    """One A2A agent interaction measurement held in memory until the next flush.

    Attributes:
        a2a_agent_id: Identifier of the A2A agent.
        timestamp: Moment the entry was recorded.
        response_time: Elapsed interaction time in seconds.
        is_success: Whether the interaction succeeded.
        interaction_type: Kind of interaction; defaults to "invoke".
        error_message: Failure description, or None when there is none.
    """

    a2a_agent_id: str
    timestamp: datetime
    response_time: float
    is_success: bool
    interaction_type: str = "invoke"
    error_message: Optional[str] = None
class MetricsBufferService:
    """Service for buffering and batching metrics writes to the database.

    This service provides:
    - Thread-safe buffering of tool, resource, prompt, server, and A2A agent metrics
    - Periodic flushing to database (configurable interval)
    - An early flush once ``max_buffer_size`` entries are buffered (while the flush loop runs)
    - Graceful shutdown with final flush

    Configuration (via environment variables):
    - METRICS_BUFFER_ENABLED: Enable buffered metrics (default: True)
    - METRICS_BUFFER_FLUSH_INTERVAL: Seconds between flushes (default: 60)
    - METRICS_BUFFER_MAX_SIZE: Max entries before forced flush (default: 1000)
    """

    def __init__(
        self,
        flush_interval: Optional[int] = None,
        max_buffer_size: Optional[int] = None,
        enabled: Optional[bool] = None,
    ):
        """Initialize the metrics buffer service.

        Args:
            flush_interval: Seconds between automatic flushes (default: from settings or 60)
            max_buffer_size: Maximum buffer entries before forced flush (default: from settings or 1000)
            enabled: Whether buffering is enabled (default: from settings or True)
        """
        # NOTE: ``or`` means an explicit 0 also falls back to the configured value.
        self.flush_interval = flush_interval or getattr(settings, "metrics_buffer_flush_interval", 60)
        self.max_buffer_size = max_buffer_size or getattr(settings, "metrics_buffer_max_size", 1000)
        self.enabled = enabled if enabled is not None else getattr(settings, "metrics_buffer_enabled", True)
        self.recording_enabled = getattr(settings, "db_metrics_recording_enabled", True)

        # Per-type buffers; every access is guarded by self._lock so that
        # record_* can be called from any thread.
        self._tool_metrics: Deque[BufferedToolMetric] = deque()
        self._resource_metrics: Deque[BufferedResourceMetric] = deque()
        self._prompt_metrics: Deque[BufferedPromptMetric] = deque()
        self._server_metrics: Deque[BufferedServerMetric] = deque()
        self._a2a_agent_metrics: Deque[BufferedA2AAgentMetric] = deque()
        self._lock = threading.Lock()

        # Background flush task
        self._flush_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()
        # Wakes the flush loop before the interval elapses (buffer full or shutdown).
        self._flush_requested = asyncio.Event()
        # Event loop running the flush task, captured in start(). asyncio.Event is not
        # thread-safe, so other threads wake the loop via loop.call_soon_threadsafe.
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # Guarded by self._lock: an early flush is already scheduled for the current buffers.
        self._flush_pending = False

        # Stats for monitoring
        self._total_buffered = 0
        self._total_flushed = 0
        self._flush_count = 0

        logger.info(
            f"MetricsBufferService initialized: recording_enabled={self.recording_enabled}, "
            f"buffer_enabled={self.enabled}, flush_interval={self.flush_interval}s, max_buffer_size={self.max_buffer_size}"
        )

    async def start(self) -> None:
        """Start the background flush task.

        Does nothing when metrics recording or buffering is disabled, or when a
        flush task is already running.
        """
        if not self.recording_enabled:
            logger.info("MetricsBufferService: recording disabled, skipping flush loop")
            return
        if not self.enabled:
            logger.info("MetricsBufferService disabled, skipping start")
            return

        if self._flush_task is None or self._flush_task.done():
            # Remember the loop so record_* calls from other threads can wake the flush loop.
            self._loop = asyncio.get_running_loop()
            self._shutdown_event.clear()
            self._flush_requested.clear()
            self._flush_task = asyncio.create_task(self._flush_loop())
            logger.info("MetricsBufferService flush task started")

    async def shutdown(self) -> None:
        """Shutdown service with final flush."""
        logger.info("MetricsBufferService shutting down...")

        # Signal shutdown and wake the loop if it is waiting for the next interval
        self._shutdown_event.set()
        self._flush_requested.set()

        # Cancel the flush task
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass

        # No flush loop remains to be woken by size-triggered requests
        self._loop = None

        # Final flush to persist any remaining metrics
        await self._flush_all()

        logger.info(f"MetricsBufferService shutdown complete: " f"total_buffered={self._total_buffered}, total_flushed={self._total_flushed}, " f"flush_count={self._flush_count}")

    def record_tool_metric(
        self,
        tool_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a tool metric for later flush.

        Args:
            tool_id: The UUID string of the tool.
            start_time: The monotonic start time of the invocation.
            success: True if the invocation succeeded.
            error_message: Error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            # Fall back to immediate (synchronous) write
            self._write_tool_metric_immediately(tool_id, start_time, success, error_message)
            return

        self._append(
            self._tool_metrics,
            BufferedToolMetric(
                tool_id=tool_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_resource_metric(
        self,
        resource_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a resource metric for later flush.

        Args:
            resource_id: UUID of the resource.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_resource_metric_immediately(resource_id, start_time, success, error_message)
            return

        self._append(
            self._resource_metrics,
            BufferedResourceMetric(
                resource_id=resource_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_prompt_metric(
        self,
        prompt_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a prompt metric for later flush.

        Args:
            prompt_id: UUID of the prompt.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_prompt_metric_immediately(prompt_id, start_time, success, error_message)
            return

        self._append(
            self._prompt_metrics,
            BufferedPromptMetric(
                prompt_id=prompt_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_server_metric(
        self,
        server_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer a server metric for later flush.

        Args:
            server_id: UUID of the server.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_server_metric_immediately(server_id, start_time, success, error_message)
            return

        self._append(
            self._server_metrics,
            BufferedServerMetric(
                server_id=server_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                error_message=error_message,
            ),
        )

    def record_a2a_agent_metric(
        self,
        a2a_agent_id: str,
        start_time: float,
        success: bool,
        interaction_type: str = "invoke",
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer an A2A agent metric for later flush.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_a2a_agent_metric_immediately(a2a_agent_id, start_time, success, interaction_type, error_message)
            return

        self._append(
            self._a2a_agent_metrics,
            BufferedA2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=time.monotonic() - start_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def record_a2a_agent_metric_with_duration(
        self,
        a2a_agent_id: str,
        response_time: float,
        success: bool,
        interaction_type: str = "invoke",
        error_message: Optional[str] = None,
    ) -> None:
        """Buffer an A2A agent metric with pre-calculated response time.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        if not self.recording_enabled:
            return  # Execution metrics recording disabled
        if not self.enabled:
            self._write_a2a_agent_metric_with_duration_immediately(a2a_agent_id, response_time, success, interaction_type, error_message)
            return

        self._append(
            self._a2a_agent_metrics,
            BufferedA2AAgentMetric(
                a2a_agent_id=a2a_agent_id,
                timestamp=datetime.now(timezone.utc),
                response_time=response_time,
                is_success=success,
                interaction_type=interaction_type,
                error_message=error_message,
            ),
        )

    def _buffered_count_locked(self) -> int:
        """Return the total number of buffered entries; the caller must hold ``self._lock``.

        Returns:
            int: Sum of the sizes of all per-type buffers.
        """
        return len(self._tool_metrics) + len(self._resource_metrics) + len(self._prompt_metrics) + len(self._server_metrics) + len(self._a2a_agent_metrics)

    def _append(self, buffer: Deque, metric: object) -> None:
        """Store one metric entry and request an early flush once ``max_buffer_size`` is reached.

        Args:
            buffer: The per-type deque owned by this service that receives ``metric``.
            metric: The buffered metric entry to store.
        """
        with self._lock:
            buffer.append(metric)
            self._total_buffered += 1
            if self._flush_pending or self._buffered_count_locked() < self.max_buffer_size:
                return
            # First entry to reach the limit since the last drain schedules the wake-up;
            # _flush_all() resets the flag when it swaps the buffers out.
            self._flush_pending = True

        if not self._request_flush():
            # No running flush loop to wake; let a later append try again.
            with self._lock:
                self._flush_pending = False

    def _request_flush(self) -> bool:
        """Ask the flush loop to flush now instead of waiting for the interval.

        Safe to call from any thread: the event is set on the flush loop's own
        event loop via ``call_soon_threadsafe``.

        Returns:
            bool: True if a wake-up was scheduled, False if no flush loop is available.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            return False
        try:
            loop.call_soon_threadsafe(self._flush_requested.set)
        except RuntimeError:
            # The loop was closed between the check above and scheduling the callback.
            return False
        return True

    async def _wait_for_flush_trigger(self) -> None:
        """Wait until the flush interval elapses or an early flush is requested."""
        try:
            await asyncio.wait_for(self._flush_requested.wait(), timeout=self.flush_interval)
        except asyncio.TimeoutError:
            # Normal timeout, proceed to flush
            pass
        # Any request made after this point arrives before the swap in _flush_all and is served by it.
        self._flush_requested.clear()

    async def _flush_loop(self) -> None:
        """Background task that periodically flushes buffered metrics.

        Each iteration waits for the flush interval or an early-flush request,
        then flushes unless shutdown has been signalled (shutdown() performs the
        final flush itself).

        Raises:
            asyncio.CancelledError: When the flush loop is cancelled.
        """
        logger.info(f"Metrics flush loop started (interval={self.flush_interval}s)")

        while not self._shutdown_event.is_set():
            try:
                await self._wait_for_flush_trigger()
                if self._shutdown_event.is_set():
                    # Shutdown signaled
                    break

                await self._flush_all()

            except asyncio.CancelledError:
                logger.debug("Flush loop cancelled")
                raise
            except Exception as e:
                logger.error(f"Error in metrics flush loop: {e}", exc_info=True)
                # Continue the loop despite errors
                await asyncio.sleep(5)

    async def _flush_all(self) -> None:
        """Flush all buffered metrics to the database.

        Only rows whose transaction committed are counted in ``total_flushed``;
        rows lost to a database error are reported with a warning.
        """
        # Swap out buffers atomically
        with self._lock:
            tool_metrics = list(self._tool_metrics)
            resource_metrics = list(self._resource_metrics)
            prompt_metrics = list(self._prompt_metrics)
            server_metrics = list(self._server_metrics)
            a2a_agent_metrics = list(self._a2a_agent_metrics)
            self._tool_metrics.clear()
            self._resource_metrics.clear()
            self._prompt_metrics.clear()
            self._server_metrics.clear()
            self._a2a_agent_metrics.clear()
            # Buffers are empty again, so a later append may request another early flush
            self._flush_pending = False

        total = len(tool_metrics) + len(resource_metrics) + len(prompt_metrics) + len(server_metrics) + len(a2a_agent_metrics)
        if total == 0:
            return

        logger.debug(
            f"Flushing {total} metrics: "
            f"tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
            f"servers={len(server_metrics)}, a2a_agents={len(a2a_agent_metrics)}"
        )

        # Flush in thread to avoid blocking event loop
        written = await asyncio.to_thread(
            self._flush_to_db,
            tool_metrics,
            resource_metrics,
            prompt_metrics,
            server_metrics,
            a2a_agent_metrics,
        )

        self._total_flushed += written
        self._flush_count += 1

        logger.info(
            f"Metrics flush #{self._flush_count}: wrote {written} records "
            f"(tools={len(tool_metrics)}, resources={len(resource_metrics)}, prompts={len(prompt_metrics)}, "
            f"servers={len(server_metrics)}, a2a={len(a2a_agent_metrics)})"
        )
        if written < total:
            logger.warning(f"Metrics flush #{self._flush_count}: dropped {total - written} of {total} records after database errors")

    def _flush_to_db(
        self,
        tool_metrics: list[BufferedToolMetric],
        resource_metrics: list[BufferedResourceMetric],
        prompt_metrics: list[BufferedPromptMetric],
        server_metrics: list[BufferedServerMetric],
        a2a_agent_metrics: list[BufferedA2AAgentMetric],
    ) -> int:
        """Write buffered metrics to database (runs in thread).

        Tool, resource, prompt, and A2A agent metrics share one transaction;
        server metrics use a separate one. Errors are logged rather than raised,
        and the rows of a failed transaction are dropped.

        Args:
            tool_metrics: List of buffered tool metrics to write.
            resource_metrics: List of buffered resource metrics to write.
            prompt_metrics: List of buffered prompt metrics to write.
            server_metrics: List of buffered server metrics to write.
            a2a_agent_metrics: List of buffered A2A agent metrics to write.

        Returns:
            int: Number of rows whose transaction completed without error.
        """
        written = 0

        main_count = len(tool_metrics) + len(resource_metrics) + len(prompt_metrics) + len(a2a_agent_metrics)
        if main_count:
            try:
                with fresh_db_session() as db:
                    # Bulk insert tool metrics
                    if tool_metrics:
                        db.bulk_insert_mappings(
                            ToolMetric,
                            [
                                {
                                    "tool_id": m.tool_id,
                                    "timestamp": m.timestamp,
                                    "response_time": m.response_time,
                                    "is_success": m.is_success,
                                    "error_message": m.error_message,
                                }
                                for m in tool_metrics
                            ],
                        )

                    # Bulk insert resource metrics
                    if resource_metrics:
                        db.bulk_insert_mappings(
                            ResourceMetric,
                            [
                                {
                                    "resource_id": m.resource_id,
                                    "timestamp": m.timestamp,
                                    "response_time": m.response_time,
                                    "is_success": m.is_success,
                                    "error_message": m.error_message,
                                }
                                for m in resource_metrics
                            ],
                        )

                    # Bulk insert prompt metrics
                    if prompt_metrics:
                        db.bulk_insert_mappings(
                            PromptMetric,
                            [
                                {
                                    "prompt_id": m.prompt_id,
                                    "timestamp": m.timestamp,
                                    "response_time": m.response_time,
                                    "is_success": m.is_success,
                                    "error_message": m.error_message,
                                }
                                for m in prompt_metrics
                            ],
                        )

                    # Bulk insert A2A agent metrics
                    if a2a_agent_metrics:
                        db.bulk_insert_mappings(
                            A2AAgentMetric,
                            [
                                {
                                    "a2a_agent_id": m.a2a_agent_id,
                                    "timestamp": m.timestamp,
                                    "response_time": m.response_time,
                                    "is_success": m.is_success,
                                    "interaction_type": m.interaction_type,
                                    "error_message": m.error_message,
                                }
                                for m in a2a_agent_metrics
                            ],
                        )

                    db.commit()
                written += main_count

            except Exception as e:
                logger.error(f"Failed to flush metrics to database: {e}", exc_info=True)
                # Metrics are lost on failure - acceptable trade-off for performance
                # Could implement retry queue if needed

        # Flush server metrics in a separate transaction so that an invalid
        # server_id (FK violation) does not roll back tool/resource/prompt/a2a
        # metrics. server_id can originate from untrusted headers (X-Server-ID)
        # in admin API paths, so it may reference a nonexistent server.
        if server_metrics:
            try:
                with fresh_db_session() as db:
                    db.bulk_insert_mappings(
                        ServerMetric,
                        [
                            {
                                "server_id": m.server_id,
                                "timestamp": m.timestamp,
                                "response_time": m.response_time,
                                "is_success": m.is_success,
                                "error_message": m.error_message,
                            }
                            for m in server_metrics
                        ],
                    )
                    db.commit()
                written += len(server_metrics)
            except Exception as e:
                logger.error(f"Failed to flush server metrics to database: {e}", exc_info=True)

        return written

    @staticmethod
    def _write_metric_immediately(model: type, label: str, **fields: object) -> None:
        """Insert a single metric row in its own session, logging (not raising) failures.

        Args:
            model: ORM model class to instantiate (e.g. ``ToolMetric``).
            label: Metric kind used in the error log message (e.g. ``"tool"``).
            **fields: Column values passed to ``model``.
        """
        try:
            with fresh_db_session() as db:
                db.add(model(**fields))
                db.commit()
        except Exception as e:
            logger.error(f"Failed to write {label} metric: {e}")

    def _write_tool_metric_immediately(
        self,
        tool_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single tool metric immediately (fallback when buffering disabled).

        The response time is measured before the database session is opened, so
        session acquisition is not counted (matching the buffered path).

        Args:
            tool_id: UUID of the tool.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            ToolMetric,
            "tool",
            tool_id=tool_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            error_message=error_message,
        )

    def _write_resource_metric_immediately(
        self,
        resource_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single resource metric immediately.

        Args:
            resource_id: UUID of the resource.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            ResourceMetric,
            "resource",
            resource_id=resource_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            error_message=error_message,
        )

    def _write_prompt_metric_immediately(
        self,
        prompt_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single prompt metric immediately.

        Args:
            prompt_id: UUID of the prompt.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            PromptMetric,
            "prompt",
            prompt_id=prompt_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            error_message=error_message,
        )

    def _write_server_metric_immediately(
        self,
        server_id: str,
        start_time: float,
        success: bool,
        error_message: Optional[str],
    ) -> None:
        """Write a single server metric immediately.

        Args:
            server_id: UUID of the server.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            ServerMetric,
            "server",
            server_id=server_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            error_message=error_message,
        )

    def _write_a2a_agent_metric_immediately(
        self,
        a2a_agent_id: str,
        start_time: float,
        success: bool,
        interaction_type: str,
        error_message: Optional[str],
    ) -> None:
        """Write a single A2A agent metric immediately.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            start_time: Monotonic start time for response_time calculation.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            A2AAgentMetric,
            "A2A agent",
            a2a_agent_id=a2a_agent_id,
            timestamp=datetime.now(timezone.utc),
            response_time=time.monotonic() - start_time,
            is_success=success,
            interaction_type=interaction_type,
            error_message=error_message,
        )

    def _write_a2a_agent_metric_with_duration_immediately(
        self,
        a2a_agent_id: str,
        response_time: float,
        success: bool,
        interaction_type: str,
        error_message: Optional[str],
    ) -> None:
        """Write a single A2A agent metric with pre-calculated duration immediately.

        Args:
            a2a_agent_id: UUID of the A2A agent.
            response_time: Pre-calculated response time in seconds.
            success: Whether the operation succeeded.
            interaction_type: Type of interaction (e.g., "invoke").
            error_message: Optional error message if failed.
        """
        self._write_metric_immediately(
            A2AAgentMetric,
            "A2A agent",
            a2a_agent_id=a2a_agent_id,
            timestamp=datetime.now(timezone.utc),
            response_time=response_time,
            is_success=success,
            interaction_type=interaction_type,
            error_message=error_message,
        )

    def get_stats(self) -> dict:
        """Get buffer statistics for monitoring.

        Returns:
            dict: Buffer statistics including enabled state, sizes, and counts.
        """
        with self._lock:
            current_size = self._buffered_count_locked()

        return {
            "recording_enabled": self.recording_enabled,
            "enabled": self.enabled,
            "flush_interval": self.flush_interval,
            "max_buffer_size": self.max_buffer_size,
            "current_buffer_size": current_size,
            "total_buffered": self._total_buffered,
            "total_flushed": self._total_flushed,
            "flush_count": self._flush_count,
        }
# Singleton instance
_metrics_buffer_service: Optional[MetricsBufferService] = None


def get_metrics_buffer_service() -> MetricsBufferService:
    """Return the module-level MetricsBufferService, constructing it on first use.

    Returns:
        MetricsBufferService: The shared instance; every call after the first returns the same object.
    """
    global _metrics_buffer_service  # pylint: disable=global-statement
    # NOTE(review): unsynchronized lazy init; concurrent first calls from several
    # threads could each construct an instance.
    if _metrics_buffer_service is not None:
        return _metrics_buffer_service
    _metrics_buffer_service = MetricsBufferService()
    return _metrics_buffer_service