Coverage for mcpgateway / services / structured_logger.py: 99%

150 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/structured_logger.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Structured Logger Service. 

7 

8This module provides comprehensive structured logging with component-based loggers, 

9automatic enrichment, intelligent routing, and database persistence. 

10""" 

11 

12# Standard 

13from datetime import datetime, timezone 

14from enum import Enum 

15import logging 

16import os 

17import socket 

18import traceback 

19from typing import Any, Dict, List, Optional, Union 

20 

21# First-Party 

22from mcpgateway.config import settings 

23from mcpgateway.db import fresh_db_session, StructuredLogEntry 

24from mcpgateway.services.performance_tracker import get_performance_tracker 

25from mcpgateway.utils.correlation_id import get_correlation_id 

26 

27# Optional OpenTelemetry support - import once at module level for performance 

28try: 

29 # Third-Party 

30 from opentelemetry import trace as otel_trace 

31 

32 _OTEL_AVAILABLE = True 

33except ImportError: 

34 otel_trace = None # type: ignore[assignment] 

35 _OTEL_AVAILABLE = False 

36 

logger = logging.getLogger(__name__)

# Cache process-constant values once at import time instead of per log call.
_CACHED_HOSTNAME: str = socket.gethostname()
_CACHED_PID: int = os.getpid()


def _refresh_cached_pid() -> None:
    """Re-read the process id after a fork.

    If this module is imported before the process forks (e.g. a pre-fork server
    that preloads the application), the child inherits the parent's cached PID.
    This hook runs in the child after ``os.fork()`` and restores the correct value,
    which the enricher reads as a module global on every call.
    """
    global _CACHED_PID  # pylint: disable=global-statement
    _CACHED_PID = os.getpid()


# os.register_at_fork is not available on every platform (e.g. Windows).
if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_refresh_cached_pid)

42 

43 

class LogLevel(str, Enum):
    """Severity names understood by the structured logger.

    Every value is the upper-case level name used by Python's ``logging`` module.
    Because the enum mixes in ``str``, members compare equal to those plain names
    (``LogLevel.INFO == "INFO"``).
    """

    DEBUG = "DEBUG"  # logging.DEBUG
    INFO = "INFO"  # logging.INFO
    WARNING = "WARNING"  # logging.WARNING
    ERROR = "ERROR"  # logging.ERROR
    CRITICAL = "CRITICAL"  # logging.CRITICAL

52 

53 

class LogCategory(str, Enum):
    """Classification labels that can be attached to a structured log entry.

    Values are lower-case snake_case strings; the ``str`` mixin lets members be
    compared directly with plain strings.
    """

    # General application / request flow
    APPLICATION = "application"
    REQUEST = "request"
    # Security-related
    SECURITY = "security"
    # Resource and dependency behaviour
    PERFORMANCE = "performance"
    DATABASE = "database"
    # Identity and access
    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    # Everything else
    EXTERNAL_SERVICE = "external_service"
    BUSINESS_LOGIC = "business_logic"
    SYSTEM = "system"

67 

68 

# Numeric severity for each LogLevel name, taken from the stdlib logging constants
# (DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50) so comparisons match logging.
_LOG_LEVEL_VALUES: Dict[str, int] = dict(
    DEBUG=logging.DEBUG,
    INFO=logging.INFO,
    WARNING=logging.WARNING,
    ERROR=logging.ERROR,
    CRITICAL=logging.CRITICAL,
)

77 

78 

def _should_log(level: Union[LogLevel, str]) -> bool:
    """Decide whether a message at *level* passes the configured threshold.

    Called before any enrichment or persistence so that messages filtered out by
    ``settings.log_level`` cost only a couple of dictionary lookups.

    Args:
        level: A ``LogLevel`` member or a level name (matched case-insensitively).

    Returns:
        True when the entry's numeric severity is at least the configured one.
        Names missing from ``_LOG_LEVEL_VALUES`` (on either side) count as INFO.
    """
    if isinstance(level, LogLevel):
        name = level.value
    else:
        name = str(level).upper()

    severity = _LOG_LEVEL_VALUES.get(name, logging.INFO)
    threshold = _LOG_LEVEL_VALUES.get(settings.log_level.upper(), logging.INFO)
    return severity >= threshold

99 

100 

class LogEnricher:
    """Adds ambient request, host, timing and tracing context to log entries."""

    @staticmethod
    def enrich(entry: Dict[str, Any]) -> Dict[str, Any]:
        """Fill contextual fields into *entry* in place and hand it back.

        Fields written:
            * ``correlation_id``: set (overwriting) whenever a correlation ID is active.
            * ``hostname`` / ``process_id``: only when the caller did not supply them.
            * ``timestamp``: current UTC time, only when absent.
            * ``active_operations``: operation count from the performance tracker
              (best effort; any failure is ignored).
            * ``trace_id`` / ``span_id``: from the current OpenTelemetry span when it
              is valid (best effort; only if OpenTelemetry imported successfully).

        Args:
            entry: Log entry dictionary; it is mutated.

        Returns:
            The same dictionary object after enrichment.
        """
        cid = get_correlation_id()
        if cid:
            entry["correlation_id"] = cid

        # Process-constant values were computed once at import time.
        entry.setdefault("hostname", _CACHED_HOSTNAME)
        entry.setdefault("process_id", _CACHED_PID)
        entry.setdefault("timestamp", datetime.now(timezone.utc))

        # Performance context is optional; a missing/uninitialised tracker must never break logging.
        try:
            tracker = get_performance_tracker()
            ops = (
                tracker.get_current_operations(cid)  # pylint: disable=no-member
                if cid and tracker and hasattr(tracker, "get_current_operations")
                else None
            )
            if ops:
                entry["active_operations"] = len(ops)
        except Exception:  # nosec B110 - Graceful degradation if performance tracker unavailable
            pass

        if not _OTEL_AVAILABLE:
            return entry

        # Trace context is optional as well.
        try:
            span = otel_trace.get_current_span()
            if span and span.get_span_context().is_valid:
                span_ctx = span.get_span_context()
                entry["trace_id"] = f"{span_ctx.trace_id:032x}"
                entry["span_id"] = f"{span_ctx.span_id:016x}"
        except Exception:  # nosec B110 - Graceful degradation
            pass

        return entry

149 

150 

class LogRouter:
    """Routes log entries to appropriate destinations."""

    # Keys that ``logging.Logger.makeRecord`` refuses to accept in ``extra``: it raises
    # KeyError for "message", "asctime" and any attribute a LogRecord already carries
    # (name, module, exc_info, process, ...). Derived from a throwaway record so the set
    # follows the running Python version.
    # NOTE(review): attributes added by a custom LogRecord factory are not covered.
    _RESERVED_RECORD_ATTRS = frozenset(vars(logging.LogRecord("", logging.INFO, "", 0, "", (), None))) | {"message", "asctime"}

    # Entry keys grouped into the JSON columns of StructuredLogEntry (order is preserved
    # in the persisted dicts).
    _ERROR_FIELDS = ("error_type", "error_message", "error_stack_trace", "error_context")
    _PERFORMANCE_FIELDS = (
        "database_query_count",
        "database_query_duration_ms",
        "cache_hits",
        "cache_misses",
        "external_api_calls",
        "external_api_duration_ms",
        "memory_usage_mb",
        "cpu_usage_percent",
    )
    _SECURITY_FIELDS = ("security_event_type", "security_threat_score", "security_action_taken")
    _CONTEXT_FIELDS = (
        "team_id",
        "request_query",
        "request_headers",
        "request_body_size",
        "response_status_code",
        "response_body_size",
        "response_headers",
        "business_event_type",
        "business_entity_type",
        "business_entity_id",
        "resource_type",
        "resource_id",
        "resource_action",
        "category",
        "custom_fields",
        "tags",
        "metadata",
    )

    def __init__(self):
        """Initialize log router.

        Destination toggles are read from ``settings`` once; when the attributes are
        missing, database persistence defaults to on and external export to off.
        """
        self.database_enabled = getattr(settings, "structured_logging_database_enabled", True)
        self.external_enabled = getattr(settings, "structured_logging_external_enabled", False)

    def route(self, entry: Dict[str, Any]) -> None:
        """Route log entry to configured destinations.

        Args:
            entry: Log entry to route
        """
        # Always log to standard Python logger
        self._log_to_python_logger(entry)

        # Persist to database if enabled
        if self.database_enabled:
            self._persist_to_database(entry)

        # Send to external systems if enabled
        if self.external_enabled:
            self._send_to_external(entry)

    @staticmethod
    def _resolve_level(level_name: Any) -> int:
        """Map a level name to its numeric ``logging`` value.

        The lookup is case-insensitive and goes through the logging module's level
        registry, so aliases such as ``WARN``/``FATAL`` and levels registered with
        ``logging.addLevelName`` resolve as well. ``getattr(logging, name)`` is avoided
        on purpose: for lower-case names it returns module functions (``logging.info``),
        which makes ``Logger.log`` raise ``TypeError``.

        Args:
            level_name: Level name from the entry (usually a string).

        Returns:
            The numeric level, or ``logging.INFO`` for unknown names.
        """
        resolved = logging.getLevelName(str(level_name).upper())
        return resolved if isinstance(resolved, int) else logging.INFO

    @classmethod
    def _build_extra(cls, entry: Dict[str, Any]) -> Dict[str, Any]:
        """Select the entry fields to attach to the stdlib LogRecord via ``extra``.

        ``level`` and ``message`` are passed to the logger separately. Keys colliding
        with LogRecord attributes are dropped because ``makeRecord`` raises KeyError
        for them, which would abort the whole log call.

        Args:
            entry: Log entry

        Returns:
            A new dict safe to pass as ``extra``.
        """
        return {k: v for k, v in entry.items() if k != "level" and k not in cls._RESERVED_RECORD_ATTRS}

    @staticmethod
    def _non_null_subset(entry: Dict[str, Any], keys: tuple) -> Optional[Dict[str, Any]]:
        """Collect ``{key: entry[key]}`` for *keys* whose value is not None.

        Args:
            entry: Log entry
            keys: Keys to pick, in output order.

        Returns:
            The subset dict, or None when no key has a non-None value.
        """
        subset = {k: entry[k] for k in keys if entry.get(k) is not None}
        return subset or None

    def _log_to_python_logger(self, entry: Dict[str, Any]) -> None:
        """Log to standard Python logger.

        Args:
            entry: Log entry
        """
        level = self._resolve_level(entry.get("level", "INFO"))

        message = entry.get("message", "")
        component = entry.get("component", "")
        log_message = f"[{component}] {message}" if component else message

        logger.log(level, log_message, extra=self._build_extra(entry))

    def _persist_to_database(self, entry: Dict[str, Any]) -> None:
        """Persist log entry to database.

        Always uses a fresh, independent database session to avoid accidentally
        committing unrelated pending objects from the caller's session. Failures are
        logged and swallowed so that logging never breaks the caller.

        Args:
            entry: Log entry
        """
        try:
            with fresh_db_session() as log_db:
                # error_details keeps all four keys (None included) once any is truthy.
                error_details = None
                if any(entry.get(key) for key in self._ERROR_FIELDS):
                    error_details = {key: entry.get(key) for key in self._ERROR_FIELDS}

                performance_metrics = self._non_null_subset(entry, self._PERFORMANCE_FIELDS)
                threat_indicators = self._non_null_subset(entry, self._SECURITY_FIELDS)
                context = self._non_null_subset(entry, self._CONTEXT_FIELDS)

                # Any populated security field marks the entry as a security event.
                is_security_event = entry.get("is_security_event", False) or bool(threat_indicators)
                security_severity = entry.get("security_severity")

                log_entry = StructuredLogEntry(
                    timestamp=entry.get("timestamp", datetime.now(timezone.utc)),
                    level=entry.get("level", "INFO"),
                    component=entry.get("component"),
                    message=entry.get("message", ""),
                    correlation_id=entry.get("correlation_id"),
                    request_id=entry.get("request_id"),
                    trace_id=entry.get("trace_id"),
                    span_id=entry.get("span_id"),
                    user_id=entry.get("user_id"),
                    user_email=entry.get("user_email"),
                    client_ip=entry.get("client_ip"),
                    user_agent=entry.get("user_agent"),
                    request_method=entry.get("request_method"),
                    request_path=entry.get("request_path"),
                    duration_ms=entry.get("duration_ms"),
                    operation_type=entry.get("operation_type"),
                    is_security_event=is_security_event,
                    security_severity=security_severity,
                    threat_indicators=threat_indicators,
                    context=context,
                    error_details=error_details,
                    performance_metrics=performance_metrics,
                    hostname=entry.get("hostname"),
                    process_id=entry.get("process_id"),
                    thread_id=entry.get("thread_id"),
                    environment=entry.get("environment", getattr(settings, "environment", "development")),
                    version=entry.get("version", getattr(settings, "version", "unknown")),
                )

                # NOTE(review): commit presumably happens when fresh_db_session() exits — confirm in mcpgateway.db.
                log_db.add(log_entry)

        except Exception as e:
            logger.error(f"Failed to persist log entry to database: {e}", exc_info=True)

    def _send_to_external(self, entry: Dict[str, Any]) -> None:
        """Send log entry to external systems.

        Currently a no-op placeholder.

        Args:
            entry: Log entry
        """
        # Placeholder for external logging integration
        # Will be implemented in log exporters

310 

311 

class StructuredLogger:
    """Main structured logger with enrichment and routing."""

    def __init__(self, component: str):
        """Initialize structured logger.

        Args:
            component: Component name for log entries
        """
        self.component = component
        self.enricher = LogEnricher()
        self.router = LogRouter()

    def log(
        self,
        level: Union[LogLevel, str],
        message: str,
        category: Optional[Union[LogCategory, str]] = None,
        user_id: Optional[str] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        error: Optional[Exception] = None,
        duration_ms: Optional[float] = None,
        custom_fields: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Log a structured message.

        Args:
            level: Log level (``LogLevel`` member or case-insensitive name)
            message: Log message
            category: Log category
            user_id: User identifier
            user_email: User email
            team_id: Team identifier
            error: Exception object; its type, message and formatted traceback are recorded
            duration_ms: Operation duration
            custom_fields: Additional custom fields
            tags: Log tags
            **kwargs: Additional fields to include (applied last, so they can override the fields above)
        """
        # Early termination if log level is below configured threshold
        # This avoids expensive enrichment and database operations for filtered messages
        if not _should_log(level):
            return

        # Canonicalise string levels ("warning" -> "WARNING") so every destination sees
        # the same spelling; lower-case names previously broke stdlib level resolution.
        level_name = level.value if isinstance(level, LogLevel) else str(level).upper()
        # Enum members contribute their value; empty/None categories are stored as None.
        category_name = category.value if isinstance(category, LogCategory) else (category or None)

        entry: Dict[str, Any] = {
            "level": level_name,
            "component": self.component,
            "message": message,
            "category": category_name,
            "user_id": user_id,
            "user_email": user_email,
            "team_id": team_id,
            "duration_ms": duration_ms,
            "custom_fields": custom_fields,
            "tags": tags,
        }

        if error is not None:
            entry["error_type"] = type(error).__name__
            entry["error_message"] = str(error)
            entry["error_stack_trace"] = "".join(traceback.format_exception(type(error), error, error.__traceback__))

        # Add any additional kwargs
        entry.update(kwargs)

        # Enrich entry with context
        entry = self.enricher.enrich(entry)

        # Route to destinations
        self.router.route(entry)

    def debug(self, message: str, **kwargs: Any) -> None:
        """Log debug message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs: Any) -> None:
        """Log info message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs: Any) -> None:
        """Log warning message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.WARNING, message, **kwargs)

    def error(self, message: str, error: Optional[Exception] = None, **kwargs: Any) -> None:
        """Log error message.

        Args:
            message: Log message
            error: Exception object if available
            **kwargs: Additional context fields
        """
        self.log(LogLevel.ERROR, message, error=error, **kwargs)

    def critical(self, message: str, error: Optional[Exception] = None, **kwargs: Any) -> None:
        """Log critical message.

        Args:
            message: Log message
            error: Exception object if available
            **kwargs: Additional context fields
        """
        self.log(LogLevel.CRITICAL, message, error=error, **kwargs)

434 

435 

class ComponentLogger:
    """Registry that hands out one shared StructuredLogger per component name."""

    _loggers: Dict[str, StructuredLogger] = {}

    @classmethod
    def get_logger(cls, component: str) -> StructuredLogger:
        """Return the cached logger for *component*, creating it on first request.

        Args:
            component: Component name

        Returns:
            The StructuredLogger registered for that component
        """
        if component in cls._loggers:
            return cls._loggers[component]

        new_logger = StructuredLogger(component)
        cls._loggers[component] = new_logger
        return new_logger

    @classmethod
    def clear_loggers(cls) -> None:
        """Forget every cached logger (handy in tests)."""
        cls._loggers.clear()

459 

460 

# Module-level convenience accessor, kept for backward compatibility.
def get_structured_logger(component: str = "mcpgateway") -> StructuredLogger:
    """Fetch the shared structured logger for a component.

    Delegates to ``ComponentLogger.get_logger``, so repeated calls with the same
    component name yield the same instance.

    Args:
        component: Component name

    Returns:
        StructuredLogger instance
    """
    factory = ComponentLogger.get_logger
    return factory(component)