Coverage for mcpgateway / services / logging_service.py: 96%

238 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/logging_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Logging Service Implementation. 

8This module implements structured logging according to the MCP specification. 

9It supports RFC 5424 severity levels, log level management, and log event subscriptions. 

10""" 

11 

12# Standard 

13import asyncio 

14from asyncio.events import AbstractEventLoop 

15from datetime import datetime, timezone 

16import logging 

17from logging.handlers import RotatingFileHandler 

18import os 

19import socket 

20from typing import Any, AsyncGenerator, Dict, List, NotRequired, Optional, TextIO, TypedDict 

21 

22# Third-Party 

23from pythonjsonlogger import json as jsonlogger # You may need to install python-json-logger package 

24 

25# First-Party 

26from mcpgateway.common.models import LogLevel 

27from mcpgateway.config import settings 

28from mcpgateway.services.log_storage_service import LogStorageService 

29from mcpgateway.utils.correlation_id import get_correlation_id 

30 

31# Optional OpenTelemetry support (Third-Party) 

32try: 

33 # Third-Party 

34 from opentelemetry import trace # type: ignore[import-untyped] 

35except ImportError: 

36 trace = None # type: ignore[assignment] 

37 

38AnyioClosedResourceError: Optional[type] # pylint: disable=invalid-name 

39try: 

40 # Optional import; only used for filtering a known benign upstream error (Third-Party) 

41 # Third-Party 

42 from anyio import ClosedResourceError as AnyioClosedResourceError # pylint: disable=invalid-name 

43except Exception: # pragma: no cover - environment without anyio 

44 AnyioClosedResourceError = None # pylint: disable=invalid-name 

45 

46# First-Party 

# Shared text layout for every handler in the codebase.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"

# Looked up once at import time so the JSON formatter does not repeat the
# lookups on every record.
_CACHED_HOSTNAME: str = socket.gethostname()
_CACHED_PID: int = os.getpid()

# The lookup tables below are built once at import so the hot logging paths
# never rebuild them.

# Python level name -> MCP LogLevel (consumed by StorageHandler.emit).
_PYTHON_TO_MCP_LEVEL_MAP: Dict[str, LogLevel] = dict(
    DEBUG=LogLevel.DEBUG,
    INFO=LogLevel.INFO,
    WARNING=LogLevel.WARNING,
    ERROR=LogLevel.ERROR,
    CRITICAL=LogLevel.CRITICAL,
)

# MCP LogLevel -> name of the logging.Logger method to call (consumed by notify).
# Levels without a stdlib counterpart are folded into the nearest one.
_MCP_TO_PYTHON_METHOD_MAP: Dict[LogLevel, str] = dict(
    [
        (LogLevel.DEBUG, "debug"),
        (LogLevel.INFO, "info"),
        (LogLevel.NOTICE, "info"),  # NOTICE is emitted as INFO
        (LogLevel.WARNING, "warning"),
        (LogLevel.ERROR, "error"),
        (LogLevel.CRITICAL, "critical"),
        (LogLevel.ALERT, "critical"),  # ALERT is emitted as CRITICAL
        (LogLevel.EMERGENCY, "critical"),  # EMERGENCY is emitted as CRITICAL
    ]
)

# MCP LogLevel -> ordinal used for threshold comparison (consumed by _should_log).
# The position in the tuple is the ordinal: DEBUG=0 ... EMERGENCY=7.
_MCP_LEVEL_VALUES: Dict[LogLevel, int] = {
    mcp_level: ordinal
    for ordinal, mcp_level in enumerate(
        (
            LogLevel.DEBUG,
            LogLevel.INFO,
            LogLevel.NOTICE,
            LogLevel.WARNING,
            LogLevel.ERROR,
            LogLevel.CRITICAL,
            LogLevel.ALERT,
            LogLevel.EMERGENCY,
        )
    )
}

# Plain-text formatter built from the shared layout.
text_formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)

91 

92 

class CorrelationIdJsonFormatter(jsonlogger.JsonFormatter):
    """JSON formatter that enriches each record with request and trace context.

    On top of the fields produced by the base JSON formatter, every record gets
    an ``@timestamp``, the cached hostname and process id, the current
    correlation id (as ``request_id``) when one is set, and the active
    OpenTelemetry span identifiers when tracing is importable and a valid span
    is recording.
    """

    def add_fields(self, log_record: dict, record: logging.LogRecord, message_dict: dict) -> None:  # pylint: disable=arguments-renamed
        """Populate ``log_record`` with the standard and the custom fields.

        Args:
            log_record: The dictionary that will be serialized as JSON; mutated in place
            record: The LogRecord being formatted
            message_dict: Additional message fields, forwarded to the base class

        """
        super().add_fields(log_record, record, message_dict)

        # UTC creation time in ISO 8601, with the "+00:00" offset spelled as "Z".
        created_utc = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_record["@timestamp"] = created_utc.isoformat().replace("+00:00", "Z")

        # Host and process identity come from the module-level cache.
        log_record["hostname"] = _CACHED_HOSTNAME
        log_record["process_id"] = _CACHED_PID

        # Only emit request_id when a correlation id is actually set.
        request_id = get_correlation_id()
        if request_id:
            log_record["request_id"] = request_id

        # Tracing is optional: ``trace`` is None when opentelemetry is not installed.
        if trace is None:
            return

        try:
            current_span = trace.get_current_span()
            if not current_span or not current_span.is_recording():
                return

            context = current_span.get_span_context()
            if not context.is_valid:
                return

            # Zero-padded lowercase hex: 32 digits for the trace id, 16 for the span id, 2 for the flags.
            log_record["trace_id"] = f"{context.trace_id:032x}"
            log_record["span_id"] = f"{context.span_id:016x}"
            log_record["trace_flags"] = f"{context.trace_flags:02x}"
        except Exception:  # nosec B110 - intentionally catching all exceptions for optional tracing
            # A tracing failure must never break logging; skip the trace fields.
            pass

134 

135 

# JSON formatter with correlation ID / trace context support. It is built from
# the same base format and date format as the text formatter.
json_formatter = CorrelationIdJsonFormatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)

# Note: Don't use basicConfig here as it conflicts with our custom dual logging setup
# The LoggingService.initialize() method will properly configure all handlers

# Module-level handler singletons. They start as None and are created lazily
# on first use by _get_file_handler() / _get_text_handler().
_file_handler: Optional[logging.Handler] = None
_text_handler: Optional[logging.StreamHandler[TextIO]] = None

145 

146 

def _get_file_handler() -> logging.Handler:
    """Return the shared JSON file handler, building it on first use.

    The handler is stored in the module-level ``_file_handler`` so later calls
    reuse the same instance.

    Returns:
        logging.Handler: A RotatingFileHandler when rotation is enabled in settings,
        otherwise a plain FileHandler. Both use the JSON formatter.

    Raises:
        ValueError: If file logging is disabled or no log file specified.

    """
    global _file_handler  # pylint: disable=global-statement

    # Already built by an earlier call.
    if _file_handler is not None:
        return _file_handler

    # Refuse to build anything unless file logging is switched on and a file name is configured.
    if not (settings.log_to_file and settings.log_file):
        raise ValueError("File logging is disabled or no log file specified")

    # Resolve the target path, creating the log folder when one is configured.
    if settings.log_folder:
        os.makedirs(settings.log_folder, exist_ok=True)
        target_path = os.path.join(settings.log_folder, settings.log_file)
    else:
        target_path = settings.log_file

    if settings.log_rotation_enabled:
        _file_handler = RotatingFileHandler(
            target_path,
            maxBytes=settings.log_max_size_mb * 1024 * 1024,  # megabytes -> bytes
            backupCount=settings.log_backup_count,
            mode=settings.log_filemode,
        )
    else:
        _file_handler = logging.FileHandler(target_path, mode=settings.log_filemode)

    _file_handler.setFormatter(json_formatter)
    return _file_handler

179 

180 

def _get_text_handler() -> logging.StreamHandler[TextIO]:
    """Return the shared console handler, building it on first use.

    The handler is stored in the module-level ``_text_handler`` so later calls
    reuse the same instance.

    Returns:
        logging.StreamHandler: The stream handler for console logging, using the text formatter.

    """
    global _text_handler  # pylint: disable=global-statement

    # Already built by an earlier call.
    if _text_handler is not None:
        return _text_handler

    _text_handler = logging.StreamHandler()
    _text_handler.setFormatter(text_formatter)
    return _text_handler

193 

194 

class StorageHandler(logging.Handler):
    """Custom logging handler that stores logs in LogStorageService.

    ``LogStorageService.add_log`` produces a coroutine, so ``emit`` never runs it
    inline. The coroutine is scheduled on the running event loop when ``emit``
    is called from a loop thread, or handed to the last loop this handler saw
    when called from a thread without one. ``emit`` is best-effort: it never
    raises into the code that issued the log call.
    """

    def __init__(self, storage_service: LogStorageService):
        """Initialize the storage handler.

        Args:
            storage_service: The LogStorageService instance to store logs in

        """
        super().__init__()
        self.storage = storage_service
        # Most recent event loop observed by emit(); used as the scheduling
        # target for records emitted from threads that have no running loop.
        self.loop: AbstractEventLoop | None = None

    @staticmethod
    def _consume_outcome(done: Any) -> None:
        """Retrieve the exception of a finished task/future so it is not reported as unretrieved.

        Args:
            done: The finished asyncio Task or concurrent Future

        """
        if not done.cancelled():
            done.exception()

    def _schedule(self, coro: Any) -> bool:
        """Hand ``coro`` to an event loop without blocking the caller.

        Args:
            coro: The coroutine returned by ``add_log``

        Returns:
            True when the coroutine was handed to a loop, False when no running loop is available.

        """
        try:
            # Fast path: we're already on an event loop thread.
            loop = asyncio.get_running_loop()
            self.loop = loop
            handle: Any = loop.create_task(coro)
        except RuntimeError:
            # Fallback: no running loop in this thread; attempt to schedule on a known loop.
            loop = self.loop
            if loop is None or not loop.is_running():
                return False
            handle = asyncio.run_coroutine_threadsafe(coro, loop)

        handle.add_done_callback(self._consume_outcome)
        return True

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record to storage.

        Args:
            record: The LogRecord to emit

        """
        if not self.storage:
            return

        # Map Python log levels to MCP LogLevel (uses module-level cached dict)
        log_level = _PYTHON_TO_MCP_LEVEL_MAP.get(record.levelname, LogLevel.INFO)

        # Extract entity context from record if available
        entity_type = getattr(record, "entity_type", None)
        entity_id = getattr(record, "entity_id", None)
        entity_name = getattr(record, "entity_name", None)
        request_id = getattr(record, "request_id", None)

        # Format the message. format() calls getMessage() internally, so a
        # record with malformed %-args fails in both; the last resort is the
        # raw message template, which keeps the failure out of the caller.
        try:
            message = self.format(record)
        except Exception:
            try:
                message = record.getMessage()
            except Exception:
                message = str(record.msg)

        # Store the log asynchronously
        coro: Any = None
        handed_off = False
        try:
            coro = self.storage.add_log(
                level=log_level,
                message=message,
                entity_type=entity_type,
                entity_id=entity_id,
                entity_name=entity_name,
                logger=record.name,
                request_id=request_id,
            )
            handed_off = self._schedule(coro)
        except Exception:
            # Silently fail to avoid logging recursion
            pass  # nosec B110 - Intentional to prevent logging recursion

        # A coroutine that was created but never handed to a loop must be closed,
        # otherwise Python warns that it "was never awaited".
        if coro is not None and not handed_off:
            try:
                coro.close()
            except Exception:
                pass  # nosec B110 - Intentional to prevent logging recursion

264 

265 

class _LogMessageData(TypedDict):
    """Log message data structure.

    This is the payload nested under the ``data`` key of a ``_LogMessage``.
    Every key is required except ``logger``.
    """

    # Severity of the entry.
    level: LogLevel
    # The logged payload, exactly as it was passed to LoggingService.notify().
    data: Any
    # Creation time as a string; notify() fills it with an ISO 8601 UTC timestamp.
    timestamp: str
    # Logger name; notify() sets it only when a logger name was supplied.
    logger: NotRequired[str]

273 

274 

class _LogMessage(TypedDict):
    """Log message event structure.

    This is the envelope that LoggingService.notify() puts on each subscriber queue.
    """

    # Event discriminator; notify() always sets it to "log".
    type: str
    # The log entry itself.
    data: _LogMessageData

280 

281 

class LoggingService:
    """MCP logging service.

    Implements structured logging with:
    - RFC 5424 severity levels
    - Log level management
    - Log event subscriptions
    - Logger name tracking
    """

    def __init__(self) -> None:
        """Initialize logging service."""
        self._level = LogLevel.INFO
        self._subscribers: List[asyncio.Queue[_LogMessage]] = []
        self._loggers: Dict[str, logging.Logger] = {}
        self._storage: LogStorageService | None = None  # Will be initialized if admin UI is enabled
        self._storage_handler: Optional[StorageHandler] = None  # Track the storage handler for cleanup

    @staticmethod
    def _python_level(level: LogLevel) -> int:
        """Translate an MCP log level into the numeric level used by the stdlib ``logging`` module.

        The stdlib has no NOTICE, ALERT or EMERGENCY level, so a plain
        ``getattr(logging, level.upper())`` raises AttributeError for them.
        Those levels are resolved through the same table ``notify`` uses to pick
        a logger method (NOTICE -> INFO, ALERT/EMERGENCY -> CRITICAL).

        Args:
            level: MCP log level

        Returns:
            The matching ``logging`` module level constant

        """
        method_name = _MCP_TO_PYTHON_METHOD_MAP.get(level)
        if method_name is None:
            # Not in the table (e.g. a plain string): keep the direct name lookup.
            return getattr(logging, level.upper())
        return getattr(logging, method_name.upper())

    async def initialize(self) -> None:
        """Initialize logging service.

        Resets the root logger's handlers, then attaches the console handler,
        the optional file handler and, when the admin UI or admin API is
        enabled, the log storage handler.

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())

        """
        # Update service log level from settings BEFORE configuring loggers
        self._level = LogLevel[settings.log_level.upper()]

        root_logger = logging.getLogger()
        self._loggers[""] = root_logger

        # Clear existing handlers to avoid duplicates
        root_logger.handlers.clear()

        # Set root logger level to match settings - this is critical for LOG_LEVEL to work
        log_level = getattr(logging, settings.log_level.upper())
        root_logger.setLevel(log_level)

        # Console handler (stdout/stderr)
        #
        # LOG_FORMAT controls the console output format:
        # - text: human-friendly
        # - json: machine-friendly (Loki/ELK) and includes OTEL trace context when available
        if getattr(settings, "log_format", "text").lower() == "json":
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(json_formatter)
        else:
            console_handler = _get_text_handler()
        console_handler.setLevel(log_level)
        root_logger.addHandler(console_handler)

        # Only add file handler if enabled
        if settings.log_to_file and settings.log_file:
            try:
                file_handler = _get_file_handler()
                file_handler.setLevel(log_level)
                root_logger.addHandler(file_handler)
                if settings.log_rotation_enabled:
                    logging.info(f"File logging enabled with rotation: {settings.log_folder or '.'}/{settings.log_file} (max: {settings.log_max_size_mb}MB, backups: {settings.log_backup_count})")
                else:
                    logging.info(f"File logging enabled (no rotation): {settings.log_folder or '.'}/{settings.log_file}")
            except Exception as e:
                # A file-logging failure is reported but must not abort startup;
                # console logging stays active.
                logging.warning(f"Failed to initialize file logging: {e}")
        else:
            logging.info("File logging disabled - logging to stdout/stderr only")

        # Configure uvicorn loggers to use our handlers (for access logs)
        # Note: This needs to be done both at init and dynamically as uvicorn creates loggers later
        self._configure_uvicorn_loggers()

        # Initialize log storage if admin UI is enabled
        if settings.mcpgateway_ui_enabled or settings.mcpgateway_admin_api_enabled:
            self._storage = LogStorageService()

            # Add storage handler to capture all logs
            self._storage_handler = StorageHandler(self._storage)
            self._storage_handler.setFormatter(text_formatter)
            self._storage_handler.setLevel(log_level)
            root_logger.addHandler(self._storage_handler)

            logging.info(f"Log storage initialized with {settings.log_buffer_size_mb}MB buffer")

        logging.info("Logging service initialized")

        # Suppress noisy upstream logs for normal stream closures in MCP streamable HTTP
        self._install_closedresourceerror_filter()

    async def shutdown(self) -> None:
        """Shutdown logging service.

        Detaches the storage handler from the root logger and drops all
        subscriber queues.

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.shutdown())

        """
        # Remove storage handler from root logger if it was added
        if self._storage_handler:
            root_logger = logging.getLogger()
            root_logger.removeHandler(self._storage_handler)
            self._storage_handler = None

        # Clear subscribers
        self._subscribers.clear()
        logging.info("Logging service shutdown")

    def _install_closedresourceerror_filter(self) -> None:
        """Install a filter to drop benign ClosedResourceError logs from upstream MCP.

        The MCP streamable HTTP server logs an ERROR when the in-memory channel is
        closed during normal client disconnects, raising ``anyio.ClosedResourceError``.
        This filter suppresses those specific records to keep logs clean.

        The filter is installed at most once per target logger, so repeated
        ``initialize()`` calls do not stack duplicates.

        Examples:
            >>> # Initialize service (installs filter)
            >>> import asyncio, logging, anyio
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())
            >>> # Locate the installed filter on the target logger
            >>> target = logging.getLogger('mcp.server.streamable_http')
            >>> flts = [f for f in target.filters if f.__class__.__name__.endswith('SuppressClosedResourceErrorFilter')]
            >>> len(flts) >= 1
            True
            >>> filt = flts[0]
            >>> # Non-target logger should pass through even if message matches
            >>> rec_other = logging.makeLogRecord({'name': 'other.logger', 'msg': 'ClosedResourceError'})
            >>> filt.filter(rec_other)
            True
            >>> # Target logger with message containing ClosedResourceError should be suppressed
            >>> rec_target_msg = logging.makeLogRecord({'name': 'mcp.server.streamable_http', 'msg': 'ClosedResourceError in normal shutdown'})
            >>> filt.filter(rec_target_msg)
            False
            >>> # Target logger with ClosedResourceError in exc_info should be suppressed
            >>> try:
            ...     raise anyio.ClosedResourceError
            ... except anyio.ClosedResourceError as e:
            ...     rec_target_exc = logging.makeLogRecord({
            ...         'name': 'mcp.server.streamable_http',
            ...         'msg': 'Error in message router',
            ...         'exc_info': (e.__class__, e, None),
            ...     })
            >>> filt.filter(rec_target_exc)
            False
            >>> # Cleanup
            >>> asyncio.run(service.shutdown())

        """

        class _SuppressClosedResourceErrorFilter(logging.Filter):
            """Filter to suppress ClosedResourceError exceptions from MCP streamable HTTP logger.

            This filter prevents noisy ClosedResourceError exceptions from the upstream
            MCP streamable HTTP implementation from cluttering the logs. These errors
            are typically harmless connection cleanup events.
            """

            def filter(self, record: logging.LogRecord) -> bool:  # noqa: D401
                """Filter log records to suppress ClosedResourceError exceptions.

                Args:
                    record: The log record to evaluate

                Returns:
                    True to allow the record through, False to suppress it

                """
                # Apply only to upstream MCP streamable HTTP logger
                if not record.name.startswith("mcp.server.streamable_http"):
                    return True

                # If exception info is present, check its type
                exc_info = getattr(record, "exc_info", None)
                if exc_info:
                    try:
                        exc_type, exc, _tb = exc_info
                        # The isinstance check needs anyio; the class-name check does not,
                        # so it still applies when anyio could not be imported.
                        if AnyioClosedResourceError is not None and isinstance(exc, AnyioClosedResourceError):
                            return False
                        if getattr(exc_type, "__name__", "") == "ClosedResourceError":
                            return False
                    except Exception:
                        # Be permissive if anything goes wrong, don't drop logs accidentally
                        return True

                # Fallback: drop if message text clearly indicates ClosedResourceError
                try:
                    msg = record.getMessage()
                    if "ClosedResourceError" in msg:
                        return False
                except Exception:
                    pass  # nosec B110 - Intentional to prevent logging recursion
                return True

        target_logger = logging.getLogger("mcp.server.streamable_http")

        # The filter class is re-created on every call, so an earlier instance is
        # recognised by class name rather than by isinstance.
        filter_name = _SuppressClosedResourceErrorFilter.__name__
        if not any(type(existing).__name__ == filter_name for existing in target_logger.filters):
            target_logger.addFilter(_SuppressClosedResourceErrorFilter())

    def get_logger(self, name: str) -> logging.Logger:
        """Get or create logger instance.

        Args:
            name: Logger name

        Returns:
            Logger instance

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> service = LoggingService()
            >>> logger = service.get_logger('test')
            >>> import logging
            >>> isinstance(logger, logging.Logger)
            True

        """
        if name not in self._loggers:
            logger = logging.getLogger(name)

            # Don't add handlers to child loggers - let them inherit from root
            # This prevents duplicate logging while maintaining dual output (console + file)
            logger.propagate = True

            # Set level to match service level
            logger.setLevel(self._python_level(self._level))

            self._loggers[name] = logger

        return self._loggers[name]

    async def set_level(self, level: LogLevel) -> None:
        """Set minimum log level.

        This updates the level for all registered loggers. MCP levels without a
        stdlib equivalent are applied as the nearest stdlib level
        (NOTICE -> INFO, ALERT/EMERGENCY -> CRITICAL).

        Args:
            level: New log level

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> from mcpgateway.common.models import LogLevel
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.set_level(LogLevel.DEBUG))

        """
        self._level = level

        # Update all loggers
        log_level = self._python_level(level)
        for logger in self._loggers.values():
            logger.setLevel(log_level)

        await self.notify(f"Log level set to {level}", LogLevel.INFO, "logging")

    async def notify(  # pylint: disable=too-many-positional-arguments
        self,
        data: Any,
        level: LogLevel,
        logger_name: Optional[str] = None,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        entity_name: Optional[str] = None,
        request_id: Optional[str] = None,
        extra_data: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Send log notification to subscribers.

        The entry is also written through the standard ``logging`` module and,
        when log storage is initialized, stored there with its entity context.
        Entries below the current service level are dropped entirely.

        Args:
            data: Log message data
            level: Log severity level
            logger_name: Optional logger name
            entity_type: Type of entity (tool, resource, server, gateway)
            entity_id: ID of the related entity
            entity_name: Name of the related entity
            request_id: Associated request ID for tracing
            extra_data: Additional structured data

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> from mcpgateway.common.models import LogLevel
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.notify('test', LogLevel.INFO))

        """
        # Skip if below current level
        if not self._should_log(level):
            return

        # Format notification message
        message: _LogMessage = {
            "type": "log",
            "data": {
                "level": level,
                "data": data,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        }
        if logger_name:
            message["data"]["logger"] = logger_name

        # Log through standard logging
        logger = self.get_logger(logger_name or "")

        # Map MCP log levels to Python logging levels (uses module-level cached dict)
        log_method = _MCP_TO_PYTHON_METHOD_MAP.get(level, "info")
        log_func = getattr(logger, log_method)
        log_func(data)

        # Store in log storage if available
        if self._storage:
            await self._storage.add_log(
                level=level,
                message=str(data),
                entity_type=entity_type,
                entity_id=entity_id,
                entity_name=entity_name,
                logger=logger_name,
                data=extra_data,
                request_id=request_id,
            )

        # Notify subscribers. Iterate over a snapshot: subscribe() and shutdown()
        # mutate the list, and mutating a list while iterating it skips entries.
        for queue in list(self._subscribers):
            try:
                await queue.put(message)
            except Exception as e:
                logger.error(f"Failed to notify subscriber: {e}")

    async def subscribe(self) -> AsyncGenerator[_LogMessage, None]:
        """Subscribe to log messages.

        Returns a generator yielding log message events. The subscription is
        registered when iteration starts and removed when the generator is
        closed.

        Yields:
            Log message events

        Examples:
            This example was removed to prevent the test runner from hanging on async generator consumption.

        """
        queue: asyncio.Queue[_LogMessage] = asyncio.Queue()
        self._subscribers.append(queue)
        try:
            while True:
                message = await queue.get()
                yield message
        finally:
            # shutdown() clears the subscriber list, so the queue may already be
            # gone by the time this generator is closed.
            try:
                self._subscribers.remove(queue)
            except ValueError:
                pass

    def _should_log(self, level: LogLevel) -> bool:
        """Check if level meets minimum threshold.

        Args:
            level: Log level to check

        Returns:
            True if should log

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LoggingService()
            >>> service._level = LogLevel.WARNING
            >>> service._should_log(LogLevel.ERROR)
            True
            >>> service._should_log(LogLevel.INFO)
            False
            >>> service._should_log(LogLevel.WARNING)
            True
            >>> service._should_log(LogLevel.DEBUG)
            False

        """
        # Uses module-level cached dict for performance
        return _MCP_LEVEL_VALUES[level] >= _MCP_LEVEL_VALUES[self._level]

    def _configure_uvicorn_loggers(self) -> None:
        """Configure uvicorn loggers to use our dual logging setup.

        This method handles uvicorn's logging setup which can happen after our initialization.
        Uvicorn creates its own loggers and handlers, so we need to redirect them to our setup.
        """
        uvicorn_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "uvicorn.asgi"]

        for logger_name in uvicorn_loggers:
            uvicorn_logger = logging.getLogger(logger_name)

            # Clear any handlers that uvicorn may have added
            uvicorn_logger.handlers.clear()

            # Make sure they propagate to root (which has our dual handlers)
            uvicorn_logger.propagate = True

            # Set level to match our logging service level
            if hasattr(self, "_level"):
                uvicorn_logger.setLevel(self._python_level(self._level))

            # Track the logger
            self._loggers[logger_name] = uvicorn_logger

    def configure_uvicorn_after_startup(self) -> None:
        """Public method to reconfigure uvicorn loggers after server startup.

        Call this after uvicorn has started to ensure access logs go to dual output.
        This handles the case where uvicorn creates loggers after our initialization.
        """
        self._configure_uvicorn_loggers()
        logging.info("Uvicorn loggers reconfigured for dual logging")

    def get_storage(self) -> Optional[LogStorageService]:
        """Get the log storage service if available.

        Returns:
            LogStorageService instance or None if not initialized

        """
        return self._storage