Coverage for mcpgateway/services/structured_logger.py: 99% (161 statements)
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/structured_logger.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0

Structured Logger Service.

This module provides comprehensive structured logging with component-based loggers,
automatic enrichment, intelligent routing, and database persistence.
"""

# Standard
from datetime import datetime, timezone
from enum import Enum
import logging
import os
import socket
import traceback
from typing import Any, Dict, List, Optional, Union

# Third-Party
from sqlalchemy.orm import Session

# First-Party
from mcpgateway.config import settings
from mcpgateway.db import SessionLocal, StructuredLogEntry
from mcpgateway.services.performance_tracker import get_performance_tracker
from mcpgateway.utils.correlation_id import get_correlation_id

# Optional OpenTelemetry support - import once at module level for performance
try:
    # Third-Party
    from opentelemetry import trace as otel_trace

    _OTEL_AVAILABLE = True
except ImportError:
    otel_trace = None  # type: ignore[assignment]
    _OTEL_AVAILABLE = False

logger = logging.getLogger(__name__)

# Cache static values at module load - these don't change during process lifetime
_CACHED_HOSTNAME: str = socket.gethostname()
_CACHED_PID: int = os.getpid()


class LogLevel(str, Enum):
    """Log levels matching Python logging."""

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class LogCategory(str, Enum):
    """Log categories for classification."""

    APPLICATION = "application"
    REQUEST = "request"
    SECURITY = "security"
    PERFORMANCE = "performance"
    DATABASE = "database"
    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    EXTERNAL_SERVICE = "external_service"
    BUSINESS_LOGIC = "business_logic"
    SYSTEM = "system"


# Log level numeric values for comparison (matches Python logging module)
_LOG_LEVEL_VALUES: Dict[str, int] = {
    "DEBUG": logging.DEBUG,  # 10
    "INFO": logging.INFO,  # 20
    "WARNING": logging.WARNING,  # 30
    "ERROR": logging.ERROR,  # 40
    "CRITICAL": logging.CRITICAL,  # 50
}


def _should_log(level: Union[LogLevel, str]) -> bool:
    """Check if a log level should be processed based on settings.log_level.

    This enables early termination of log processing to avoid expensive
    enrichment and database operations for messages below the configured threshold.

    Args:
        level: The log level to check (LogLevel enum or string)

    Returns:
        True if the level meets or exceeds the configured threshold
    """
    # Get string value from enum if needed
    level_str = level.value if isinstance(level, LogLevel) else str(level).upper()

    # Get numeric values for comparison
    entry_level = _LOG_LEVEL_VALUES.get(level_str, logging.INFO)
    config_level = _LOG_LEVEL_VALUES.get(settings.log_level.upper(), logging.INFO)

    return entry_level >= config_level
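
# Illustrative behaviour of the threshold check (comment only, nothing executed here):
# with settings.log_level set to "WARNING", _should_log("INFO") is False, so the entry
# is dropped before any enrichment or database work, while _should_log(LogLevel.ERROR)
# is True and the entry continues through enrichment and routing. Unrecognised level
# strings fall back to INFO on both sides of the comparison.

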
class LogEnricher:
    """Enriches log entries with contextual information."""

    @staticmethod
    def enrich(entry: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich log entry with system and context information.

        Args:
            entry: Base log entry

        Returns:
            Enriched log entry
        """
        # Get correlation ID
        correlation_id = get_correlation_id()
        if correlation_id:
            entry["correlation_id"] = correlation_id

        # Add hostname and process info - use cached values for performance
        entry.setdefault("hostname", _CACHED_HOSTNAME)
        entry.setdefault("process_id", _CACHED_PID)

        # Add timestamp if not present
        if "timestamp" not in entry:
            entry["timestamp"] = datetime.now(timezone.utc)

        # Add performance metrics if available (skip if tracker not initialized)
        try:
            perf_tracker = get_performance_tracker()
            if correlation_id and perf_tracker and hasattr(perf_tracker, "get_current_operations"):
                current_ops = perf_tracker.get_current_operations(correlation_id)  # pylint: disable=no-member
                if current_ops:
                    entry["active_operations"] = len(current_ops)
        except Exception:  # nosec B110 - Graceful degradation if performance tracker unavailable
            pass

        # Add OpenTelemetry trace context if available (uses module-level import)
        if _OTEL_AVAILABLE:
            try:
                span = otel_trace.get_current_span()
                if span and span.get_span_context().is_valid:
                    ctx = span.get_span_context()
                    entry["trace_id"] = format(ctx.trace_id, "032x")
                    entry["span_id"] = format(ctx.span_id, "016x")
            except Exception:  # nosec B110 - Graceful degradation
                pass

        return entry
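
# Sketch of an enriched entry after LogEnricher.enrich (illustrative values only;
# optional keys appear only when their source is available):
#
#     {
#         "level": "INFO",
#         "component": "example-component",
#         "message": "example message",
#         "correlation_id": "abc-123",        # when get_correlation_id() returns one
#         "hostname": "gateway-host",         # cached at module load
#         "process_id": 12345,                # cached at module load
#         "timestamp": datetime.now(timezone.utc),
#         "active_operations": 2,             # when the performance tracker reports any
#         "trace_id": "0123456789abcdef0123456789abcdef",  # with OpenTelemetry
#         "span_id": "0123456789abcdef",
#     }

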
class LogRouter:
    """Routes log entries to appropriate destinations."""

    def __init__(self):
        """Initialize log router."""
        self.database_enabled = getattr(settings, "structured_logging_database_enabled", True)
        self.external_enabled = getattr(settings, "structured_logging_external_enabled", False)

    def route(self, entry: Dict[str, Any], db: Optional[Session] = None) -> None:
        """Route log entry to configured destinations.

        Args:
            entry: Log entry to route
            db: Optional database session
        """
        # Always log to standard Python logger
        self._log_to_python_logger(entry)

        # Persist to database if enabled
        if self.database_enabled:
            self._persist_to_database(entry, db)

        # Send to external systems if enabled
        if self.external_enabled:
            self._send_to_external(entry)

    def _log_to_python_logger(self, entry: Dict[str, Any]) -> None:
        """Log to standard Python logger.

        Args:
            entry: Log entry
        """
        level_str = entry.get("level", "INFO")
        level = getattr(logging, level_str, logging.INFO)

        message = entry.get("message", "")
        component = entry.get("component", "")

        log_message = f"[{component}] {message}" if component else message

        # Build extra dict for structured logging
        extra = {k: v for k, v in entry.items() if k not in ["message", "level"]}

        logger.log(level, log_message, extra=extra)

    def _persist_to_database(self, entry: Dict[str, Any], db: Optional[Session] = None) -> None:
        """Persist log entry to database.

        Args:
            entry: Log entry
            db: Optional database session
        """
        should_close = False
        if db is None:
            db = SessionLocal()
            should_close = True

        try:
            # Build error_details JSON from error-related fields
            error_details = None
            if any([entry.get("error_type"), entry.get("error_message"), entry.get("error_stack_trace"), entry.get("error_context")]):
                error_details = {
                    "error_type": entry.get("error_type"),
                    "error_message": entry.get("error_message"),
                    "error_stack_trace": entry.get("error_stack_trace"),
                    "error_context": entry.get("error_context"),
                }

            # Build performance_metrics JSON from performance-related fields
            performance_metrics = None
            perf_fields = {
                "database_query_count": entry.get("database_query_count"),
                "database_query_duration_ms": entry.get("database_query_duration_ms"),
                "cache_hits": entry.get("cache_hits"),
                "cache_misses": entry.get("cache_misses"),
                "external_api_calls": entry.get("external_api_calls"),
                "external_api_duration_ms": entry.get("external_api_duration_ms"),
                "memory_usage_mb": entry.get("memory_usage_mb"),
                "cpu_usage_percent": entry.get("cpu_usage_percent"),
            }
            if any(v is not None for v in perf_fields.values()):
                performance_metrics = {k: v for k, v in perf_fields.items() if v is not None}

            # Build threat_indicators JSON from security-related fields
            threat_indicators = None
            security_fields = {
                "security_event_type": entry.get("security_event_type"),
                "security_threat_score": entry.get("security_threat_score"),
                "security_action_taken": entry.get("security_action_taken"),
            }
            if any(v is not None for v in security_fields.values()):
                threat_indicators = {k: v for k, v in security_fields.items() if v is not None}

            # Build context JSON from remaining fields
            context_fields = {
                "team_id": entry.get("team_id"),
                "request_query": entry.get("request_query"),
                "request_headers": entry.get("request_headers"),
                "request_body_size": entry.get("request_body_size"),
                "response_status_code": entry.get("response_status_code"),
                "response_body_size": entry.get("response_body_size"),
                "response_headers": entry.get("response_headers"),
                "business_event_type": entry.get("business_event_type"),
                "business_entity_type": entry.get("business_entity_type"),
                "business_entity_id": entry.get("business_entity_id"),
                "resource_type": entry.get("resource_type"),
                "resource_id": entry.get("resource_id"),
                "resource_action": entry.get("resource_action"),
                "category": entry.get("category"),
                "custom_fields": entry.get("custom_fields"),
                "tags": entry.get("tags"),
                "metadata": entry.get("metadata"),
            }
            context = {k: v for k, v in context_fields.items() if v is not None}

            # Determine if this is a security event
            is_security_event = entry.get("is_security_event", False) or bool(threat_indicators)
            security_severity = entry.get("security_severity")

            log_entry = StructuredLogEntry(
                timestamp=entry.get("timestamp", datetime.now(timezone.utc)),
                level=entry.get("level", "INFO"),
                component=entry.get("component"),
                message=entry.get("message", ""),
                correlation_id=entry.get("correlation_id"),
                request_id=entry.get("request_id"),
                trace_id=entry.get("trace_id"),
                span_id=entry.get("span_id"),
                user_id=entry.get("user_id"),
                user_email=entry.get("user_email"),
                client_ip=entry.get("client_ip"),
                user_agent=entry.get("user_agent"),
                request_method=entry.get("request_method"),
                request_path=entry.get("request_path"),
                duration_ms=entry.get("duration_ms"),
                operation_type=entry.get("operation_type"),
                is_security_event=is_security_event,
                security_severity=security_severity,
                threat_indicators=threat_indicators,
                context=context if context else None,
                error_details=error_details,
                performance_metrics=performance_metrics,
                hostname=entry.get("hostname"),
                process_id=entry.get("process_id"),
                thread_id=entry.get("thread_id"),
                environment=entry.get("environment", getattr(settings, "environment", "development")),
                version=entry.get("version", getattr(settings, "version", "unknown")),
            )

            db.add(log_entry)
            db.commit()

        except Exception as e:
            logger.error(f"Failed to persist log entry to database: {e}", exc_info=True)
            # Also print to console for immediate visibility
            print(f"ERROR persisting log to database: {e}")
            traceback.print_exc()
            if db:
                db.rollback()

        finally:
            if should_close:
                db.close()  # Commit/rollback already handled above

    def _send_to_external(self, entry: Dict[str, Any]) -> None:
        """Send log entry to external systems.

        Args:
            entry: Log entry
        """
        # Placeholder for external logging integration
        # Will be implemented in log exporters


class StructuredLogger:
    """Main structured logger with enrichment and routing."""

    def __init__(self, component: str):
        """Initialize structured logger.

        Args:
            component: Component name for log entries
        """
        self.component = component
        self.enricher = LogEnricher()
        self.router = LogRouter()

    def log(
        self,
        level: Union[LogLevel, str],
        message: str,
        category: Optional[Union[LogCategory, str]] = None,
        user_id: Optional[str] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        error: Optional[Exception] = None,
        duration_ms: Optional[float] = None,
        custom_fields: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        db: Optional[Session] = None,
        **kwargs: Any,
    ) -> None:
        """Log a structured message.

        Args:
            level: Log level
            message: Log message
            category: Log category
            user_id: User identifier
            user_email: User email
            team_id: Team identifier
            error: Exception object
            duration_ms: Operation duration
            custom_fields: Additional custom fields
            tags: Log tags
            db: Optional database session
            **kwargs: Additional fields to include
        """
        # Early termination if log level is below configured threshold
        # This avoids expensive enrichment and database operations for filtered messages
        if not _should_log(level):
            return

        # Build base entry
        entry: Dict[str, Any] = {
            "level": level.value if isinstance(level, LogLevel) else level,
            "component": self.component,
            "message": message,
382 "category": category.value if isinstance(category, LogCategory) and category else category if category else None,
383 "user_id": user_id,
384 "user_email": user_email,
385 "team_id": team_id,
386 "duration_ms": duration_ms,
387 "custom_fields": custom_fields,
388 "tags": tags,
389 }
391 # Add error information if present
392 if error:
393 entry["error_type"] = type(error).__name__
394 entry["error_message"] = str(error)
395 entry["error_stack_trace"] = "".join(traceback.format_exception(type(error), error, error.__traceback__))
397 # Add any additional kwargs
398 entry.update(kwargs)
400 # Enrich entry with context
401 entry = self.enricher.enrich(entry)
403 # Route to destinations
404 self.router.route(entry, db)
    def debug(self, message: str, **kwargs: Any) -> None:
        """Log debug message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs: Any) -> None:
        """Log info message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs: Any) -> None:
        """Log warning message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.WARNING, message, **kwargs)

    def error(self, message: str, error: Optional[Exception] = None, **kwargs: Any) -> None:
        """Log error message.

        Args:
            message: Log message
            error: Exception object if available
            **kwargs: Additional context fields
        """
        self.log(LogLevel.ERROR, message, error=error, **kwargs)

    def critical(self, message: str, error: Optional[Exception] = None, **kwargs: Any) -> None:
        """Log critical message.

        Args:
            message: Log message
            error: Exception object if available
            **kwargs: Additional context fields
        """
        self.log(LogLevel.CRITICAL, message, error=error, **kwargs)


class ComponentLogger:
    """Logger factory for component-specific loggers."""

    _loggers: Dict[str, StructuredLogger] = {}

    @classmethod
    def get_logger(cls, component: str) -> StructuredLogger:
        """Get or create a logger for a specific component.

        Args:
            component: Component name

        Returns:
            StructuredLogger instance for the component
        """
        if component not in cls._loggers:
            cls._loggers[component] = StructuredLogger(component)
        return cls._loggers[component]

    @classmethod
    def clear_loggers(cls) -> None:
        """Clear all cached loggers (useful for testing)."""
        cls._loggers.clear()


# Global structured logger accessor for backward compatibility
def get_structured_logger(component: str = "mcpgateway") -> StructuredLogger:
    """Get a structured logger instance.

    Args:
        component: Component name

    Returns:
        StructuredLogger instance
    """
    return ComponentLogger.get_logger(component)
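
# Illustrative usage sketch (values are examples only). Entries always reach the
# standard Python logger; database persistence additionally requires a configured
# SessionLocal and structured_logging_database_enabled:
#
#     log = get_structured_logger("example_component")
#     log.info(
#         "Tool invoked",
#         category=LogCategory.BUSINESS_LOGIC,
#         user_id="user-123",
#         duration_ms=12.5,
#         custom_fields={"tool": "example_tool"},
#         tags=["tools", "invoke"],
#     )
#     try:
#         1 / 0
#     except ZeroDivisionError as exc:
#         log.error("Tool invocation failed", error=exc, user_id="user-123")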