Coverage for mcpgateway / services / structured_logger.py: 99%
150 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/structured_logger.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Structured Logger Service.
8This module provides comprehensive structured logging with component-based loggers,
9automatic enrichment, intelligent routing, and database persistence.
10"""
12# Standard
13from datetime import datetime, timezone
14from enum import Enum
15import logging
16import os
17import socket
18import traceback
19from typing import Any, Dict, List, Optional, Union
21# First-Party
22from mcpgateway.config import settings
23from mcpgateway.db import fresh_db_session, StructuredLogEntry
24from mcpgateway.services.performance_tracker import get_performance_tracker
25from mcpgateway.utils.correlation_id import get_correlation_id
27# Optional OpenTelemetry support - import once at module level for performance
28try:
29 # Third-Party
30 from opentelemetry import trace as otel_trace
32 _OTEL_AVAILABLE = True
33except ImportError:
34 otel_trace = None # type: ignore[assignment]
35 _OTEL_AVAILABLE = False
# Module-level logger used by this module for routed entries and internal errors.
logger = logging.getLogger(__name__)

# Hostname and PID are resolved once at import time and reused for every
# entry, since they do not change during the lifetime of the process.
_CACHED_HOSTNAME: str = socket.gethostname()
_CACHED_PID: int = os.getpid()
class LogLevel(str, Enum):
    """Severity names accepted by the structured logger.

    Members are strings whose values are the upper-case level names used by
    the standard ``logging`` module.
    """

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
class LogCategory(str, Enum):
    """Classification labels that can be attached to a log entry.

    Members are strings; each value is the lower-case form of its name.
    """

    APPLICATION = "application"
    REQUEST = "request"
    SECURITY = "security"
    PERFORMANCE = "performance"
    DATABASE = "database"
    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    EXTERNAL_SERVICE = "external_service"
    BUSINESS_LOGIC = "business_logic"
    SYSTEM = "system"
# Numeric severity for each supported level name, read from the standard
# logging module (DEBUG=10 ... CRITICAL=50) so thresholds compare as integers.
_LOG_LEVEL_VALUES: Dict[str, int] = {
    level_name: getattr(logging, level_name)
    for level_name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
}
def _should_log(level: Union[LogLevel, str]) -> bool:
    """Decide whether an entry at ``level`` passes the configured threshold.

    Callers use this as an early exit so that enrichment and persistence are
    skipped for entries below ``settings.log_level``. Level names that are not
    in ``_LOG_LEVEL_VALUES`` (for either side of the comparison) are treated
    as INFO.

    Args:
        level: The log level to check (LogLevel enum or string)

    Returns:
        True if the level meets or exceeds the configured threshold
    """
    # Reduce the argument to an upper-case level name
    if isinstance(level, LogLevel):
        level_name = level.value
    else:
        level_name = str(level).upper()

    # Translate both names into numeric severities before comparing
    severity = _LOG_LEVEL_VALUES.get(level_name, logging.INFO)
    threshold_name = settings.log_level.upper()
    threshold = _LOG_LEVEL_VALUES.get(threshold_name, logging.INFO)

    return severity >= threshold
class LogEnricher:
    """Adds correlation, host/process, timing and tracing context to log entries."""

    @staticmethod
    def enrich(entry: Dict[str, Any]) -> Dict[str, Any]:
        """Fill in contextual fields on a log entry.

        The dictionary passed in is updated in place and also returned.
        Hostname, process ID and timestamp are only set when the entry does
        not already carry them; the correlation ID and trace/span IDs are
        written whenever they are available.

        Args:
            entry: Base log entry

        Returns:
            The same dictionary after enrichment
        """
        # Correlation ID of the current context, when one exists
        request_correlation = get_correlation_id()
        if request_correlation:
            entry["correlation_id"] = request_correlation

        # Host and process identity come from the values cached at import time
        for field_name, cached_value in (("hostname", _CACHED_HOSTNAME), ("process_id", _CACHED_PID)):
            entry.setdefault(field_name, cached_value)

        # Stamp the entry with the current UTC time unless the caller set one
        if "timestamp" not in entry:
            entry["timestamp"] = datetime.now(timezone.utc)

        # Count of operations currently tracked for this correlation ID;
        # skipped silently when the tracker is missing or fails
        try:
            tracker = get_performance_tracker()
            tracker_usable = request_correlation and tracker and hasattr(tracker, "get_current_operations")
            if tracker_usable:
                operations = tracker.get_current_operations(request_correlation)  # pylint: disable=no-member
                if operations:
                    entry["active_operations"] = len(operations)
        except Exception:  # nosec B110 - Graceful degradation if performance tracker unavailable
            pass

        # Without OpenTelemetry there is no trace context to add
        if not _OTEL_AVAILABLE:
            return entry

        # Copy the IDs of the active span as fixed-width lower-case hex strings
        try:
            span = otel_trace.get_current_span()
            if span and span.get_span_context().is_valid:
                span_context = span.get_span_context()
                entry["trace_id"] = f"{span_context.trace_id:032x}"
                entry["span_id"] = f"{span_context.span_id:016x}"
        except Exception:  # nosec B110 - Graceful degradation
            pass

        return entry
class LogRouter:
    """Routes log entries to appropriate destinations.

    Every entry is forwarded to the module's standard Python logger. Depending
    on the flags read from ``settings`` when the router is constructed, the
    entry is additionally persisted to the database and/or handed to the
    external-system hook.
    """

    # Attribute names a ``logging.LogRecord`` already owns. ``Logger.makeRecord``
    # raises ``KeyError`` when ``extra`` contains any of them, so colliding entry
    # keys are renamed before being handed to the standard logger.
    _RESERVED_RECORD_ATTRS = frozenset(logging.LogRecord("", logging.NOTSET, "", 0, "", (), None).__dict__) | {"message", "asctime"}

    # Entry keys folded into the ``error_details`` JSON column
    _ERROR_FIELDS = ("error_type", "error_message", "error_stack_trace", "error_context")

    # Entry keys folded into the ``performance_metrics`` JSON column
    _PERFORMANCE_FIELDS = (
        "database_query_count",
        "database_query_duration_ms",
        "cache_hits",
        "cache_misses",
        "external_api_calls",
        "external_api_duration_ms",
        "memory_usage_mb",
        "cpu_usage_percent",
    )

    # Entry keys folded into the ``threat_indicators`` JSON column
    _SECURITY_FIELDS = ("security_event_type", "security_threat_score", "security_action_taken")

    # Entry keys folded into the ``context`` JSON column
    _CONTEXT_FIELDS = (
        "team_id",
        "request_query",
        "request_headers",
        "request_body_size",
        "response_status_code",
        "response_body_size",
        "response_headers",
        "business_event_type",
        "business_entity_type",
        "business_entity_id",
        "resource_type",
        "resource_id",
        "resource_action",
        "category",
        "custom_fields",
        "tags",
        "metadata",
    )

    def __init__(self):
        """Initialize log router.

        Reads the destination flags from ``settings``; database persistence
        defaults to enabled and external routing to disabled when the
        corresponding setting is absent.
        """
        self.database_enabled = getattr(settings, "structured_logging_database_enabled", True)
        self.external_enabled = getattr(settings, "structured_logging_external_enabled", False)

    def route(self, entry: Dict[str, Any]) -> None:
        """Route log entry to configured destinations.

        Args:
            entry: Log entry to route
        """
        # Always log to standard Python logger
        self._log_to_python_logger(entry)

        # Persist to database if enabled
        if self.database_enabled:
            self._persist_to_database(entry)

        # Send to external systems if enabled
        if self.external_enabled:
            self._send_to_external(entry)

    @staticmethod
    def _resolve_level(level: Any) -> int:
        """Translate an entry's level into a numeric ``logging`` level.

        Accepts enum members (their ``value`` is used), integers, and level
        names in any letter case. Anything that does not name an integer
        constant of the ``logging`` module falls back to ``logging.INFO``.

        Args:
            level: Level as stored in the entry

        Returns:
            Numeric level suitable for ``Logger.log``
        """
        if isinstance(level, Enum):
            level = level.value
        if isinstance(level, int) and not isinstance(level, bool):
            return level

        # Upper-case first: ``getattr(logging, "info")`` would return the
        # ``logging.info`` function rather than the INFO constant.
        candidate = getattr(logging, str(level).upper(), None)
        if isinstance(candidate, int) and not isinstance(candidate, bool):
            return candidate
        return logging.INFO

    @classmethod
    def _build_extra(cls, entry: Dict[str, Any]) -> Dict[str, Any]:
        """Build the ``extra`` mapping passed to the standard logger.

        ``message`` and ``level`` are left out because they are passed to the
        logger directly. Keys that collide with built-in ``LogRecord``
        attributes are kept under an ``entry_`` prefix instead of being passed
        through, because the logging module rejects such keys with ``KeyError``.

        Args:
            entry: Log entry

        Returns:
            Mapping that is safe to pass as ``extra=``
        """
        extra: Dict[str, Any] = {}
        for key, value in entry.items():
            if key in ("message", "level"):
                continue
            if key in cls._RESERVED_RECORD_ATTRS:
                key = f"entry_{key}"
            extra[key] = value
        return extra

    @staticmethod
    def _collect_fields(entry: Dict[str, Any], names: tuple) -> Optional[Dict[str, Any]]:
        """Pick the named entry fields whose value is not ``None``.

        Args:
            entry: Log entry
            names: Field names to look up

        Returns:
            Mapping of the populated fields, or ``None`` when none are populated
        """
        collected = {name: entry[name] for name in names if entry.get(name) is not None}
        return collected or None

    def _log_to_python_logger(self, entry: Dict[str, Any]) -> None:
        """Log to standard Python logger.

        The message is prefixed with ``[component]`` when a component is set;
        all remaining entry fields travel along as ``extra``.

        Args:
            entry: Log entry
        """
        level = self._resolve_level(entry.get("level", "INFO"))

        message = entry.get("message", "")
        component = entry.get("component", "")

        log_message = f"[{component}] {message}" if component else message

        logger.log(level, log_message, extra=self._build_extra(entry))

    def _persist_to_database(self, entry: Dict[str, Any]) -> None:
        """Persist log entry to database.

        Always uses a fresh, independent database session to avoid accidentally
        committing unrelated pending objects from the caller's session.
        Any failure is logged through the module logger and not re-raised.

        Args:
            entry: Log entry
        """
        try:
            # NOTE(review): the entry is only added here; committing is presumably
            # handled by fresh_db_session() on exit - confirm.
            with fresh_db_session() as log_db:
                # error_details keeps all four keys as soon as any of them is truthy
                error_values = {name: entry.get(name) for name in self._ERROR_FIELDS}
                error_details = error_values if any(error_values.values()) else None

                # The remaining JSON columns only keep fields that are not None
                performance_metrics = self._collect_fields(entry, self._PERFORMANCE_FIELDS)
                threat_indicators = self._collect_fields(entry, self._SECURITY_FIELDS)
                context = self._collect_fields(entry, self._CONTEXT_FIELDS)

                # An entry counts as a security event when flagged explicitly
                # or when it carries any threat indicator
                is_security_event = entry.get("is_security_event", False) or bool(threat_indicators)
                security_severity = entry.get("security_severity")

                log_entry = StructuredLogEntry(
                    timestamp=entry.get("timestamp", datetime.now(timezone.utc)),
                    level=entry.get("level", "INFO"),
                    component=entry.get("component"),
                    message=entry.get("message", ""),
                    correlation_id=entry.get("correlation_id"),
                    request_id=entry.get("request_id"),
                    trace_id=entry.get("trace_id"),
                    span_id=entry.get("span_id"),
                    user_id=entry.get("user_id"),
                    user_email=entry.get("user_email"),
                    client_ip=entry.get("client_ip"),
                    user_agent=entry.get("user_agent"),
                    request_method=entry.get("request_method"),
                    request_path=entry.get("request_path"),
                    duration_ms=entry.get("duration_ms"),
                    operation_type=entry.get("operation_type"),
                    is_security_event=is_security_event,
                    security_severity=security_severity,
                    threat_indicators=threat_indicators,
                    context=context,
                    error_details=error_details,
                    performance_metrics=performance_metrics,
                    hostname=entry.get("hostname"),
                    process_id=entry.get("process_id"),
                    thread_id=entry.get("thread_id"),
                    environment=entry.get("environment", getattr(settings, "environment", "development")),
                    version=entry.get("version", getattr(settings, "version", "unknown")),
                )

                log_db.add(log_entry)

        except Exception as e:
            # Persistence is best-effort: report the failure but never let it
            # break the caller that was only trying to log something.
            logger.error(f"Failed to persist log entry to database: {e}", exc_info=True)

    def _send_to_external(self, entry: Dict[str, Any]) -> None:
        """Send log entry to external systems.

        Currently a no-op.

        Args:
            entry: Log entry
        """
        # Placeholder for external logging integration
        # Will be implemented in log exporters
class StructuredLogger:
    """Main structured logger with enrichment and routing.

    Builds a dictionary entry for each message, passes it through a
    ``LogEnricher`` and hands the result to a ``LogRouter``.
    """

    def __init__(self, component: str):
        """Initialize structured logger.

        Args:
            component: Component name for log entries
        """
        self.component = component
        self.enricher = LogEnricher()
        self.router = LogRouter()

    def log(
        self,
        level: Union[LogLevel, str],
        message: str,
        category: Optional[Union[LogCategory, str]] = None,
        user_id: Optional[str] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        error: Optional[Exception] = None,
        duration_ms: Optional[float] = None,
        custom_fields: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Log a structured message.

        Args:
            level: Log level
            message: Log message
            category: Log category
            user_id: User identifier
            user_email: User email
            team_id: Team identifier
            error: Exception object
            duration_ms: Operation duration
            custom_fields: Additional custom fields
            tags: Log tags
            **kwargs: Additional fields to include; these are merged last and
                override the fields built from the named parameters
        """
        # Early termination if log level is below configured threshold
        # This avoids expensive enrichment and database operations for filtered messages
        if not _should_log(level):
            return

        # Store the level in the same upper-case form the threshold check uses,
        # so a level passed as "info" is recorded and routed as "INFO".
        level_name = level.value if isinstance(level, LogLevel) else str(level).upper()

        # Enum categories are stored by value; empty categories become None
        category_value = category.value if isinstance(category, LogCategory) else (category or None)

        # Build base entry
        entry: Dict[str, Any] = {
            "level": level_name,
            "component": self.component,
            "message": message,
            "category": category_value,
            "user_id": user_id,
            "user_email": user_email,
            "team_id": team_id,
            "duration_ms": duration_ms,
            "custom_fields": custom_fields,
            "tags": tags,
        }

        # Add error information if present. Compare against None explicitly so
        # an exception object that happens to evaluate falsy is still recorded.
        if error is not None:
            entry["error_type"] = type(error).__name__
            entry["error_message"] = str(error)
            entry["error_stack_trace"] = "".join(traceback.format_exception(type(error), error, error.__traceback__))

        # Add any additional kwargs
        entry.update(kwargs)

        # Enrich entry with context
        entry = self.enricher.enrich(entry)

        # Route to destinations
        self.router.route(entry)

    def debug(self, message: str, **kwargs: Any) -> None:
        """Log debug message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs: Any) -> None:
        """Log info message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs: Any) -> None:
        """Log warning message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.WARNING, message, **kwargs)

    def error(self, message: str, error: Optional[Exception] = None, **kwargs: Any) -> None:
        """Log error message.

        Args:
            message: Log message
            error: Exception object if available
            **kwargs: Additional context fields
        """
        self.log(LogLevel.ERROR, message, error=error, **kwargs)

    def critical(self, message: str, error: Optional[Exception] = None, **kwargs: Any) -> None:
        """Log critical message.

        Args:
            message: Log message
            error: Exception object if available
            **kwargs: Additional context fields
        """
        self.log(LogLevel.CRITICAL, message, error=error, **kwargs)
class ComponentLogger:
    """Factory that hands out one cached ``StructuredLogger`` per component name."""

    # Cache shared by the whole class, keyed by component name
    _loggers: Dict[str, StructuredLogger] = {}

    @classmethod
    def get_logger(cls, component: str) -> StructuredLogger:
        """Return the logger for ``component``, creating it on first use.

        Args:
            component: Component name

        Returns:
            StructuredLogger instance for the component
        """
        cached = cls._loggers.get(component)
        if cached is None:
            cached = StructuredLogger(component)
            cls._loggers[component] = cached
        return cached

    @classmethod
    def clear_loggers(cls) -> None:
        """Drop every cached logger (useful for testing)."""
        cls._loggers.clear()
# Module-level accessor kept for backward compatibility
def get_structured_logger(component: str = "mcpgateway") -> StructuredLogger:
    """Look up the structured logger for a component.

    Delegates to ``ComponentLogger.get_logger``.

    Args:
        component: Component name

    Returns:
        StructuredLogger instance
    """
    component_logger = ComponentLogger.get_logger(component)
    return component_logger