Coverage for mcpgateway / services / logging_service.py: 100%

257 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/logging_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Logging Service Implementation. 

8This module implements structured logging according to the MCP specification. 

9It supports RFC 5424 severity levels, log level management, and log event subscriptions. 

10""" 

11 

12# Standard 

13import asyncio 

14from asyncio.events import AbstractEventLoop 

15from datetime import datetime, timezone 

16import logging 

17from logging.handlers import RotatingFileHandler 

18import os 

19import socket 

20from typing import Any, AsyncGenerator, Dict, List, NotRequired, Optional, TextIO, TypedDict 

21 

22# Third-Party 

23from pythonjsonlogger import json as jsonlogger # You may need to install python-json-logger package 

24 

25# First-Party 

26from mcpgateway.common.models import LogLevel 

27from mcpgateway.config import settings 

28from mcpgateway.services.log_storage_service import LogStorageService 

29from mcpgateway.utils.correlation_id import get_correlation_id 

30from mcpgateway.utils.url_auth import sanitize_exception_message 

31 

32# Optional OpenTelemetry support (Third-Party) 

33try: 

34 # Third-Party 

35 from opentelemetry import trace # type: ignore[import-untyped] 

36except ImportError: 

37 trace = None # type: ignore[assignment] 

38 

39AnyioClosedResourceError: Optional[type] # pylint: disable=invalid-name 

40try: 

41 # Optional import; only used for filtering a known benign upstream error (Third-Party) 

42 # Third-Party 

43 from anyio import ClosedResourceError as AnyioClosedResourceError # pylint: disable=invalid-name 

44except Exception: # pragma: no cover - environment without anyio 

45 AnyioClosedResourceError = None # pylint: disable=invalid-name 

46 

47# First-Party 

# Text layout shared by the console, file and storage formatters in this module.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"

# Host name and PID are looked up once at import time and reused for every record.
_CACHED_HOSTNAME: str = socket.gethostname()
_CACHED_PID: int = os.getpid()

# The lookup tables below are built once at import time rather than per log call.

# Python level name -> MCP LogLevel (read by StorageHandler.emit).
_PYTHON_TO_MCP_LEVEL_MAP: Dict[str, LogLevel] = dict(
    DEBUG=LogLevel.DEBUG,
    INFO=LogLevel.INFO,
    WARNING=LogLevel.WARNING,
    ERROR=LogLevel.ERROR,
    CRITICAL=LogLevel.CRITICAL,
)

# MCP LogLevel -> name of the logging.Logger method to call (read by notify).
# Levels without a stdlib counterpart share the nearest method:
# NOTICE -> info, ALERT / EMERGENCY -> critical.
_MCP_TO_PYTHON_METHOD_MAP: Dict[LogLevel, str] = dict(
    [
        (LogLevel.DEBUG, "debug"),
        (LogLevel.INFO, "info"),
        (LogLevel.NOTICE, "info"),
        (LogLevel.WARNING, "warning"),
        (LogLevel.ERROR, "error"),
        (LogLevel.CRITICAL, "critical"),
        (LogLevel.ALERT, "critical"),
        (LogLevel.EMERGENCY, "critical"),
    ]
)

# MCP LogLevel -> numeric rank used for threshold comparison (read by _should_log).
# Ranks run from 0 (DEBUG) to 7 (EMERGENCY) in the order listed.
_MCP_LEVEL_VALUES: Dict[LogLevel, int] = {
    mcp_level: rank
    for rank, mcp_level in enumerate(
        (
            LogLevel.DEBUG,
            LogLevel.INFO,
            LogLevel.NOTICE,
            LogLevel.WARNING,
            LogLevel.ERROR,
            LogLevel.CRITICAL,
            LogLevel.ALERT,
            LogLevel.EMERGENCY,
        )
    )
}

# Plain-text formatter using the standard format above.
text_formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)

92 

93 

class CorrelationIdJsonFormatter(jsonlogger.JsonFormatter):
    """JSON formatter that adds request correlation and OpenTelemetry trace fields."""

    def add_fields(self, log_record: dict, record: logging.LogRecord, message_dict: dict) -> None:  # pylint: disable=arguments-renamed
        """Populate the JSON payload with the gateway's extra fields.

        On top of what the base formatter writes, this sets ``@timestamp``
        (UTC, ISO 8601, ``Z`` suffix), ``hostname`` and ``process_id``. When a
        correlation ID is available it is stored as ``request_id``. When
        OpenTelemetry is importable and the current span is recording with a
        valid context, ``trace_id``, ``span_id`` and ``trace_flags`` are added
        as zero-padded hex strings.

        Args:
            log_record: The dictionary that will be serialized as JSON
            record: The LogRecord being formatted
            message_dict: Additional message fields, forwarded to the base class

        """
        super().add_fields(log_record, record, message_dict)

        # record.created is a POSIX timestamp; render it in UTC with a 'Z' suffix.
        created_utc = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_record["@timestamp"] = created_utc.isoformat().replace("+00:00", "Z")

        # Values cached at import time; they identify the emitting process.
        log_record["hostname"] = _CACHED_HOSTNAME
        log_record["process_id"] = _CACHED_PID

        request_id = get_correlation_id()
        if request_id:
            log_record["request_id"] = request_id

        # Everything below is optional trace enrichment.
        if trace is None:
            return

        try:
            current_span = trace.get_current_span()
            if not current_span or not current_span.is_recording():
                return

            context = current_span.get_span_context()
            if not context.is_valid:
                return

            log_record["trace_id"] = format(context.trace_id, "032x")
            log_record["span_id"] = format(context.span_id, "016x")
            log_record["trace_flags"] = format(context.trace_flags, "02x")
        except Exception:  # nosec B110 - tracing is optional; never let it break formatting
            # Keep whatever fields were already written and move on.
            return

135 

136 

# JSON formatter with correlation ID support; it is built from the same base
# format and date format as the text formatter.
json_formatter = CorrelationIdJsonFormatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT)

# NOTE: logging.basicConfig() is deliberately not called here because it
# conflicts with the custom dual logging setup. LoggingService.initialize()
# configures all handlers.

# Module-wide handler slots. They start empty and are filled on first use by
# _get_file_handler() and _get_text_handler().
_file_handler: logging.Handler | None = None
_text_handler: logging.StreamHandler[TextIO] | None = None

146 

147 

def _get_file_handler() -> logging.Handler:
    """Return the shared JSON file handler, building it on the first call.

    The handler is stored in the module-level ``_file_handler`` slot, so later
    calls return the same object without re-reading the settings.

    Returns:
        logging.Handler: A RotatingFileHandler when rotation is enabled, otherwise a plain FileHandler. Both use the JSON formatter.

    Raises:
        ValueError: If file logging is disabled or no log file specified.

    """
    global _file_handler  # pylint: disable=global-statement
    if _file_handler is not None:
        return _file_handler

    # Refuse to build a handler unless file logging is switched on and a file name is configured.
    if not (settings.log_to_file and settings.log_file):
        raise ValueError("File logging is disabled or no log file specified")

    # Resolve the target path; the folder is created only when one is configured.
    if settings.log_folder:
        os.makedirs(settings.log_folder, exist_ok=True)
        target_path = os.path.join(settings.log_folder, settings.log_file)
    else:
        target_path = settings.log_file

    if settings.log_rotation_enabled:
        size_limit_bytes = settings.log_max_size_mb * 1024 * 1024  # MB -> bytes
        _file_handler = RotatingFileHandler(
            target_path,
            maxBytes=size_limit_bytes,
            backupCount=settings.log_backup_count,
            mode=settings.log_filemode,
        )
    else:
        _file_handler = logging.FileHandler(target_path, mode=settings.log_filemode)

    _file_handler.setFormatter(json_formatter)
    return _file_handler

180 

181 

def _get_text_handler() -> logging.StreamHandler[TextIO]:
    """Return the shared console handler, building it on the first call.

    The handler is a ``logging.StreamHandler`` created with no explicit stream
    and given the module's text formatter. It is stored in the module-level
    ``_text_handler`` slot and reused afterwards.

    Returns:
        logging.StreamHandler: The stream handler for console logging.

    """
    global _text_handler  # pylint: disable=global-statement
    if _text_handler is not None:
        return _text_handler

    _text_handler = logging.StreamHandler()
    _text_handler.setFormatter(text_formatter)
    return _text_handler

194 

195 

class StorageHandler(logging.Handler):
    """Logging handler that forwards every record to a LogStorageService."""

    def __init__(self, storage_service: LogStorageService):
        """Create a handler bound to a storage service.

        Args:
            storage_service: The LogStorageService instance to store logs in

        """
        super().__init__()
        self.storage = storage_service
        # Last event loop seen by emit(); used when emit() runs on a thread without a loop.
        self.loop: AbstractEventLoop | None = None

    @staticmethod
    def _discard_outcome(done: Any) -> None:
        """Call ``exception()`` on a finished task/future unless it was cancelled.

        The value is thrown away; presumably the call exists so the outcome
        counts as retrieved (confirm against asyncio's behaviour).

        Args:
            done: The completed task or future handed over by ``add_done_callback``

        """
        if not done.cancelled():
            done.exception()

    def emit(self, record: logging.LogRecord) -> None:
        """Schedule storage of a log record without blocking the caller.

        The record is converted to an MCP level plus formatted text and passed
        to ``storage.add_log``. The resulting coroutine is scheduled on the
        running loop when there is one, otherwise on the most recently seen
        loop if it is still running. If neither is available the coroutine is
        closed and the record is dropped. Any scheduling error is swallowed.

        Args:
            record: The LogRecord to emit

        """
        if not self.storage:
            return

        # Level names missing from the table are stored as INFO.
        mcp_level = _PYTHON_TO_MCP_LEVEL_MAP.get(record.levelname, LogLevel.INFO)

        # Optional entity/request attributes that may be attached to the record.
        context = {field: getattr(record, field, None) for field in ("entity_type", "entity_id", "entity_name", "request_id")}

        # Prefer the configured formatter; fall back to the bare message.
        try:
            text = self.format(record)
        except Exception:
            text = record.getMessage()

        try:
            pending = self.storage.add_log(
                level=mcp_level,
                message=text,
                logger=record.name,
                **context,
            )

            try:
                # Fast path: this thread is already running an event loop.
                active_loop = asyncio.get_running_loop()
                self.loop = active_loop
                active_loop.create_task(pending).add_done_callback(self._discard_outcome)
            except RuntimeError:
                # No loop in this thread: hand the coroutine to the remembered loop if it is alive.
                remembered = self.loop
                if remembered is not None and remembered.is_running():
                    asyncio.run_coroutine_threadsafe(pending, remembered).add_done_callback(self._discard_outcome)
                else:
                    # Nowhere to run it; close the coroutine instead of leaving it un-awaited.
                    pending.close()
        except Exception:
            # Silently fail to avoid logging recursion
            pass  # nosec B110 - Intentional to prevent logging recursion

265 

266 

class _LogMessageData(TypedDict):
    """Payload of a single log event delivered to subscribers.

    ``level``, ``data`` and ``timestamp`` are always present; ``logger`` is
    optional.
    """

    # Severity of the event.
    level: LogLevel
    # The logged payload, exactly as it was passed to LoggingService.notify().
    data: Any
    # Creation time as an ISO 8601 string (notify() uses the current UTC time).
    timestamp: str
    # Name of the originating logger; notify() sets it only when a name was given.
    logger: NotRequired[str]

274 

275 

class _LogMessage(TypedDict):
    """Envelope placed on subscriber queues for each log event."""

    # Event discriminator; LoggingService.notify() always sets it to "log".
    type: str
    # The log payload itself.
    data: _LogMessageData

281 

282 

class LoggingService:
    """MCP logging service.

    Implements structured logging with:
    - RFC 5424 severity levels
    - Log level management
    - Log event subscriptions
    - Logger name tracking
    """

    def __init__(self) -> None:
        """Initialize logging service."""
        self._level = LogLevel.INFO
        self._subscribers: List[asyncio.Queue[_LogMessage]] = []
        self._loggers: Dict[str, logging.Logger] = {}
        self._storage: LogStorageService | None = None  # Will be initialized if admin UI is enabled
        self._storage_handler: Optional[StorageHandler] = None  # Track the storage handler for cleanup

    @staticmethod
    def _python_level(level: LogLevel) -> int:
        """Translate an MCP log level into a numeric ``logging`` level.

        The ``logging`` module only defines constants for DEBUG, INFO, WARNING,
        ERROR and CRITICAL, so ``getattr(logging, level.upper())`` raises
        ``AttributeError`` for NOTICE, ALERT and EMERGENCY. Levels without a
        constant of their own fall back to the method mapping that ``notify``
        already uses (NOTICE -> INFO, ALERT / EMERGENCY -> CRITICAL).

        Args:
            level: MCP log level to translate

        Returns:
            The numeric level understood by ``Logger.setLevel``

        """
        direct = getattr(logging, level.upper(), None)
        if isinstance(direct, int):
            return direct
        return getattr(logging, _MCP_TO_PYTHON_METHOD_MAP.get(level, "info").upper())

    async def initialize(self) -> None:
        """Initialize logging service.

        Replaces the root logger's handlers with a console handler, plus a
        file handler when file logging is configured, and a storage handler
        when the admin UI or admin API is enabled. Uvicorn loggers are
        redirected to the root logger and the upstream-noise and
        URL-sanitizing filters are installed.

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())

        """
        # Update service log level from settings BEFORE configuring loggers
        self._level = LogLevel[settings.log_level.upper()]

        root_logger = logging.getLogger()
        self._loggers[""] = root_logger

        # Clear existing handlers to avoid duplicates
        root_logger.handlers.clear()

        # Set root logger level to match settings - this is critical for LOG_LEVEL to work
        log_level = getattr(logging, settings.log_level.upper())
        root_logger.setLevel(log_level)

        # Console handler (stdout/stderr)
        #
        # LOG_FORMAT controls the console output format:
        # - text: human-friendly
        # - json: machine-friendly (Loki/ELK) and includes OTEL trace context when available
        if getattr(settings, "log_format", "text").lower() == "json":
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(json_formatter)
        else:
            console_handler = _get_text_handler()
        console_handler.setLevel(log_level)
        root_logger.addHandler(console_handler)

        # Only add file handler if enabled
        if settings.log_to_file and settings.log_file:
            try:
                file_handler = _get_file_handler()
                file_handler.setLevel(log_level)
                root_logger.addHandler(file_handler)
                if settings.log_rotation_enabled:
                    logging.info(f"File logging enabled with rotation: {settings.log_folder or '.'}/{settings.log_file} (max: {settings.log_max_size_mb}MB, backups: {settings.log_backup_count})")
                else:
                    logging.info(f"File logging enabled (no rotation): {settings.log_folder or '.'}/{settings.log_file}")
            except Exception as e:
                # A broken file target must not prevent console logging from working.
                logging.warning(f"Failed to initialize file logging: {e}")
        else:
            logging.info("File logging disabled - logging to stdout/stderr only")

        # Configure uvicorn loggers to use our handlers (for access logs)
        # Note: This needs to be done both at init and dynamically as uvicorn creates loggers later
        self._configure_uvicorn_loggers()

        # Initialize log storage if admin UI is enabled
        if settings.mcpgateway_ui_enabled or settings.mcpgateway_admin_api_enabled:
            self._storage = LogStorageService()

            # Add storage handler to capture all logs
            self._storage_handler = StorageHandler(self._storage)
            self._storage_handler.setFormatter(text_formatter)
            self._storage_handler.setLevel(log_level)
            root_logger.addHandler(self._storage_handler)

            logging.info(f"Log storage initialized with {settings.log_buffer_size_mb}MB buffer")

        logging.info("Logging service initialized")

        # Suppress noisy upstream logs for normal stream closures in MCP streamable HTTP
        self._install_closedresourceerror_filter()

        # Redact sensitive query parameters from httpx/httpcore log messages
        self._install_httpx_url_sanitize_filter()

    async def shutdown(self) -> None:
        """Shutdown logging service.

        Detaches the storage handler from the root logger (if one was added)
        and forgets all subscriber queues.

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.shutdown())

        """
        # Remove storage handler from root logger if it was added
        if self._storage_handler:
            root_logger = logging.getLogger()
            root_logger.removeHandler(self._storage_handler)
            self._storage_handler = None

        # Clear subscribers
        self._subscribers.clear()
        logging.info("Logging service shutdown")

    def _install_closedresourceerror_filter(self) -> None:
        """Install a filter to drop benign ClosedResourceError logs from upstream MCP.

        The MCP streamable HTTP server logs an ERROR when the in-memory channel is
        closed during normal client disconnects, raising ``anyio.ClosedResourceError``.
        This filter suppresses those specific records to keep logs clean.

        Examples:
            >>> # Initialize service (installs filter)
            >>> import asyncio, logging, anyio
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())
            >>> # Locate the installed filter on the target logger
            >>> target = logging.getLogger('mcp.server.streamable_http')
            >>> flts = [f for f in target.filters if f.__class__.__name__.endswith('SuppressClosedResourceErrorFilter')]
            >>> len(flts) >= 1
            True
            >>> filt = flts[0]
            >>> # Non-target logger should pass through even if message matches
            >>> rec_other = logging.makeLogRecord({'name': 'other.logger', 'msg': 'ClosedResourceError'})
            >>> filt.filter(rec_other)
            True
            >>> # Target logger with message containing ClosedResourceError should be suppressed
            >>> rec_target_msg = logging.makeLogRecord({'name': 'mcp.server.streamable_http', 'msg': 'ClosedResourceError in normal shutdown'})
            >>> filt.filter(rec_target_msg)
            False
            >>> # Target logger with ClosedResourceError in exc_info should be suppressed
            >>> try:
            ...     raise anyio.ClosedResourceError
            ... except anyio.ClosedResourceError as e:
            ...     rec_target_exc = logging.makeLogRecord({
            ...         'name': 'mcp.server.streamable_http',
            ...         'msg': 'Error in message router',
            ...         'exc_info': (e.__class__, e, None),
            ...     })
            >>> filt.filter(rec_target_exc)
            False
            >>> # Cleanup
            >>> asyncio.run(service.shutdown())

        """

        class _SuppressClosedResourceErrorFilter(logging.Filter):
            """Filter to suppress ClosedResourceError exceptions from MCP streamable HTTP logger.

            This filter prevents noisy ClosedResourceError exceptions from the upstream
            MCP streamable HTTP implementation from cluttering the logs. These errors
            are typically harmless connection cleanup events.
            """

            def filter(self, record: logging.LogRecord) -> bool:  # noqa: D401
                """Filter log records to suppress ClosedResourceError exceptions.

                Args:
                    record: The log record to evaluate

                Returns:
                    True to allow the record through, False to suppress it

                """
                # Apply only to upstream MCP streamable HTTP logger
                if not record.name.startswith("mcp.server.streamable_http"):
                    return True

                # If exception info is present, check its type
                exc_info = getattr(record, "exc_info", None)
                if exc_info and AnyioClosedResourceError is not None:
                    exc_type, exc, _tb = exc_info
                    try:
                        if isinstance(exc, AnyioClosedResourceError) or (getattr(exc_type, "__name__", "") == "ClosedResourceError"):
                            return False
                    except Exception:
                        # Be permissive if anything goes wrong, don't drop logs accidentally
                        return True

                # Fallback: drop if message text clearly indicates ClosedResourceError
                try:
                    msg = record.getMessage()
                    if "ClosedResourceError" in msg:
                        return False
                except Exception:
                    pass  # nosec B110 - Intentional to prevent logging recursion
                return True

        target_logger = logging.getLogger("mcp.server.streamable_http")
        target_logger.addFilter(_SuppressClosedResourceErrorFilter())

    @staticmethod
    def _install_httpx_url_sanitize_filter() -> None:
        """Install a filter to redact sensitive query parameters from httpx/httpcore log messages.

        httpx and httpcore log full request URLs at INFO level, bypassing
        application-level sanitization. This filter intercepts those log
        records and redacts sensitive query parameters (api_key, token, etc.)
        before they reach any handler.

        Examples:
            >>> import asyncio, logging
            >>> service = LoggingService()
            >>> asyncio.run(service.initialize())
            >>> filt = [f for f in logging.getLogger('httpx').filters
            ...         if f.__class__.__name__ == '_HttpxUrlSanitizeFilter'][0]
            >>> rec = logging.makeLogRecord({
            ...     'name': 'httpx',
            ...     'msg': 'HTTP Request: GET https://example.mcp.server.com/sse?api_key=secret-value "HTTP/1.1 200 OK"',
            ... })
            >>> filt.filter(rec)
            True
            >>> 'secret-value' not in rec.getMessage()
            True
            >>> 'api_key=REDACTED' in rec.getMessage()
            True
            >>> asyncio.run(service.shutdown())

        """

        class _HttpxUrlSanitizeFilter(logging.Filter):
            """Filter that redacts sensitive query parameters from URLs in httpx log messages."""

            def filter(self, record: logging.LogRecord) -> bool:  # noqa: D401
                """Sanitize URLs in the log record message, then allow it through.

                Args:
                    record: The log record to sanitize.

                Returns:
                    Always True (record is never suppressed, only sanitized).

                """
                try:
                    msg = record.getMessage()
                    sanitized = sanitize_exception_message(msg)
                    if sanitized != msg:
                        # Store the already-interpolated text and drop the args
                        # so handlers do not re-apply %-formatting.
                        record.msg = sanitized
                        record.args = None
                except Exception:
                    pass  # nosec B110 - Never break logging due to sanitization failure
                return True

        # One filter instance is shared by both loggers.
        url_filter = _HttpxUrlSanitizeFilter()
        for logger_name in ("httpx", "httpcore"):
            logging.getLogger(logger_name).addFilter(url_filter)

    def get_logger(self, name: str) -> logging.Logger:
        """Get or create logger instance.

        Args:
            name: Logger name

        Returns:
            Logger instance

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> service = LoggingService()
            >>> logger = service.get_logger('test')
            >>> import logging
            >>> isinstance(logger, logging.Logger)
            True

        """
        if name not in self._loggers:
            logger = logging.getLogger(name)

            # Don't add handlers to child loggers - let them inherit from root
            # This prevents duplicate logging while maintaining dual output (console + file)
            logger.propagate = True

            # Don't set level on child loggers - let them inherit from root logger
            # This ensures LOG_LEVEL environment variable is respected after initialize() runs
            # The root logger level is set in initialize() based on settings.log_level

            self._loggers[name] = logger

        return self._loggers[name]

    async def set_level(self, level: LogLevel) -> None:
        """Set minimum log level.

        This updates the level for all registered loggers. MCP levels that the
        standard ``logging`` module does not define (NOTICE, ALERT, EMERGENCY)
        are applied to Python loggers as INFO / CRITICAL instead of raising.

        Args:
            level: New log level

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> from mcpgateway.common.models import LogLevel
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.set_level(LogLevel.DEBUG))

        """
        # Resolve the numeric level first so a failure cannot leave the
        # service threshold and the Python loggers out of sync.
        log_level = self._python_level(level)
        self._level = level

        # Update Python root logger so new child loggers inherit the correct level
        root_logger = logging.getLogger()
        root_logger.setLevel(log_level)

        # Update handler levels so they don't filter out records the logger passes
        for handler in root_logger.handlers:
            handler.setLevel(log_level)

        for logger in self._loggers.values():
            logger.setLevel(log_level)

        await self.notify(f"Log level set to {level}", LogLevel.INFO, "logging")

    async def notify(  # pylint: disable=too-many-positional-arguments
        self,
        data: Any,
        level: LogLevel,
        logger_name: Optional[str] = None,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        entity_name: Optional[str] = None,
        request_id: Optional[str] = None,
        extra_data: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Send log notification to subscribers.

        Messages below the current service level are dropped. Otherwise the
        message is written through the standard ``logging`` machinery, stored
        in log storage when that is enabled, and put on every subscriber queue.

        Args:
            data: Log message data
            level: Log severity level
            logger_name: Optional logger name
            entity_type: Type of entity (tool, resource, server, gateway)
            entity_id: ID of the related entity
            entity_name: Name of the related entity
            request_id: Associated request ID for tracing
            extra_data: Additional structured data

        Examples:
            >>> from mcpgateway.services.logging_service import LoggingService
            >>> from mcpgateway.common.models import LogLevel
            >>> import asyncio
            >>> service = LoggingService()
            >>> asyncio.run(service.notify('test', LogLevel.INFO))

        """
        # Skip if below current level
        if not self._should_log(level):
            return

        # Format notification message
        message: _LogMessage = {
            "type": "log",
            "data": {
                "level": level,
                "data": data,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        }
        if logger_name:
            message["data"]["logger"] = logger_name

        # Log through standard logging
        logger = self.get_logger(logger_name or "")

        # Map MCP log levels to Python logging levels (uses module-level cached dict)
        log_method = _MCP_TO_PYTHON_METHOD_MAP.get(level, "info")
        log_func = getattr(logger, log_method)
        log_func(data)

        # Store in log storage if available
        if self._storage:
            await self._storage.add_log(
                level=level,
                message=str(data),
                entity_type=entity_type,
                entity_id=entity_id,
                entity_name=entity_name,
                logger=logger_name,
                data=extra_data,
                request_id=request_id,
            )

        # Notify subscribers
        for queue in self._subscribers:
            try:
                await queue.put(message)
            except Exception as e:
                logger.error(f"Failed to notify subscriber: {e}")

    async def subscribe(self) -> AsyncGenerator[_LogMessage, None]:
        """Subscribe to log messages.

        Returns a generator yielding log message events. The subscription's
        queue is registered for as long as the generator is open and is
        unregistered when the generator is closed.

        Yields:
            Log message events

        Examples:
            This example was removed to prevent the test runner from hanging on async generator consumption.

        """
        queue: asyncio.Queue[_LogMessage] = asyncio.Queue()
        self._subscribers.append(queue)
        try:
            while True:
                message = await queue.get()
                yield message
        finally:
            # shutdown() clears the subscriber list, so the queue may already
            # be gone by the time the generator is closed.
            try:
                self._subscribers.remove(queue)
            except ValueError:
                pass

    def _should_log(self, level: LogLevel) -> bool:
        """Check if level meets minimum threshold.

        Args:
            level: Log level to check

        Returns:
            True if should log

        Examples:
            >>> from mcpgateway.common.models import LogLevel
            >>> service = LoggingService()
            >>> service._level = LogLevel.WARNING
            >>> service._should_log(LogLevel.ERROR)
            True
            >>> service._should_log(LogLevel.INFO)
            False
            >>> service._should_log(LogLevel.WARNING)
            True
            >>> service._should_log(LogLevel.DEBUG)
            False

        """
        # Uses module-level cached dict for performance
        return _MCP_LEVEL_VALUES[level] >= _MCP_LEVEL_VALUES[self._level]

    def _configure_uvicorn_loggers(self) -> None:
        """Configure uvicorn loggers to use our dual logging setup.

        This method handles uvicorn's logging setup which can happen after our initialization.
        Uvicorn creates its own loggers and handlers, so we need to redirect them to our setup.
        """
        uvicorn_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "uvicorn.asgi"]

        for logger_name in uvicorn_loggers:
            uvicorn_logger = logging.getLogger(logger_name)

            # Clear any handlers that uvicorn may have added
            uvicorn_logger.handlers.clear()

            # Make sure they propagate to root (which has our dual handlers)
            uvicorn_logger.propagate = True

            # Set level to match our logging service level. The helper copes
            # with MCP-only levels such as NOTICE that logging does not define.
            if hasattr(self, "_level"):
                uvicorn_logger.setLevel(self._python_level(self._level))

            # Track the logger
            self._loggers[logger_name] = uvicorn_logger

    def configure_uvicorn_after_startup(self) -> None:
        """Public method to reconfigure uvicorn loggers after server startup.

        Call this after uvicorn has started to ensure access logs go to dual output.
        This handles the case where uvicorn creates loggers after our initialization.
        """
        self._configure_uvicorn_loggers()
        logging.info("Uvicorn loggers reconfigured for dual logging")

    def get_storage(self) -> Optional[LogStorageService]:
        """Get the log storage service if available.

        Returns:
            LogStorageService instance or None if not initialized

        """
        return self._storage