Coverage for mcpgateway/services/logging_service.py: 96%
238 statements
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/logging_service.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

Logging Service Implementation.
This module implements structured logging according to the MCP specification.
It supports RFC 5424 severity levels, log level management, and log event subscriptions.
"""

# Standard
import asyncio
from asyncio.events import AbstractEventLoop
from datetime import datetime, timezone
import logging
from logging.handlers import RotatingFileHandler
import os
import socket
from typing import Any, AsyncGenerator, Dict, List, NotRequired, Optional, TextIO, TypedDict

# Third-Party
from pythonjsonlogger import json as jsonlogger  # requires the python-json-logger package

# First-Party
from mcpgateway.common.models import LogLevel
from mcpgateway.config import settings
from mcpgateway.services.log_storage_service import LogStorageService
from mcpgateway.utils.correlation_id import get_correlation_id

# Optional OpenTelemetry support (Third-Party)
try:
    # Third-Party
    from opentelemetry import trace  # type: ignore[import-untyped]
except ImportError:
    trace = None  # type: ignore[assignment]

AnyioClosedResourceError: Optional[type]  # pylint: disable=invalid-name
try:
    # Optional import; only used for filtering a known benign upstream error (Third-Party)
    # Third-Party
    from anyio import ClosedResourceError as AnyioClosedResourceError  # pylint: disable=invalid-name
except Exception:  # pragma: no cover - environment without anyio
    AnyioClosedResourceError = None  # pylint: disable=invalid-name

# Standard log format used across the codebase
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"

# Cache static values at module load - these don't change during process lifetime
_CACHED_HOSTNAME: str = socket.gethostname()
_CACHED_PID: int = os.getpid()

# Cache level mapping dictionaries at module load to avoid recreation on every log call
# Maps Python log level names to MCP LogLevel enum (used in StorageHandler.emit)
_PYTHON_TO_MCP_LEVEL_MAP: Dict[str, LogLevel] = {
    "DEBUG": LogLevel.DEBUG,
    "INFO": LogLevel.INFO,
    "WARNING": LogLevel.WARNING,
    "ERROR": LogLevel.ERROR,
    "CRITICAL": LogLevel.CRITICAL,
}

# Maps MCP LogLevel to Python logging method names (used in notify)
_MCP_TO_PYTHON_METHOD_MAP: Dict[LogLevel, str] = {
    LogLevel.DEBUG: "debug",
    LogLevel.INFO: "info",
    LogLevel.NOTICE: "info",  # Map NOTICE to INFO
    LogLevel.WARNING: "warning",
    LogLevel.ERROR: "error",
    LogLevel.CRITICAL: "critical",
    LogLevel.ALERT: "critical",  # Map ALERT to CRITICAL
    LogLevel.EMERGENCY: "critical",  # Map EMERGENCY to CRITICAL
}

# Maps MCP LogLevel to numeric values for comparison (used in _should_log)
_MCP_LEVEL_VALUES: Dict[LogLevel, int] = {
    LogLevel.DEBUG: 0,
    LogLevel.INFO: 1,
    LogLevel.NOTICE: 2,
    LogLevel.WARNING: 3,
    LogLevel.ERROR: 4,
    LogLevel.CRITICAL: 5,
    LogLevel.ALERT: 6,
    LogLevel.EMERGENCY: 7,
}

# Create a text formatter with standard format
text_formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)


class CorrelationIdJsonFormatter(jsonlogger.JsonFormatter):
    """JSON formatter that includes correlation ID and OpenTelemetry trace context."""

    def add_fields(self, log_record: dict, record: logging.LogRecord, message_dict: dict) -> None:  # pylint: disable=arguments-renamed
        """Add custom fields to the log record.

        Args:
            log_record: The dictionary that will be logged as JSON
            record: The original LogRecord
            message_dict: Additional message fields
        """
        super().add_fields(log_record, record, message_dict)

        # Add timestamp in ISO 8601 format with 'Z' suffix for UTC
        dt = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_record["@timestamp"] = dt.isoformat().replace("+00:00", "Z")

        # Add hostname and process ID for log aggregation - use cached values for performance
        log_record["hostname"] = _CACHED_HOSTNAME
        log_record["process_id"] = _CACHED_PID

        # Add correlation ID from context
        correlation_id = get_correlation_id()
        if correlation_id:
            log_record["request_id"] = correlation_id

        # Add OpenTelemetry trace context if available
        if trace is not None:
            try:
                span = trace.get_current_span()
                if span and span.is_recording():
                    span_context = span.get_span_context()
                    if span_context.is_valid:
                        # Format trace_id and span_id as hex strings
                        log_record["trace_id"] = format(span_context.trace_id, "032x")
                        log_record["span_id"] = format(span_context.span_id, "016x")
                        log_record["trace_flags"] = format(span_context.trace_flags, "02x")
            except Exception:  # nosec B110 - intentionally catching all exceptions for optional tracing
                # Error accessing span context, continue without trace fields
                pass
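
# Illustrative sketch (comments only): the approximate shape of one record emitted
# through CorrelationIdJsonFormatter. Values below are hypothetical; trace_id,
# span_id, and trace_flags appear only when a recording OpenTelemetry span is active.
#
#     {"asctime": "2026-02-11T07:10:00", "name": "mcpgateway", "levelname": "INFO",
#      "message": "Logging service initialized",
#      "@timestamp": "2026-02-11T07:10:00.123456Z",
#      "hostname": "gw-host-1", "process_id": 1234, "request_id": "a1b2c3d4"}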

# Create a JSON formatter with correlation ID support (uses same base format)
json_formatter = CorrelationIdJsonFormatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)

# Note: Don't use basicConfig here as it conflicts with our custom dual logging setup
# The LoggingService.initialize() method will properly configure all handlers

# Global handlers will be created lazily
_file_handler: Optional[logging.Handler] = None
_text_handler: Optional[logging.StreamHandler[TextIO]] = None


def _get_file_handler() -> logging.Handler:
    """Get or create the file handler.

    Returns:
        logging.Handler: Either a RotatingFileHandler or regular FileHandler for JSON logging.

    Raises:
        ValueError: If file logging is disabled or no log file specified.
    """
    global _file_handler  # pylint: disable=global-statement
    if _file_handler is None:
        # Only create if file logging is enabled and file is specified
        if not settings.log_to_file or not settings.log_file:
            raise ValueError("File logging is disabled or no log file specified")

        # Ensure log folder exists
        if settings.log_folder:
            os.makedirs(settings.log_folder, exist_ok=True)
            log_path = os.path.join(settings.log_folder, settings.log_file)
        else:
            log_path = settings.log_file

        # Create appropriate handler based on rotation settings
        if settings.log_rotation_enabled:
            max_bytes = settings.log_max_size_mb * 1024 * 1024  # Convert MB to bytes
            _file_handler = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=settings.log_backup_count, mode=settings.log_filemode)
        else:
            _file_handler = logging.FileHandler(log_path, mode=settings.log_filemode)

        _file_handler.setFormatter(json_formatter)
    return _file_handler
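
# Illustrative sketch (comments only): file-logging settings consumed above. The
# exact environment variable names are assumptions that mirror the settings
# fields used in _get_file_handler; values are hypothetical:
#
#     LOG_TO_FILE=true
#     LOG_FILE=gateway.log
#     LOG_FOLDER=/var/log/mcpgateway
#     LOG_ROTATION_ENABLED=true
#     LOG_MAX_SIZE_MB=10
#     LOG_BACKUP_COUNT=5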


def _get_text_handler() -> logging.StreamHandler[TextIO]:
    """Get or create the text handler.

    Returns:
        logging.StreamHandler: The stream handler for console logging.
    """
    global _text_handler  # pylint: disable=global-statement
    if _text_handler is None:
        _text_handler = logging.StreamHandler()
        _text_handler.setFormatter(text_formatter)
    return _text_handler


class StorageHandler(logging.Handler):
    """Custom logging handler that stores logs in LogStorageService."""

    def __init__(self, storage_service: LogStorageService):
        """Initialize the storage handler.

        Args:
            storage_service: The LogStorageService instance to store logs in
        """
        super().__init__()
        self.storage = storage_service
        self.loop: AbstractEventLoop | None = None

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record to storage.

        Args:
            record: The LogRecord to emit
        """
        if not self.storage:
            return

        # Map Python log levels to MCP LogLevel (uses module-level cached dict)
        log_level = _PYTHON_TO_MCP_LEVEL_MAP.get(record.levelname, LogLevel.INFO)

        # Extract entity context from record if available
        entity_type = getattr(record, "entity_type", None)
        entity_id = getattr(record, "entity_id", None)
        entity_name = getattr(record, "entity_name", None)
        request_id = getattr(record, "request_id", None)

        # Format the message
        try:
            message = self.format(record)
        except Exception:
            message = record.getMessage()

        # Store the log asynchronously
        try:
            coro = self.storage.add_log(
                level=log_level,
                message=message,
                entity_type=entity_type,
                entity_id=entity_id,
                entity_name=entity_name,
                logger=record.name,
                request_id=request_id,
            )

            try:
                # Fast path: we're already on an event loop thread.
                loop = asyncio.get_running_loop()
                self.loop = loop
                task = loop.create_task(coro)
                task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
            except RuntimeError:
                # Fallback: no running loop in this thread; attempt to schedule on a known loop.
                loop = self.loop
                if loop is None or not loop.is_running():
                    coro.close()
                    return

                future = asyncio.run_coroutine_threadsafe(coro, loop)
                future.add_done_callback(lambda f: f.exception() if not f.cancelled() else None)
        except Exception:
            # Silently fail to avoid logging recursion
            pass  # nosec B110 - Intentional to prevent logging recursion
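
# Illustrative sketch (comments only, hypothetical wiring): once any log call has
# run on the event-loop thread (the fast path above captures self.loop), emit()
# can also be reached from worker threads, which fall back to
# asyncio.run_coroutine_threadsafe on the captured loop:
#
#     handler = StorageHandler(storage_service)
#     logging.getLogger().addHandler(handler)
#     # loop thread: logging.info("warm-up") captures handler.loop
#     # worker thread: logging.info("from thread") schedules via handler.loop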


class _LogMessageData(TypedDict):
    """Log message data structure."""

    level: LogLevel
    data: Any
    timestamp: str
    logger: NotRequired[str]


class _LogMessage(TypedDict):
    """Log message event structure."""

    type: str
    data: _LogMessageData
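
# Illustrative sketch (comments only): one subscriber event conforming to
# _LogMessage, as built by LoggingService.notify(); values are hypothetical:
#
#     {"type": "log",
#      "data": {"level": LogLevel.INFO, "data": "Log level set to info",
#               "timestamp": "2026-02-11T07:10:00+00:00", "logger": "logging"}}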


class LoggingService:
    """MCP logging service.

    Implements structured logging with:
    - RFC 5424 severity levels
    - Log level management
    - Log event subscriptions
    - Logger name tracking
    """

    def __init__(self) -> None:
        """Initialize logging service."""
        self._level = LogLevel.INFO
        self._subscribers: List[asyncio.Queue[_LogMessage]] = []
        self._loggers: Dict[str, logging.Logger] = {}
        self._storage: LogStorageService | None = None  # Will be initialized if admin UI is enabled
        self._storage_handler: Optional[StorageHandler] = None  # Track the storage handler for cleanup

    async def initialize(self) -> None:
        """Initialize logging service.

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())

        """
        # Update service log level from settings BEFORE configuring loggers
        self._level = LogLevel[settings.log_level.upper()]

        root_logger = logging.getLogger()
        self._loggers[""] = root_logger

        # Clear existing handlers to avoid duplicates
        root_logger.handlers.clear()

        # Set root logger level to match settings - this is critical for LOG_LEVEL to work
        log_level = getattr(logging, settings.log_level.upper())
        root_logger.setLevel(log_level)

        # Console handler (stdout/stderr)
        #
        # LOG_FORMAT controls the console output format:
        # - text: human-friendly
        # - json: machine-friendly (Loki/ELK) and includes OTEL trace context when available
        if getattr(settings, "log_format", "text").lower() == "json":
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(json_formatter)
        else:
            console_handler = _get_text_handler()
        console_handler.setLevel(log_level)
        root_logger.addHandler(console_handler)

        # Only add file handler if enabled
        if settings.log_to_file and settings.log_file:
            try:
                file_handler = _get_file_handler()
                file_handler.setLevel(log_level)
                root_logger.addHandler(file_handler)
                if settings.log_rotation_enabled:
                    logging.info(f"File logging enabled with rotation: {settings.log_folder or '.'}/{settings.log_file} (max: {settings.log_max_size_mb}MB, backups: {settings.log_backup_count})")
                else:
                    logging.info(f"File logging enabled (no rotation): {settings.log_folder or '.'}/{settings.log_file}")
            except Exception as e:
                logging.warning(f"Failed to initialize file logging: {e}")
        else:
            logging.info("File logging disabled - logging to stdout/stderr only")

        # Configure uvicorn loggers to use our handlers (for access logs)
        # Note: This needs to be done both at init and dynamically as uvicorn creates loggers later
        self._configure_uvicorn_loggers()

        # Initialize log storage if admin UI is enabled
        if settings.mcpgateway_ui_enabled or settings.mcpgateway_admin_api_enabled:
            self._storage = LogStorageService()

            # Add storage handler to capture all logs
            self._storage_handler = StorageHandler(self._storage)
            self._storage_handler.setFormatter(text_formatter)
            self._storage_handler.setLevel(log_level)
            root_logger.addHandler(self._storage_handler)

            logging.info(f"Log storage initialized with {settings.log_buffer_size_mb}MB buffer")

        logging.info("Logging service initialized")

        # Suppress noisy upstream logs for normal stream closures in MCP streamable HTTP
        self._install_closedresourceerror_filter()

    async def shutdown(self) -> None:
        """Shutdown logging service.

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.shutdown())

        """
        # Remove storage handler from root logger if it was added
        if self._storage_handler:
            root_logger = logging.getLogger()
            root_logger.removeHandler(self._storage_handler)
            self._storage_handler = None

        # Clear subscribers
        self._subscribers.clear()
        logging.info("Logging service shutdown")

    def _install_closedresourceerror_filter(self) -> None:
        """Install a filter to drop benign ClosedResourceError logs from upstream MCP.

        The MCP streamable HTTP server logs an ERROR when the in-memory channel is
        closed during normal client disconnects, raising ``anyio.ClosedResourceError``.
        This filter suppresses those specific records to keep logs clean.

        Examples:
            >>> # Initialize service (installs filter)
            >>> import asyncio, logging, anyio
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())
            >>> # Locate the installed filter on the target logger
            >>> target = logging.getLogger('mcp.server.streamable_http')
            >>> flts = [f for f in target.filters if f.__class__.__name__.endswith('SuppressClosedResourceErrorFilter')]
            >>> len(flts) >= 1
            True
            >>> filt = flts[0]
            >>> # Non-target logger should pass through even if message matches
            >>> rec_other = logging.makeLogRecord({'name': 'other.logger', 'msg': 'ClosedResourceError'})
            >>> filt.filter(rec_other)
            True
            >>> # Target logger with message containing ClosedResourceError should be suppressed
            >>> rec_target_msg = logging.makeLogRecord({'name': 'mcp.server.streamable_http', 'msg': 'ClosedResourceError in normal shutdown'})
            >>> filt.filter(rec_target_msg)
            False
            >>> # Target logger with ClosedResourceError in exc_info should be suppressed
            >>> try:
            ...     raise anyio.ClosedResourceError
            ... except anyio.ClosedResourceError as e:
            ...     rec_target_exc = logging.makeLogRecord({
            ...         'name': 'mcp.server.streamable_http',
            ...         'msg': 'Error in message router',
            ...         'exc_info': (e.__class__, e, None),
            ...     })
            >>> filt.filter(rec_target_exc)
            False
            >>> # Cleanup
            >>> asyncio.run(service.shutdown())

        """

        class _SuppressClosedResourceErrorFilter(logging.Filter):
            """Filter to suppress ClosedResourceError exceptions from MCP streamable HTTP logger.

            This filter prevents noisy ClosedResourceError exceptions from the upstream
            MCP streamable HTTP implementation from cluttering the logs. These errors
            are typically harmless connection cleanup events.
            """

            def filter(self, record: logging.LogRecord) -> bool:  # noqa: D401
                """Filter log records to suppress ClosedResourceError exceptions.

                Args:
                    record: The log record to evaluate

                Returns:
                    True to allow the record through, False to suppress it
                """
                # Apply only to upstream MCP streamable HTTP logger
                if not record.name.startswith("mcp.server.streamable_http"):
                    return True

                # If exception info is present, check its type
                exc_info = getattr(record, "exc_info", None)
                if exc_info and AnyioClosedResourceError is not None:
                    exc_type, exc, _tb = exc_info
                    try:
                        if isinstance(exc, AnyioClosedResourceError) or (getattr(exc_type, "__name__", "") == "ClosedResourceError"):
                            return False
                    except Exception:
                        # Be permissive if anything goes wrong, don't drop logs accidentally
                        return True

                # Fallback: drop if message text clearly indicates ClosedResourceError
                try:
                    msg = record.getMessage()
                    if "ClosedResourceError" in msg:
                        return False
                except Exception:
                    pass  # nosec B110 - Intentional to prevent logging recursion
                return True

        target_logger = logging.getLogger("mcp.server.streamable_http")
        target_logger.addFilter(_SuppressClosedResourceErrorFilter())

    def get_logger(self, name: str) -> logging.Logger:
        """Get or create logger instance.

        Args:
            name: Logger name

        Returns:
            Logger instance

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> service = LoggingService()
            >>> logger = service.get_logger('test')
            >>> import logging
            >>> isinstance(logger, logging.Logger)
            True

        """
        if name not in self._loggers:
            logger = logging.getLogger(name)

            # Don't add handlers to child loggers - let them inherit from root
            # This prevents duplicate logging while maintaining dual output (console + file)
            logger.propagate = True

            # Set level to match service level
            log_level = getattr(logging, self._level.upper())
            logger.setLevel(log_level)

            self._loggers[name] = logger

        return self._loggers[name]

    async def set_level(self, level: LogLevel) -> None:
        """Set minimum log level.

        This updates the level for all registered loggers.

        Args:
            level: New log level

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> from mcpgateway.common.models import LogLevel
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.set_level(LogLevel.DEBUG))

        """
        self._level = level

        # Update all loggers
        log_level = getattr(logging, level.upper())
        for logger in self._loggers.values():
            logger.setLevel(log_level)

        await self.notify(f"Log level set to {level}", LogLevel.INFO, "logging")

    async def notify(  # pylint: disable=too-many-positional-arguments
        self,
        data: Any,
        level: LogLevel,
        logger_name: Optional[str] = None,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        entity_name: Optional[str] = None,
        request_id: Optional[str] = None,
        extra_data: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Send log notification to subscribers.

        Args:
            data: Log message data
            level: Log severity level
            logger_name: Optional logger name
            entity_type: Type of entity (tool, resource, server, gateway)
            entity_id: ID of the related entity
            entity_name: Name of the related entity
            request_id: Associated request ID for tracing
            extra_data: Additional structured data

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> from mcpgateway.common.models import LogLevel
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.notify('test', LogLevel.INFO))

        """
        # Skip if below current level
        if not self._should_log(level):
            return

        # Format notification message
        message: _LogMessage = {
            "type": "log",
            "data": {
                "level": level,
                "data": data,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        }
        if logger_name:
            message["data"]["logger"] = logger_name

        # Log through standard logging
        logger = self.get_logger(logger_name or "")

        # Map MCP log levels to Python logging levels (uses module-level cached dict)
        log_method = _MCP_TO_PYTHON_METHOD_MAP.get(level, "info")
        log_func = getattr(logger, log_method)
        log_func(data)

        # Store in log storage if available
        if self._storage:
            await self._storage.add_log(
                level=level,
                message=str(data),
                entity_type=entity_type,
                entity_id=entity_id,
                entity_name=entity_name,
                logger=logger_name,
                data=extra_data,
                request_id=request_id,
            )

        # Notify subscribers
        for queue in self._subscribers:
            try:
                await queue.put(message)
            except Exception as e:
                logger.error(f"Failed to notify subscriber: {e}")

    async def subscribe(self) -> AsyncGenerator[_LogMessage, None]:
        """Subscribe to log messages.

        Returns a generator yielding log message events.

        Yields:
            Log message events

        Examples:
            This example was removed to prevent the test runner from hanging on async generator consumption.

        """
        queue: asyncio.Queue[_LogMessage] = asyncio.Queue()
        self._subscribers.append(queue)
        try:
            while True:
                message = await queue.get()
                yield message
        finally:
            self._subscribers.remove(queue)
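
    # Illustrative sketch (comments only; kept out of the doctests so test runs
    # don't hang on the unbounded generator). A hypothetical consumer that takes
    # a single event; the finally block removes its queue when the generator is
    # finalized:
    #
    #     async def first_event(service: LoggingService) -> _LogMessage:
    #         async for message in service.subscribe():
    #             return message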

    def _should_log(self, level: LogLevel) -> bool:
        """Check if level meets minimum threshold.

        Args:
            level: Log level to check

        Returns:
            True if should log

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LoggingService()
            >>> service._level = LogLevel.WARNING
            >>> service._should_log(LogLevel.ERROR)
            True
            >>> service._should_log(LogLevel.INFO)
            False
            >>> service._should_log(LogLevel.WARNING)
            True
            >>> service._should_log(LogLevel.DEBUG)
            False

        """
        # Uses module-level cached dict for performance
        return _MCP_LEVEL_VALUES[level] >= _MCP_LEVEL_VALUES[self._level]

    def _configure_uvicorn_loggers(self) -> None:
        """Configure uvicorn loggers to use our dual logging setup.

        This method handles uvicorn's logging setup, which can happen after our initialization.
        Uvicorn creates its own loggers and handlers, so we need to redirect them to our setup.
        """
        uvicorn_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "uvicorn.asgi"]

        for logger_name in uvicorn_loggers:
            uvicorn_logger = logging.getLogger(logger_name)

            # Clear any handlers that uvicorn may have added
            uvicorn_logger.handlers.clear()

            # Make sure they propagate to root (which has our dual handlers)
            uvicorn_logger.propagate = True

            # Set level to match our logging service level
            if hasattr(self, "_level"):
                log_level = getattr(logging, self._level.upper())
                uvicorn_logger.setLevel(log_level)

            # Track the logger
            self._loggers[logger_name] = uvicorn_logger

    def configure_uvicorn_after_startup(self) -> None:
        """Public method to reconfigure uvicorn loggers after server startup.

        Call this after uvicorn has started to ensure access logs go to dual output.
        This handles the case where uvicorn creates loggers after our initialization.
        """
        self._configure_uvicorn_loggers()
        logging.info("Uvicorn loggers reconfigured for dual logging")
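
    # Illustrative sketch (comments only, hypothetical wiring): invoke from an
    # application startup hook once uvicorn is running; `app` and
    # `logging_service` are assumed names, not defined in this module:
    #
    #     @app.on_event("startup")
    #     async def _reconfigure_uvicorn_logging() -> None:
    #         logging_service.configure_uvicorn_after_startup()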

    def get_storage(self) -> Optional[LogStorageService]:
        """Get the log storage service if available.

        Returns:
            LogStorageService instance or None if not initialized
        """
        return self._storage