Coverage for mcpgateway/services/structured_logger.py: 99%

161 statements  

coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/services/structured_logger.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0

Structured Logger Service.

This module provides comprehensive structured logging with component-based loggers,
automatic enrichment, intelligent routing, and database persistence.
"""

# Standard
from datetime import datetime, timezone
from enum import Enum
import logging
import os
import socket
import traceback
from typing import Any, Dict, List, Optional, Union

# Third-Party
from sqlalchemy.orm import Session

# First-Party
from mcpgateway.config import settings
from mcpgateway.db import SessionLocal, StructuredLogEntry
from mcpgateway.services.performance_tracker import get_performance_tracker
from mcpgateway.utils.correlation_id import get_correlation_id

# Optional OpenTelemetry support - import once at module level for performance
try:
    # Third-Party
    from opentelemetry import trace as otel_trace

    _OTEL_AVAILABLE = True
except ImportError:
    otel_trace = None  # type: ignore[assignment]
    _OTEL_AVAILABLE = False

logger = logging.getLogger(__name__)

# Cache static values at module load - these don't change during process lifetime
_CACHED_HOSTNAME: str = socket.gethostname()
_CACHED_PID: int = os.getpid()

class LogLevel(str, Enum):
    """Log levels matching Python logging."""

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class LogCategory(str, Enum):
    """Log categories for classification."""

    APPLICATION = "application"
    REQUEST = "request"
    SECURITY = "security"
    PERFORMANCE = "performance"
    DATABASE = "database"
    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    EXTERNAL_SERVICE = "external_service"
    BUSINESS_LOGIC = "business_logic"
    SYSTEM = "system"


# Log level numeric values for comparison (matches Python logging module)
_LOG_LEVEL_VALUES: Dict[str, int] = {
    "DEBUG": logging.DEBUG,  # 10
    "INFO": logging.INFO,  # 20
    "WARNING": logging.WARNING,  # 30
    "ERROR": logging.ERROR,  # 40
    "CRITICAL": logging.CRITICAL,  # 50
}

def _should_log(level: Union[LogLevel, str]) -> bool:
    """Check if a log level should be processed based on settings.log_level.

    This enables early termination of log processing to avoid expensive
    enrichment and database operations for messages below the configured threshold.

    Args:
        level: The log level to check (LogLevel enum or string)

    Returns:
        True if the level meets or exceeds the configured threshold
    """
    # Get string value from enum if needed
    level_str = level.value if isinstance(level, LogLevel) else str(level).upper()

    # Get numeric values for comparison
    entry_level = _LOG_LEVEL_VALUES.get(level_str, logging.INFO)
    config_level = _LOG_LEVEL_VALUES.get(settings.log_level.upper(), logging.INFO)

    return entry_level >= config_level
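# Illustrative sketch (not part of the module): how the threshold filter behaves.
# Assuming settings.log_level is "WARNING":
#   _should_log(LogLevel.DEBUG)  -> False  (10 < 30, skipped before enrichment/routing)
#   _should_log("info")          -> False  (normalized to "INFO", 20 < 30)
#   _should_log(LogLevel.ERROR)  -> True   (40 >= 30, proceeds to enrichment and routing)
# Unknown level strings fall back to logging.INFO via _LOG_LEVEL_VALUES.get().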

class LogEnricher:
    """Enriches log entries with contextual information."""

    @staticmethod
    def enrich(entry: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich log entry with system and context information.

        Args:
            entry: Base log entry

        Returns:
            Enriched log entry
        """
        # Get correlation ID
        correlation_id = get_correlation_id()
        if correlation_id:
            entry["correlation_id"] = correlation_id

        # Add hostname and process info - use cached values for performance
        entry.setdefault("hostname", _CACHED_HOSTNAME)
        entry.setdefault("process_id", _CACHED_PID)

        # Add timestamp if not present
        if "timestamp" not in entry:
            entry["timestamp"] = datetime.now(timezone.utc)

        # Add performance metrics if available (skip if tracker not initialized)
        try:
            perf_tracker = get_performance_tracker()
            if correlation_id and perf_tracker and hasattr(perf_tracker, "get_current_operations"):
                current_ops = perf_tracker.get_current_operations(correlation_id)  # pylint: disable=no-member
                if current_ops:  # coverage: partial branch - condition was always true (falsy case never taken)
                    entry["active_operations"] = len(current_ops)
        except Exception:  # nosec B110 - Graceful degradation if performance tracker unavailable
            pass

        # Add OpenTelemetry trace context if available (uses module-level import)
        if _OTEL_AVAILABLE:
            try:
                span = otel_trace.get_current_span()
                if span and span.get_span_context().is_valid:
                    ctx = span.get_span_context()
                    entry["trace_id"] = format(ctx.trace_id, "032x")
                    entry["span_id"] = format(ctx.span_id, "016x")
            except Exception:  # nosec B110 - Graceful degradation
                pass

        return entry
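    # Illustrative sketch (not part of the module): the shape of an enriched entry.
    # Values below are hypothetical and depend on the runtime context.
    #   LogEnricher.enrich({"level": "INFO", "message": "hello"})
    #   -> {"level": "INFO", "message": "hello",
    #       "correlation_id": "3f2a...",         # only when a correlation ID is set
    #       "hostname": "gateway-1", "process_id": 4242,
    #       "timestamp": datetime(..., tzinfo=timezone.utc),
    #       "active_operations": 2,              # only when the performance tracker reports operations
    #       "trace_id": "4bf9...", "span_id": "00f0..."}  # only inside a valid OpenTelemetry span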

class LogRouter:
    """Routes log entries to appropriate destinations."""

    def __init__(self):
        """Initialize log router."""
        self.database_enabled = getattr(settings, "structured_logging_database_enabled", True)
        self.external_enabled = getattr(settings, "structured_logging_external_enabled", False)

    def route(self, entry: Dict[str, Any], db: Optional[Session] = None) -> None:
        """Route log entry to configured destinations.

        Args:
            entry: Log entry to route
            db: Optional database session
        """
        # Always log to standard Python logger
        self._log_to_python_logger(entry)

        # Persist to database if enabled
        if self.database_enabled:
            self._persist_to_database(entry, db)

        # Send to external systems if enabled
        if self.external_enabled:
            self._send_to_external(entry)

    def _log_to_python_logger(self, entry: Dict[str, Any]) -> None:
        """Log to standard Python logger.

        Args:
            entry: Log entry
        """
        level_str = entry.get("level", "INFO")
        level = getattr(logging, level_str, logging.INFO)

        message = entry.get("message", "")
        component = entry.get("component", "")

        log_message = f"[{component}] {message}" if component else message

        # Build extra dict for structured logging
        extra = {k: v for k, v in entry.items() if k not in ["message", "level"]}

        logger.log(level, log_message, extra=extra)
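    # Illustrative sketch (not part of the module): what the standard-library logger
    # receives for a routed entry (example field values are hypothetical).
    #   entry = {"level": "WARNING", "component": "gateway", "message": "slow response",
    #            "duration_ms": 950}
    #   -> logger.log(logging.WARNING, "[gateway] slow response",
    #                 extra={"component": "gateway", "duration_ms": 950})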

    def _persist_to_database(self, entry: Dict[str, Any], db: Optional[Session] = None) -> None:
        """Persist log entry to database.

        Args:
            entry: Log entry
            db: Optional database session
        """
        should_close = False
        if db is None:
            db = SessionLocal()
            should_close = True

        try:
            # Build error_details JSON from error-related fields
            error_details = None
            if any([entry.get("error_type"), entry.get("error_message"), entry.get("error_stack_trace"), entry.get("error_context")]):
                error_details = {
                    "error_type": entry.get("error_type"),
                    "error_message": entry.get("error_message"),
                    "error_stack_trace": entry.get("error_stack_trace"),
                    "error_context": entry.get("error_context"),
                }

            # Build performance_metrics JSON from performance-related fields
            performance_metrics = None
            perf_fields = {
                "database_query_count": entry.get("database_query_count"),
                "database_query_duration_ms": entry.get("database_query_duration_ms"),
                "cache_hits": entry.get("cache_hits"),
                "cache_misses": entry.get("cache_misses"),
                "external_api_calls": entry.get("external_api_calls"),
                "external_api_duration_ms": entry.get("external_api_duration_ms"),
                "memory_usage_mb": entry.get("memory_usage_mb"),
                "cpu_usage_percent": entry.get("cpu_usage_percent"),
            }
            if any(v is not None for v in perf_fields.values()):
                performance_metrics = {k: v for k, v in perf_fields.items() if v is not None}

            # Build threat_indicators JSON from security-related fields
            threat_indicators = None
            security_fields = {
                "security_event_type": entry.get("security_event_type"),
                "security_threat_score": entry.get("security_threat_score"),
                "security_action_taken": entry.get("security_action_taken"),
            }
            if any(v is not None for v in security_fields.values()):
                threat_indicators = {k: v for k, v in security_fields.items() if v is not None}

            # Build context JSON from remaining fields
            context_fields = {
                "team_id": entry.get("team_id"),
                "request_query": entry.get("request_query"),
                "request_headers": entry.get("request_headers"),
                "request_body_size": entry.get("request_body_size"),
                "response_status_code": entry.get("response_status_code"),
                "response_body_size": entry.get("response_body_size"),
                "response_headers": entry.get("response_headers"),
                "business_event_type": entry.get("business_event_type"),
                "business_entity_type": entry.get("business_entity_type"),
                "business_entity_id": entry.get("business_entity_id"),
                "resource_type": entry.get("resource_type"),
                "resource_id": entry.get("resource_id"),
                "resource_action": entry.get("resource_action"),
                "category": entry.get("category"),
                "custom_fields": entry.get("custom_fields"),
                "tags": entry.get("tags"),
                "metadata": entry.get("metadata"),
            }
            context = {k: v for k, v in context_fields.items() if v is not None}

            # Determine if this is a security event
            is_security_event = entry.get("is_security_event", False) or bool(threat_indicators)
            security_severity = entry.get("security_severity")

            log_entry = StructuredLogEntry(
                timestamp=entry.get("timestamp", datetime.now(timezone.utc)),
                level=entry.get("level", "INFO"),
                component=entry.get("component"),
                message=entry.get("message", ""),
                correlation_id=entry.get("correlation_id"),
                request_id=entry.get("request_id"),
                trace_id=entry.get("trace_id"),
                span_id=entry.get("span_id"),
                user_id=entry.get("user_id"),
                user_email=entry.get("user_email"),
                client_ip=entry.get("client_ip"),
                user_agent=entry.get("user_agent"),
                request_method=entry.get("request_method"),
                request_path=entry.get("request_path"),
                duration_ms=entry.get("duration_ms"),
                operation_type=entry.get("operation_type"),
                is_security_event=is_security_event,
                security_severity=security_severity,
                threat_indicators=threat_indicators,
                context=context if context else None,
                error_details=error_details,
                performance_metrics=performance_metrics,
                hostname=entry.get("hostname"),
                process_id=entry.get("process_id"),
                thread_id=entry.get("thread_id"),
                environment=entry.get("environment", getattr(settings, "environment", "development")),
                version=entry.get("version", getattr(settings, "version", "unknown")),
            )

            db.add(log_entry)
            db.commit()

        except Exception as e:
            logger.error(f"Failed to persist log entry to database: {e}", exc_info=True)
            # Also print to console for immediate visibility
            print(f"ERROR persisting log to database: {e}")
            traceback.print_exc()
            if db:  # coverage: partial branch - condition was always true (falsy case never taken)
                db.rollback()

        finally:
            if should_close:
                db.close()  # Commit/rollback already handled above
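    # Illustrative sketch (not part of the module): how a flat entry maps to the JSON
    # columns built above. Field values are hypothetical.
    #   entry = {"level": "ERROR", "message": "lookup failed", "component": "tools",
    #            "error_type": "KeyError", "cache_hits": 3, "team_id": "t-123"}
    #   -> error_details       = {"error_type": "KeyError", "error_message": None, ...}
    #   -> performance_metrics = {"cache_hits": 3}
    #   -> threat_indicators   = None   (no security_* fields present)
    #   -> context             = {"team_id": "t-123"}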

    def _send_to_external(self, entry: Dict[str, Any]) -> None:
        """Send log entry to external systems.

        Args:
            entry: Log entry
        """
        # Placeholder for external logging integration
        # Will be implemented in log exporters

class StructuredLogger:
    """Main structured logger with enrichment and routing."""

    def __init__(self, component: str):
        """Initialize structured logger.

        Args:
            component: Component name for log entries
        """
        self.component = component
        self.enricher = LogEnricher()
        self.router = LogRouter()

    def log(
        self,
        level: Union[LogLevel, str],
        message: str,
        category: Optional[Union[LogCategory, str]] = None,
        user_id: Optional[str] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        error: Optional[Exception] = None,
        duration_ms: Optional[float] = None,
        custom_fields: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        db: Optional[Session] = None,
        **kwargs: Any,
    ) -> None:
        """Log a structured message.

        Args:
            level: Log level
            message: Log message
            category: Log category
            user_id: User identifier
            user_email: User email
            team_id: Team identifier
            error: Exception object
            duration_ms: Operation duration
            custom_fields: Additional custom fields
            tags: Log tags
            db: Optional database session
            **kwargs: Additional fields to include
        """
        # Early termination if log level is below configured threshold
        # This avoids expensive enrichment and database operations for filtered messages
        if not _should_log(level):
            return

        # Build base entry
        entry: Dict[str, Any] = {
            "level": level.value if isinstance(level, LogLevel) else level,
            "component": self.component,
            "message": message,
            "category": category.value if isinstance(category, LogCategory) and category else category if category else None,
            "user_id": user_id,
            "user_email": user_email,
            "team_id": team_id,
            "duration_ms": duration_ms,
            "custom_fields": custom_fields,
            "tags": tags,
        }

        # Add error information if present
        if error:
            entry["error_type"] = type(error).__name__
            entry["error_message"] = str(error)
            entry["error_stack_trace"] = "".join(traceback.format_exception(type(error), error, error.__traceback__))

        # Add any additional kwargs
        entry.update(kwargs)

        # Enrich entry with context
        entry = self.enricher.enrich(entry)

        # Route to destinations
        self.router.route(entry, db)

    def debug(self, message: str, **kwargs: Any) -> None:
        """Log debug message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs: Any) -> None:
        """Log info message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs: Any) -> None:
        """Log warning message.

        Args:
            message: Log message
            **kwargs: Additional context fields
        """
        self.log(LogLevel.WARNING, message, **kwargs)

    def error(self, message: str, error: Optional[Exception] = None, **kwargs: Any) -> None:
        """Log error message.

        Args:
            message: Log message
            error: Exception object if available
            **kwargs: Additional context fields
        """
        self.log(LogLevel.ERROR, message, error=error, **kwargs)

    def critical(self, message: str, error: Optional[Exception] = None, **kwargs: Any) -> None:
        """Log critical message.

        Args:
            message: Log message
            error: Exception object if available
            **kwargs: Additional context fields
        """
        self.log(LogLevel.CRITICAL, message, error=error, **kwargs)
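# Illustrative sketch (not part of the module): typical calls against this API.
# The component name, field values, and exception below are hypothetical.
#
#   slog = StructuredLogger("tool_service")
#   slog.info("Tool invoked", category=LogCategory.BUSINESS_LOGIC,
#             user_id="u-42", duration_ms=12.5, tags=["invoke"])
#   try:
#       raise ValueError("bad argument")
#   except ValueError as exc:
#       slog.error("Tool invocation failed", error=exc,
#                  custom_fields={"tool": "calculator"})
#
# Each call is level-filtered, enriched (correlation ID, hostname, trace context),
# emitted to the standard Python logger, and persisted when database routing is enabled.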

class ComponentLogger:
    """Logger factory for component-specific loggers."""

    _loggers: Dict[str, StructuredLogger] = {}

    @classmethod
    def get_logger(cls, component: str) -> StructuredLogger:
        """Get or create a logger for a specific component.

        Args:
            component: Component name

        Returns:
            StructuredLogger instance for the component
        """
        if component not in cls._loggers:
            cls._loggers[component] = StructuredLogger(component)
        return cls._loggers[component]

    @classmethod
    def clear_loggers(cls) -> None:
        """Clear all cached loggers (useful for testing)."""
        cls._loggers.clear()


# Module-level accessor kept for backward compatibility
def get_structured_logger(component: str = "mcpgateway") -> StructuredLogger:
    """Get a structured logger instance.

    Args:
        component: Component name

    Returns:
        StructuredLogger instance
    """
    return ComponentLogger.get_logger(component)
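# --- Illustrative usage sketch (not part of the service) -------------------------
# A minimal demo, assuming the gateway's settings and database are configured; if
# persistence fails, LogRouter still emits to the standard Python logger and logs
# the database error. Field values below are hypothetical.
if __name__ == "__main__":  # pragma: no cover
    logging.basicConfig(level=logging.DEBUG)

    demo_log = get_structured_logger("demo")
    demo_log.info(
        "Demo event",
        category=LogCategory.APPLICATION,
        tags=["demo"],
        custom_fields={"note": "hypothetical example values"},
    )
    try:
        1 / 0
    except ZeroDivisionError as exc:
        demo_log.error("Demo failure", error=exc, duration_ms=1.2)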