Coverage for mcpgateway/services/logging_service.py: 100% (257 statements)
coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/logging_service.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti

Logging Service Implementation.
This module implements structured logging according to the MCP specification.
It supports RFC 5424 severity levels, log level management, and log event subscriptions.
"""

# Standard
import asyncio
from asyncio.events import AbstractEventLoop
from datetime import datetime, timezone
import logging
from logging.handlers import RotatingFileHandler
import os
import socket
from typing import Any, AsyncGenerator, Dict, List, NotRequired, Optional, TextIO, TypedDict

# Third-Party
from pythonjsonlogger import json as jsonlogger  # requires the python-json-logger package

# First-Party
from mcpgateway.common.models import LogLevel
from mcpgateway.config import settings
from mcpgateway.services.log_storage_service import LogStorageService
from mcpgateway.utils.correlation_id import get_correlation_id
from mcpgateway.utils.url_auth import sanitize_exception_message

# Optional OpenTelemetry support (Third-Party)
try:
    # Third-Party
    from opentelemetry import trace  # type: ignore[import-untyped]
except ImportError:
    trace = None  # type: ignore[assignment]

AnyioClosedResourceError: Optional[type]  # pylint: disable=invalid-name
try:
    # Optional import; only used for filtering a known benign upstream error (Third-Party)
    # Third-Party
    from anyio import ClosedResourceError as AnyioClosedResourceError  # pylint: disable=invalid-name
except Exception:  # pragma: no cover - environment without anyio
    AnyioClosedResourceError = None  # pylint: disable=invalid-name

# Standard log format used across the codebase
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"

# Cache static values at module load - these don't change during process lifetime
_CACHED_HOSTNAME: str = socket.gethostname()
_CACHED_PID: int = os.getpid()

# Cache level mapping dictionaries at module load to avoid recreation on every log call
# Maps Python log level names to MCP LogLevel enum (used in StorageHandler.emit)
_PYTHON_TO_MCP_LEVEL_MAP: Dict[str, LogLevel] = {
    "DEBUG": LogLevel.DEBUG,
    "INFO": LogLevel.INFO,
    "WARNING": LogLevel.WARNING,
    "ERROR": LogLevel.ERROR,
    "CRITICAL": LogLevel.CRITICAL,
}

# Maps MCP LogLevel to Python logging method names (used in notify)
_MCP_TO_PYTHON_METHOD_MAP: Dict[LogLevel, str] = {
    LogLevel.DEBUG: "debug",
    LogLevel.INFO: "info",
    LogLevel.NOTICE: "info",  # Map NOTICE to INFO
    LogLevel.WARNING: "warning",
    LogLevel.ERROR: "error",
    LogLevel.CRITICAL: "critical",
    LogLevel.ALERT: "critical",  # Map ALERT to CRITICAL
    LogLevel.EMERGENCY: "critical",  # Map EMERGENCY to CRITICAL
}

# Maps MCP LogLevel to numeric values for comparison (used in _should_log)
_MCP_LEVEL_VALUES: Dict[LogLevel, int] = {
    LogLevel.DEBUG: 0,
    LogLevel.INFO: 1,
    LogLevel.NOTICE: 2,
    LogLevel.WARNING: 3,
    LogLevel.ERROR: 4,
    LogLevel.CRITICAL: 5,
    LogLevel.ALERT: 6,
    LogLevel.EMERGENCY: 7,
}

# Create a text formatter with standard format
text_formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)


class CorrelationIdJsonFormatter(jsonlogger.JsonFormatter):
    """JSON formatter that includes correlation ID and OpenTelemetry trace context."""

    def add_fields(self, log_record: dict, record: logging.LogRecord, message_dict: dict) -> None:  # pylint: disable=arguments-renamed
        """Add custom fields to the log record.

        Args:
            log_record: The dictionary that will be logged as JSON
            record: The original LogRecord
            message_dict: Additional message fields
        """
        super().add_fields(log_record, record, message_dict)

        # Add timestamp in ISO 8601 format with 'Z' suffix for UTC
        dt = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_record["@timestamp"] = dt.isoformat().replace("+00:00", "Z")

        # Add hostname and process ID for log aggregation - use cached values for performance
        log_record["hostname"] = _CACHED_HOSTNAME
        log_record["process_id"] = _CACHED_PID

        # Add correlation ID from context
        correlation_id = get_correlation_id()
        if correlation_id:
            log_record["request_id"] = correlation_id

        # Add OpenTelemetry trace context if available
        if trace is not None:
            try:
                span = trace.get_current_span()
                if span and span.is_recording():
                    span_context = span.get_span_context()
                    if span_context.is_valid:
                        # Format trace_id and span_id as hex strings
                        log_record["trace_id"] = format(span_context.trace_id, "032x")
                        log_record["span_id"] = format(span_context.span_id, "016x")
                        log_record["trace_flags"] = format(span_context.trace_flags, "02x")
            except Exception:  # nosec B110 - intentionally catching all exceptions for optional tracing
                # Error accessing span context, continue without trace fields
                pass
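
# Illustrative sketch of the JSON shape this formatter produces (all values are
# examples only; request_id appears only when a correlation ID is set, and the
# trace_* fields only when an OpenTelemetry span is recording):
#
#     {"asctime": "2025-01-01T12:00:00", "name": "mcpgateway", "levelname": "INFO",
#      "message": "Logging service initialized",
#      "@timestamp": "2025-01-01T12:00:00.123456Z",
#      "hostname": "gw-host-1", "process_id": 1234,
#      "request_id": "3f2b...", "trace_id": "<32 hex>", "span_id": "<16 hex>", "trace_flags": "01"}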

# Create a JSON formatter with correlation ID support (uses same base format)
json_formatter = CorrelationIdJsonFormatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)

# Note: Don't use basicConfig here as it conflicts with our custom dual logging setup
# The LoggingService.initialize() method will properly configure all handlers

# Global handlers will be created lazily
_file_handler: Optional[logging.Handler] = None
_text_handler: Optional[logging.StreamHandler[TextIO]] = None


def _get_file_handler() -> logging.Handler:
    """Get or create the file handler.

    Returns:
        logging.Handler: Either a RotatingFileHandler or regular FileHandler for JSON logging.

    Raises:
        ValueError: If file logging is disabled or no log file specified.
    """
    global _file_handler  # pylint: disable=global-statement
    if _file_handler is None:
        # Only create if file logging is enabled and file is specified
        if not settings.log_to_file or not settings.log_file:
            raise ValueError("File logging is disabled or no log file specified")

        # Ensure log folder exists
        if settings.log_folder:
            os.makedirs(settings.log_folder, exist_ok=True)
            log_path = os.path.join(settings.log_folder, settings.log_file)
        else:
            log_path = settings.log_file

        # Create appropriate handler based on rotation settings
        if settings.log_rotation_enabled:
            max_bytes = settings.log_max_size_mb * 1024 * 1024  # Convert MB to bytes
            _file_handler = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=settings.log_backup_count, mode=settings.log_filemode)
        else:
            _file_handler = logging.FileHandler(log_path, mode=settings.log_filemode)

        _file_handler.setFormatter(json_formatter)
    return _file_handler
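
# Illustrative sizing sketch (hypothetical values): with log_max_size_mb = 10
# and log_backup_count = 5, rotation bounds disk usage at roughly six files of
# 10 MB each (the active file plus five backups), equivalent to:
#
#     RotatingFileHandler("logs/gateway.log", maxBytes=10 * 1024 * 1024, backupCount=5, mode="a")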

def _get_text_handler() -> logging.StreamHandler[TextIO]:
    """Get or create the text handler.

    Returns:
        logging.StreamHandler: The stream handler for console logging.
    """
    global _text_handler  # pylint: disable=global-statement
    if _text_handler is None:
        _text_handler = logging.StreamHandler()
        _text_handler.setFormatter(text_formatter)
    return _text_handler


class StorageHandler(logging.Handler):
    """Custom logging handler that stores logs in LogStorageService."""

    def __init__(self, storage_service: LogStorageService):
        """Initialize the storage handler.

        Args:
            storage_service: The LogStorageService instance to store logs in
        """
        super().__init__()
        self.storage = storage_service
        self.loop: AbstractEventLoop | None = None

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record to storage.

        Args:
            record: The LogRecord to emit
        """
        if not self.storage:
            return

        # Map Python log levels to MCP LogLevel (uses module-level cached dict)
        log_level = _PYTHON_TO_MCP_LEVEL_MAP.get(record.levelname, LogLevel.INFO)

        # Extract entity context from record if available
        entity_type = getattr(record, "entity_type", None)
        entity_id = getattr(record, "entity_id", None)
        entity_name = getattr(record, "entity_name", None)
        request_id = getattr(record, "request_id", None)

        # Format the message
        try:
            message = self.format(record)
        except Exception:
            message = record.getMessage()

        # Store the log asynchronously
        try:
            coro = self.storage.add_log(
                level=log_level,
                message=message,
                entity_type=entity_type,
                entity_id=entity_id,
                entity_name=entity_name,
                logger=record.name,
                request_id=request_id,
            )

            try:
                # Fast path: we're already on an event loop thread.
                loop = asyncio.get_running_loop()
                self.loop = loop
                task = loop.create_task(coro)
                task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
            except RuntimeError:
                # Fallback: no running loop in this thread; attempt to schedule on a known loop.
                loop = self.loop
                if loop is None or not loop.is_running():
                    coro.close()
                    return

                future = asyncio.run_coroutine_threadsafe(coro, loop)
                future.add_done_callback(lambda f: f.exception() if not f.cancelled() else None)
        except Exception:
            # Silently fail to avoid logging recursion
            pass  # nosec B110 - Intentional to prevent logging recursion
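
# Illustrative usage sketch (hypothetical setup, comment only): once emit() has
# run at least once on the event loop thread, self.loop is seeded, so records
# emitted later from plain worker threads are bridged back to that loop via
# run_coroutine_threadsafe:
#
#     storage = LogStorageService()
#     handler = StorageHandler(storage)
#     logging.getLogger().addHandler(handler)
#     logging.info("logged on the loop thread")  # fast path: loop.create_task
#     threading.Thread(target=logging.info, args=("from a worker",)).start()  # fallback path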

class _LogMessageData(TypedDict):
    """Log message data structure."""

    level: LogLevel
    data: Any
    timestamp: str
    logger: NotRequired[str]


class _LogMessage(TypedDict):
    """Log message event structure."""

    type: str
    data: _LogMessageData
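
# Illustrative sketch of one event as notify() below constructs it (timestamp
# is an example value; the "logger" key is present only when a logger name was
# given):
#
#     {"type": "log",
#      "data": {"level": LogLevel.ERROR, "data": "Connection failed",
#               "timestamp": "2025-01-01T12:00:00+00:00", "logger": "gateway"}}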

class LoggingService:
    """MCP logging service.

    Implements structured logging with:
    - RFC 5424 severity levels
    - Log level management
    - Log event subscriptions
    - Logger name tracking
    """

    def __init__(self) -> None:
        """Initialize logging service."""
        self._level = LogLevel.INFO
        self._subscribers: List[asyncio.Queue[_LogMessage]] = []
        self._loggers: Dict[str, logging.Logger] = {}
        self._storage: LogStorageService | None = None  # Will be initialized if admin UI is enabled
        self._storage_handler: Optional[StorageHandler] = None  # Track the storage handler for cleanup

    async def initialize(self) -> None:
        """Initialize logging service.

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())

        """
        # Update service log level from settings BEFORE configuring loggers
        self._level = LogLevel[settings.log_level.upper()]

        root_logger = logging.getLogger()
        self._loggers[""] = root_logger

        # Clear existing handlers to avoid duplicates
        root_logger.handlers.clear()

        # Set root logger level to match settings - this is critical for LOG_LEVEL to work
        log_level = getattr(logging, settings.log_level.upper())
        root_logger.setLevel(log_level)

        # Console handler (stdout/stderr)
        #
        # LOG_FORMAT controls the console output format:
        # - text: human-friendly
        # - json: machine-friendly (Loki/ELK) and includes OTEL trace context when available
        if getattr(settings, "log_format", "text").lower() == "json":
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(json_formatter)
        else:
            console_handler = _get_text_handler()
        console_handler.setLevel(log_level)
        root_logger.addHandler(console_handler)

        # Only add file handler if enabled
        if settings.log_to_file and settings.log_file:
            try:
                file_handler = _get_file_handler()
                file_handler.setLevel(log_level)
                root_logger.addHandler(file_handler)
                if settings.log_rotation_enabled:
                    logging.info(f"File logging enabled with rotation: {settings.log_folder or '.'}/{settings.log_file} (max: {settings.log_max_size_mb}MB, backups: {settings.log_backup_count})")
                else:
                    logging.info(f"File logging enabled (no rotation): {settings.log_folder or '.'}/{settings.log_file}")
            except Exception as e:
                logging.warning(f"Failed to initialize file logging: {e}")
        else:
            logging.info("File logging disabled - logging to stdout/stderr only")

        # Configure uvicorn loggers to use our handlers (for access logs)
        # Note: This needs to be done both at init and dynamically as uvicorn creates loggers later
        self._configure_uvicorn_loggers()

        # Initialize log storage if admin UI is enabled
        if settings.mcpgateway_ui_enabled or settings.mcpgateway_admin_api_enabled:
            self._storage = LogStorageService()

            # Add storage handler to capture all logs
            self._storage_handler = StorageHandler(self._storage)
            self._storage_handler.setFormatter(text_formatter)
            self._storage_handler.setLevel(log_level)
            root_logger.addHandler(self._storage_handler)

            logging.info(f"Log storage initialized with {settings.log_buffer_size_mb}MB buffer")

        logging.info("Logging service initialized")

        # Suppress noisy upstream logs for normal stream closures in MCP streamable HTTP
        self._install_closedresourceerror_filter()

        # Redact sensitive query parameters from httpx/httpcore log messages
        self._install_httpx_url_sanitize_filter()

    async def shutdown(self) -> None:
        """Shutdown logging service.

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.shutdown())

        """
        # Remove storage handler from root logger if it was added
        if self._storage_handler:
            root_logger = logging.getLogger()
            root_logger.removeHandler(self._storage_handler)
            self._storage_handler = None

        # Clear subscribers
        self._subscribers.clear()
        logging.info("Logging service shutdown")

    def _install_closedresourceerror_filter(self) -> None:
        """Install a filter to drop benign ClosedResourceError logs from upstream MCP.

        The MCP streamable HTTP server logs an ERROR when the in-memory channel is
        closed during normal client disconnects, raising ``anyio.ClosedResourceError``.
        This filter suppresses those specific records to keep logs clean.

        Examples:
            >>> # Initialize service (installs filter)
            >>> import asyncio, logging, anyio
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())
            >>> # Locate the installed filter on the target logger
            >>> target = logging.getLogger('mcp.server.streamable_http')
            >>> flts = [f for f in target.filters if f.__class__.__name__.endswith('SuppressClosedResourceErrorFilter')]
            >>> len(flts) >= 1
            True
            >>> filt = flts[0]
            >>> # Non-target logger should pass through even if message matches
            >>> rec_other = logging.makeLogRecord({'name': 'other.logger', 'msg': 'ClosedResourceError'})
            >>> filt.filter(rec_other)
            True
            >>> # Target logger with message containing ClosedResourceError should be suppressed
            >>> rec_target_msg = logging.makeLogRecord({'name': 'mcp.server.streamable_http', 'msg': 'ClosedResourceError in normal shutdown'})
            >>> filt.filter(rec_target_msg)
            False
            >>> # Target logger with ClosedResourceError in exc_info should be suppressed
            >>> try:
            ...     raise anyio.ClosedResourceError
            ... except anyio.ClosedResourceError as e:
            ...     rec_target_exc = logging.makeLogRecord({
            ...         'name': 'mcp.server.streamable_http',
            ...         'msg': 'Error in message router',
            ...         'exc_info': (e.__class__, e, None),
            ...     })
            >>> filt.filter(rec_target_exc)
            False
            >>> # Cleanup
            >>> asyncio.run(service.shutdown())

        """

        class _SuppressClosedResourceErrorFilter(logging.Filter):
            """Filter to suppress ClosedResourceError exceptions from MCP streamable HTTP logger.

            This filter prevents noisy ClosedResourceError exceptions from the upstream
            MCP streamable HTTP implementation from cluttering the logs. These errors
            are typically harmless connection cleanup events.
            """

            def filter(self, record: logging.LogRecord) -> bool:  # noqa: D401
                """Filter log records to suppress ClosedResourceError exceptions.

                Args:
                    record: The log record to evaluate

                Returns:
                    True to allow the record through, False to suppress it
                """
                # Apply only to upstream MCP streamable HTTP logger
                if not record.name.startswith("mcp.server.streamable_http"):
                    return True

                # If exception info is present, check its type
                exc_info = getattr(record, "exc_info", None)
                if exc_info and AnyioClosedResourceError is not None:
                    exc_type, exc, _tb = exc_info
                    try:
                        if isinstance(exc, AnyioClosedResourceError) or (getattr(exc_type, "__name__", "") == "ClosedResourceError"):
                            return False
                    except Exception:
                        # Be permissive if anything goes wrong, don't drop logs accidentally
                        return True

                # Fallback: drop if message text clearly indicates ClosedResourceError
                try:
                    msg = record.getMessage()
                    if "ClosedResourceError" in msg:
                        return False
                except Exception:
                    pass  # nosec B110 - Intentional to prevent logging recursion
                return True

        target_logger = logging.getLogger("mcp.server.streamable_http")
        target_logger.addFilter(_SuppressClosedResourceErrorFilter())

    @staticmethod
    def _install_httpx_url_sanitize_filter() -> None:
        """Install a filter to redact sensitive query parameters from httpx/httpcore log messages.

        httpx and httpcore log full request URLs at INFO level, bypassing
        application-level sanitization. This filter intercepts those log
        records and redacts sensitive query parameters (api_key, token, etc.)
        before they reach any handler.

        Examples:
            >>> import asyncio, logging
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())
            >>> filt = [f for f in logging.getLogger('httpx').filters
            ...         if f.__class__.__name__ == '_HttpxUrlSanitizeFilter'][0]
            >>> rec = logging.makeLogRecord({
            ...     'name': 'httpx',
            ...     'msg': 'HTTP Request: GET https://example.mcp.server.com/sse?api_key=secret-value "HTTP/1.1 200 OK"',
            ... })
            >>> filt.filter(rec)
            True
            >>> 'secret-value' not in rec.getMessage()
            True
            >>> 'api_key=REDACTED' in rec.getMessage()
            True
            >>> asyncio.run(service.shutdown())

        """

        class _HttpxUrlSanitizeFilter(logging.Filter):
            """Filter that redacts sensitive query parameters from URLs in httpx log messages."""

            def filter(self, record: logging.LogRecord) -> bool:  # noqa: D401
                """Sanitize URLs in the log record message, then allow it through.

                Args:
                    record: The log record to sanitize.

                Returns:
                    Always True (record is never suppressed, only sanitized).
                """
                try:
                    msg = record.getMessage()
                    sanitized = sanitize_exception_message(msg)
                    if sanitized != msg:
                        record.msg = sanitized
                        record.args = None
                except Exception:
                    pass  # nosec B110 - Never break logging due to sanitization failure
                return True

        url_filter = _HttpxUrlSanitizeFilter()
        for logger_name in ("httpx", "httpcore"):
            logging.getLogger(logger_name).addFilter(url_filter)

    def get_logger(self, name: str) -> logging.Logger:
        """Get or create logger instance.

        Args:
            name: Logger name

        Returns:
            Logger instance

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> service = LoggingService()
            >>> logger = service.get_logger('test')
            >>> import logging
            >>> isinstance(logger, logging.Logger)
            True

        """
        if name not in self._loggers:
            logger = logging.getLogger(name)

            # Don't add handlers to child loggers - let them inherit from root
            # This prevents duplicate logging while maintaining dual output (console + file)
            logger.propagate = True

            # Don't set level on child loggers - let them inherit from root logger
            # This ensures LOG_LEVEL environment variable is respected after initialize() runs
            # The root logger level is set in initialize() based on settings.log_level

            self._loggers[name] = logger

        return self._loggers[name]

    async def set_level(self, level: LogLevel) -> None:
        """Set minimum log level.

        This updates the level for all registered loggers.

        Args:
            level: New log level

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> from mcpgateway.common.models import LogLevel
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.set_level(LogLevel.DEBUG))

        """
        self._level = level

        # Update all loggers and handlers
        log_level = getattr(logging, level.upper())

        # Update Python root logger so new child loggers inherit the correct level
        logging.getLogger().setLevel(log_level)

        # Update handler levels so they don't filter out records the logger passes
        for handler in logging.getLogger().handlers:
            handler.setLevel(log_level)

        for logger in self._loggers.values():
            logger.setLevel(log_level)

        await self.notify(f"Log level set to {level}", LogLevel.INFO, "logging")

    async def notify(  # pylint: disable=too-many-positional-arguments
        self,
        data: Any,
        level: LogLevel,
        logger_name: Optional[str] = None,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        entity_name: Optional[str] = None,
        request_id: Optional[str] = None,
        extra_data: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Send log notification to subscribers.

        Args:
            data: Log message data
            level: Log severity level
            logger_name: Optional logger name
            entity_type: Type of entity (tool, resource, server, gateway)
            entity_id: ID of the related entity
            entity_name: Name of the related entity
            request_id: Associated request ID for tracing
            extra_data: Additional structured data

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> from mcpgateway.common.models import LogLevel
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.notify('test', LogLevel.INFO))

        """
        # Skip if below current level
        if not self._should_log(level):
            return

        # Format notification message
        message: _LogMessage = {
            "type": "log",
            "data": {
                "level": level,
                "data": data,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        }
        if logger_name:
            message["data"]["logger"] = logger_name

        # Log through standard logging
        logger = self.get_logger(logger_name or "")

        # Map MCP log levels to Python logging levels (uses module-level cached dict)
        log_method = _MCP_TO_PYTHON_METHOD_MAP.get(level, "info")
        log_func = getattr(logger, log_method)
        log_func(data)

        # Store in log storage if available
        if self._storage:
            await self._storage.add_log(
                level=level,
                message=str(data),
                entity_type=entity_type,
                entity_id=entity_id,
                entity_name=entity_name,
                logger=logger_name,
                data=extra_data,
                request_id=request_id,
            )

        # Notify subscribers
        for queue in self._subscribers:
            try:
                await queue.put(message)
            except Exception as e:
                logger.error(f"Failed to notify subscriber: {e}")
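
    # Illustrative call with full entity context (comment only; all values are
    # hypothetical, the keyword names match the signature above):
    #
    #     await service.notify(
    #         "Tool invocation failed",
    #         LogLevel.ERROR,
    #         logger_name="tool_service",
    #         entity_type="tool",
    #         entity_id="tool-123",
    #         entity_name="get_weather",
    #         request_id="req-abc",
    #         extra_data={"duration_ms": 42},
    #     )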

    async def subscribe(self) -> AsyncGenerator[_LogMessage, None]:
        """Subscribe to log messages.

        Returns a generator yielding log message events.

        Yields:
            Log message events

        Examples:
            No doctest here: consuming the infinite async generator would hang
            the test runner; see the illustrative sketch after this method.

        """
        queue: asyncio.Queue[_LogMessage] = asyncio.Queue()
        self._subscribers.append(queue)
        try:
            while True:
                message = await queue.get()
                yield message
        finally:
            self._subscribers.remove(queue)
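
    # Illustrative consumption sketch (comment only, kept out of the docstring
    # so doctests do not hang; asyncio.timeout, Python 3.11+, bounds the wait):
    #
    #     async def tail_logs(service: "LoggingService") -> None:
    #         async with asyncio.timeout(5):
    #             async for event in service.subscribe():
    #                 print(event["data"]["level"], event["data"]["data"])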

    def _should_log(self, level: LogLevel) -> bool:
        """Check if level meets minimum threshold.

        Args:
            level: Log level to check

        Returns:
            True if should log

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LoggingService()
            >>> service._level = LogLevel.WARNING
            >>> service._should_log(LogLevel.ERROR)
            True
            >>> service._should_log(LogLevel.INFO)
            False
            >>> service._should_log(LogLevel.WARNING)
            True
            >>> service._should_log(LogLevel.DEBUG)
            False

        """
        # Uses module-level cached dict for performance
        return _MCP_LEVEL_VALUES[level] >= _MCP_LEVEL_VALUES[self._level]

    def _configure_uvicorn_loggers(self) -> None:
        """Configure uvicorn loggers to use our dual logging setup.

        This method handles uvicorn's logging setup which can happen after our initialization.
        Uvicorn creates its own loggers and handlers, so we need to redirect them to our setup.
        """
        uvicorn_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "uvicorn.asgi"]

        for logger_name in uvicorn_loggers:
            uvicorn_logger = logging.getLogger(logger_name)

            # Clear any handlers that uvicorn may have added
            uvicorn_logger.handlers.clear()

            # Make sure they propagate to root (which has our dual handlers)
            uvicorn_logger.propagate = True

            # Set level to match our logging service level
            if hasattr(self, "_level"):
                log_level = getattr(logging, self._level.upper())
                uvicorn_logger.setLevel(log_level)

            # Track the logger
            self._loggers[logger_name] = uvicorn_logger

    def configure_uvicorn_after_startup(self) -> None:
        """Public method to reconfigure uvicorn loggers after server startup.

        Call this after uvicorn has started to ensure access logs go to dual output.
        This handles the case where uvicorn creates loggers after our initialization.
        """
        self._configure_uvicorn_loggers()
        logging.info("Uvicorn loggers reconfigured for dual logging")

    def get_storage(self) -> Optional[LogStorageService]:
        """Get the log storage service if available.

        Returns:
            LogStorageService instance or None if not initialized
        """
        return self._storage
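
# Illustrative end-to-end lifecycle (comment only; a sketch assuming default
# settings, composed from the doctests above):
#
#     async def main() -> None:
#         service = LoggingService()
#         await service.initialize()
#         await service.notify("gateway started", LogLevel.INFO, "startup")
#         await service.set_level(LogLevel.DEBUG)
#         await service.shutdown()
#
#     asyncio.run(main())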