Coverage for mcpgateway / services / security_logger.py: 99%

144 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/security_logger.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Security Logger Service. 

7 

8This module provides specialized logging for security events, threat detection, 

9and audit trail management with automated threat analysis and alerting. 

10""" 

11 

12# Standard 

13from datetime import datetime, timedelta, timezone 

14from enum import Enum 

15import logging 

16from typing import Any, Dict, Optional 

17 

18# Third-Party 

19from sqlalchemy import func, select 

20from sqlalchemy.orm import Session 

21 

22# First-Party 

23from mcpgateway.config import settings 

24from mcpgateway.db import AuditTrail, SecurityEvent, SessionLocal 

25from mcpgateway.utils.correlation_id import get_correlation_id 

26 

27logger = logging.getLogger(__name__) 

28 

29 

class SecuritySeverity(str, Enum):
    """Security event severity levels.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (for example ``SecuritySeverity.LOW == "LOW"``).
    Members are declared from least to most severe.
    """

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"

37 

38 

class SecurityEventType(str, Enum):
    """Types of security events.

    Members are also ``str`` instances, so each one compares equal to its
    lower-case string value (for example
    ``SecurityEventType.AUTHENTICATION_FAILURE == "authentication_failure"``).
    """

    AUTHENTICATION_FAILURE = "authentication_failure"
    AUTHENTICATION_SUCCESS = "authentication_success"
    AUTHORIZATION_FAILURE = "authorization_failure"
    SUSPICIOUS_ACTIVITY = "suspicious_activity"
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
    BRUTE_FORCE_ATTEMPT = "brute_force_attempt"
    TOKEN_MANIPULATION = "token_manipulation"  # nosec B105 - Not a password, security event type constant
    DATA_EXFILTRATION = "data_exfiltration"
    PRIVILEGE_ESCALATION = "privilege_escalation"
    INJECTION_ATTEMPT = "injection_attempt"
    ANOMALOUS_BEHAVIOR = "anomalous_behavior"

53 

54 

class SecurityLogger:
    """Specialized logger for security events and audit trails.

    Persists ``SecurityEvent`` and ``AuditTrail`` rows, scores authentication
    attempts against the number of recent failures, and mirrors
    authentication and suspicious-activity events to the standard
    ``logging`` logger of this module.

    Every public method accepts an optional ``db`` session. When it is
    omitted, a short-lived ``SessionLocal()`` session is opened and closed
    internally. When it is supplied, the caller's session is used and is
    committed (or rolled back on failure) but never closed here.
    """

    # Classifications whose access is additionally recorded as a security event.
    _SECURITY_EVENT_CLASSIFICATIONS = ("confidential", "restricted", "sensitive")
    # Classifications whose failed access or deletion is flagged for manual review.
    # NOTE(review): "sensitive" is absent here but present above - confirm this is intended.
    _REVIEW_CLASSIFICATIONS = ("confidential", "restricted")
    # Resource types whose audit entries are always flagged for manual review.
    _REVIEW_RESOURCE_TYPES = ("role", "permission", "team_member")

    def __init__(self):
        """Initialize security logger.

        Thresholds are read from ``settings`` when the corresponding
        attribute exists; otherwise the defaults below are used.
        """
        self.failed_auth_threshold = getattr(settings, "security_failed_auth_threshold", 5)
        # Stored for consumers of this object; not read by any method of this class.
        self.threat_score_alert_threshold = getattr(settings, "security_threat_score_alert", 0.7)
        self.rate_limit_window_minutes = getattr(settings, "security_rate_limit_window", 5)

    def log_authentication_attempt(
        self,
        user_id: str,
        user_email: Optional[str],
        auth_method: str,
        success: bool,
        client_ip: str,
        user_agent: Optional[str] = None,
        failure_reason: Optional[str] = None,
        additional_context: Optional[Dict[str, Any]] = None,
        db: Optional[Session] = None,
    ) -> Optional[SecurityEvent]:
        """Log authentication attempts with security analysis.

        Args:
            user_id: User identifier
            user_email: User email address
            auth_method: Authentication method used
            success: Whether authentication succeeded
            client_ip: Client IP address
            user_agent: Client user agent
            failure_reason: Reason for failure if applicable
            additional_context: Additional event context; its keys override
                the built-in context keys on collision
            db: Optional database session

        Returns:
            Created SecurityEvent, or None if the event could not be persisted

        Raises:
            Exception: Any error raised while counting recent failures is
                propagated unchanged.
        """
        correlation_id = get_correlation_id()

        # Count recent failed attempts (already-recorded failures only; the
        # current attempt has not been persisted yet).
        failed_attempts = self._count_recent_failures(user_id=user_id, client_ip=client_ip, db=db)

        # Calculate threat score
        threat_score = self._calculate_auth_threat_score(success=success, failed_attempts=failed_attempts, auth_method=auth_method)

        # Determine severity
        if not success:
            if failed_attempts >= self.failed_auth_threshold:
                severity = SecuritySeverity.HIGH
            elif failed_attempts >= 3:
                severity = SecuritySeverity.MEDIUM
            else:
                severity = SecuritySeverity.LOW
        else:
            severity = SecuritySeverity.LOW

        # Build event description
        description = f"Authentication {'successful' if success else 'failed'} for user {user_id}"
        if not success and failure_reason:
            description += f": {failure_reason}"

        # Build context
        context = {"auth_method": auth_method, "failed_attempts_recent": failed_attempts, "user_agent": user_agent, **(additional_context or {})}

        # Create security event
        event = self._create_security_event(
            event_type=SecurityEventType.AUTHENTICATION_SUCCESS if success else SecurityEventType.AUTHENTICATION_FAILURE,
            severity=severity,
            category="authentication",
            user_id=user_id,
            user_email=user_email,
            client_ip=client_ip,
            user_agent=user_agent,
            description=description,
            threat_score=threat_score,
            failed_attempts_count=failed_attempts,
            context=context,
            action_taken="allowed" if success else "denied",
            correlation_id=correlation_id,
            db=db,
        )

        # Log to standard logger as well
        log_level = logging.WARNING if not success else logging.INFO
        logger.log(
            log_level,
            f"Authentication attempt: {description}",
            extra={
                "security_event": True,
                "event_type": event.event_type if event else None,
                "severity": severity.value,
                "threat_score": threat_score,
                "correlation_id": correlation_id,
            },
        )

        return event

    def log_data_access(  # pylint: disable=too-many-positional-arguments
        self,
        action: str,
        resource_type: str,
        resource_id: str,
        resource_name: Optional[str],
        user_id: str,
        user_email: Optional[str],
        team_id: Optional[str],
        client_ip: Optional[str],
        user_agent: Optional[str],
        success: bool,
        data_classification: Optional[str] = None,
        old_values: Optional[Dict[str, Any]] = None,
        new_values: Optional[Dict[str, Any]] = None,
        error_message: Optional[str] = None,
        additional_context: Optional[Dict[str, Any]] = None,
        db: Optional[Session] = None,
    ) -> Optional[AuditTrail]:
        """Log data access for audit trails.

        Args:
            action: Action performed (create, read, update, delete, execute)
            resource_type: Type of resource accessed
            resource_id: Resource identifier
            resource_name: Resource name
            user_id: User performing the action
            user_email: User email
            team_id: Team context
            client_ip: Client IP address
            user_agent: Client user agent
            success: Whether action succeeded
            data_classification: Data sensitivity classification
            old_values: Previous values (for updates)
            new_values: New values (for updates/creates)
            error_message: Error message if failed
            additional_context: Additional context
            db: Optional database session

        Returns:
            Created AuditTrail entry, or None if it could not be persisted
        """
        correlation_id = get_correlation_id()

        # Determine if audit requires review
        requires_review = self._requires_audit_review(action=action, resource_type=resource_type, data_classification=data_classification, success=success)

        # Calculate changes. A diff is only produced when both snapshots are
        # non-empty. Keys are walked in insertion order (old keys first, then
        # keys that only exist in new_values) so the stored diff is
        # deterministic across runs.
        changes = None
        if old_values and new_values:
            changes = {
                key: {"old": old_values.get(key), "new": new_values.get(key)}
                for key in {**old_values, **new_values}
                if old_values.get(key) != new_values.get(key)
            }

        # Create audit trail
        audit = self._create_audit_trail(
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            resource_name=resource_name,
            user_id=user_id,
            user_email=user_email,
            team_id=team_id,
            client_ip=client_ip,
            user_agent=user_agent,
            success=success,
            old_values=old_values,
            new_values=new_values,
            changes=changes,
            data_classification=data_classification,
            requires_review=requires_review,
            error_message=error_message,
            context=additional_context,
            correlation_id=correlation_id,
            db=db,
        )

        # Log sensitive data access as security event
        if data_classification in self._SECURITY_EVENT_CLASSIFICATIONS:
            self._create_security_event(
                event_type="data_access",
                severity=SecuritySeverity.MEDIUM if success else SecuritySeverity.HIGH,
                category="data_access",
                user_id=user_id,
                user_email=user_email,
                client_ip=client_ip or "unknown",
                user_agent=user_agent,
                description=f"Access to {data_classification} {resource_type}: {resource_name or resource_id}",
                threat_score=0.3 if success else 0.6,
                context={
                    "action": action,
                    "resource_type": resource_type,
                    "resource_id": resource_id,
                    "data_classification": data_classification,
                },
                correlation_id=correlation_id,
                db=db,
            )

        return audit

    def log_suspicious_activity(
        self,
        activity_type: str,
        description: str,
        user_id: Optional[str],
        user_email: Optional[str],
        client_ip: str,
        user_agent: Optional[str],
        threat_score: float,
        severity: SecuritySeverity,
        threat_indicators: Dict[str, Any],
        action_taken: str,
        additional_context: Optional[Dict[str, Any]] = None,
        db: Optional[Session] = None,
    ) -> Optional[SecurityEvent]:
        """Log suspicious activity with threat analysis.

        Args:
            activity_type: Type of suspicious activity; emitted in the log
                record's extra fields only, not stored on the event
            description: Event description
            user_id: User identifier (if known)
            user_email: User email (if known)
            client_ip: Client IP address
            user_agent: Client user agent
            threat_score: Calculated threat score (0.0-1.0)
            severity: Event severity
            threat_indicators: Dictionary of threat indicators
            action_taken: Action taken in response
            additional_context: Additional context
            db: Optional database session

        Returns:
            Created SecurityEvent, or None if it could not be persisted
        """
        correlation_id = get_correlation_id()

        event = self._create_security_event(
            event_type=SecurityEventType.SUSPICIOUS_ACTIVITY,
            severity=severity,
            category="suspicious_activity",
            user_id=user_id,
            user_email=user_email,
            client_ip=client_ip,
            user_agent=user_agent,
            description=description,
            threat_score=threat_score,
            threat_indicators=threat_indicators,
            action_taken=action_taken,
            context=additional_context,
            correlation_id=correlation_id,
            db=db,
        )

        logger.warning(
            f"Suspicious activity detected: {description}",
            extra={
                "security_event": True,
                "activity_type": activity_type,
                "severity": severity.value,
                "threat_score": threat_score,
                "action_taken": action_taken,
                "correlation_id": correlation_id,
            },
        )

        return event

    @staticmethod
    def _rollback_quietly(db: Session) -> None:
        """Roll back ``db``, logging instead of raising if the rollback fails.

        Args:
            db: Database session to roll back
        """
        try:
            db.rollback()
        except Exception as rollback_error:
            logger.error(f"Failed to roll back security logging session: {rollback_error}")

    def _count_recent_failures(self, user_id: Optional[str] = None, client_ip: Optional[str] = None, minutes: Optional[int] = None, db: Optional[Session] = None) -> int:
        """Count recent authentication failures.

        When both ``user_id`` and ``client_ip`` are given, only failures
        matching both are counted.

        Args:
            user_id: User identifier
            client_ip: Client IP address
            minutes: Time window in minutes; a falsy value (None or 0)
                selects ``self.rate_limit_window_minutes``
            db: Optional database session

        Returns:
            Count of recent failures (0 when neither filter is provided)

        Raises:
            Exception: Any error raised by the query or by committing an
                internally opened session is propagated unchanged.
        """
        if not user_id and not client_ip:
            return 0

        window_minutes = minutes or self.rate_limit_window_minutes
        since = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)

        owns_session = db is None
        if owns_session:
            db = SessionLocal()

        try:
            stmt = select(func.count(SecurityEvent.id)).where(  # pylint: disable=not-callable
                SecurityEvent.event_type == SecurityEventType.AUTHENTICATION_FAILURE,
                SecurityEvent.timestamp >= since,
            )

            if user_id:
                stmt = stmt.where(SecurityEvent.user_id == user_id)
            if client_ip:
                stmt = stmt.where(SecurityEvent.client_ip == client_ip)

            result = db.execute(stmt).scalar()
            if owns_session:
                db.commit()  # End read-only transaction cleanly
            return result or 0

        except Exception:
            # Discard the failed transaction rather than committing it, and
            # keep the original error as the one the caller sees.
            if owns_session:
                self._rollback_quietly(db)
            raise

        finally:
            # Always release an internally opened session, even when the
            # commit above raised.
            if owns_session:
                db.close()

    def _calculate_auth_threat_score(self, success: bool, failed_attempts: int, auth_method: str) -> float:  # pylint: disable=unused-argument
        """Calculate threat score for authentication attempt.

        Args:
            success: Whether authentication succeeded
            failed_attempts: Count of recent failures
            auth_method: Authentication method used (currently unused)

        Returns:
            Threat score from 0.0 to 1.0
        """
        if success:
            return 0.0

        # Base score for failure
        score = 0.3

        # Increase based on failed attempts
        if failed_attempts >= 10:
            score += 0.5
        elif failed_attempts >= 5:
            score += 0.3
        elif failed_attempts >= 3:
            score += 0.2

        # Cap at 1.0
        return min(score, 1.0)

    def _requires_audit_review(self, action: str, resource_type: str, data_classification: Optional[str], success: bool) -> bool:
        """Determine if audit entry requires manual review.

        Args:
            action: Action performed
            resource_type: Resource type
            data_classification: Data classification
            success: Whether action succeeded

        Returns:
            True if review required
        """
        # Failed actions on sensitive data require review
        if not success and data_classification in self._REVIEW_CLASSIFICATIONS:
            return True

        # Deletions of sensitive data require review
        if action == "delete" and data_classification in self._REVIEW_CLASSIFICATIONS:
            return True

        # Privilege modifications require review
        if resource_type in self._REVIEW_RESOURCE_TYPES:
            return True

        return False

    def _create_security_event(
        self,
        event_type: str,
        severity: SecuritySeverity,
        category: str,
        client_ip: str,
        description: str,
        threat_score: float,
        user_id: Optional[str] = None,
        user_email: Optional[str] = None,
        user_agent: Optional[str] = None,
        action_taken: Optional[str] = None,
        failed_attempts_count: int = 0,
        threat_indicators: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        correlation_id: Optional[str] = None,
        db: Optional[Session] = None,
    ) -> Optional[SecurityEvent]:
        """Create a security event record.

        Persistence is best-effort: any error while building, adding,
        committing or refreshing the event is logged and None is returned.

        Args:
            event_type: Type of security event
            severity: Event severity
            category: Event category
            client_ip: Client IP address
            description: Event description
            threat_score: Threat score (0.0-1.0)
            user_id: User identifier
            user_email: User email
            user_agent: User agent string
            action_taken: Action taken
            failed_attempts_count: Failed attempts count
            threat_indicators: Threat indicators
            context: Additional context
            correlation_id: Correlation ID
            db: Optional database session

        Returns:
            Created SecurityEvent, or None if it could not be persisted
        """
        owns_session = db is None
        if owns_session:
            db = SessionLocal()

        try:
            event = SecurityEvent(
                event_type=event_type,
                severity=severity.value,
                category=category,
                user_id=user_id,
                user_email=user_email,
                client_ip=client_ip,
                user_agent=user_agent,
                description=description,
                action_taken=action_taken,
                threat_score=threat_score,
                threat_indicators=threat_indicators or {},
                failed_attempts_count=failed_attempts_count,
                context=context,
                correlation_id=correlation_id,
            )

            db.add(event)
            db.commit()
            db.refresh(event)

            return event

        except Exception as e:
            logger.error(f"Failed to create security event: {e}")
            # A failing rollback must not escape this best-effort method.
            self._rollback_quietly(db)
            return None

        finally:
            if owns_session:
                db.close()

    def _create_audit_trail(  # pylint: disable=too-many-positional-arguments
        self,
        action: str,
        resource_type: str,
        user_id: str,
        success: bool,
        resource_id: Optional[str] = None,
        resource_name: Optional[str] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        client_ip: Optional[str] = None,
        user_agent: Optional[str] = None,
        old_values: Optional[Dict[str, Any]] = None,
        new_values: Optional[Dict[str, Any]] = None,
        changes: Optional[Dict[str, Any]] = None,
        data_classification: Optional[str] = None,
        requires_review: bool = False,
        error_message: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
        correlation_id: Optional[str] = None,
        db: Optional[Session] = None,
    ) -> Optional[AuditTrail]:
        """Create an audit trail record.

        Persistence is best-effort: any error while building, adding,
        committing or refreshing the record is logged and None is returned.

        Args:
            action: Action performed
            resource_type: Resource type
            user_id: User performing action
            success: Whether action succeeded
            resource_id: Resource identifier
            resource_name: Resource name
            user_email: User email
            team_id: Team context
            client_ip: Client IP
            user_agent: User agent
            old_values: Previous values
            new_values: New values
            changes: Calculated changes
            data_classification: Data classification
            requires_review: Whether manual review needed
            error_message: Error message if failed
            context: Additional context
            correlation_id: Correlation ID
            db: Optional database session

        Returns:
            Created AuditTrail, or None if it could not be persisted
        """
        owns_session = db is None
        if owns_session:
            db = SessionLocal()

        try:
            audit = AuditTrail(
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                resource_name=resource_name,
                user_id=user_id,
                user_email=user_email,
                team_id=team_id,
                client_ip=client_ip,
                user_agent=user_agent,
                old_values=old_values,
                new_values=new_values,
                changes=changes,
                data_classification=data_classification,
                requires_review=requires_review,
                success=success,
                error_message=error_message,
                context=context,
                correlation_id=correlation_id,
            )

            db.add(audit)
            db.commit()
            db.refresh(audit)

            return audit

        except Exception as e:
            logger.error(f"Failed to create audit trail: {e}")
            # A failing rollback must not escape this best-effort method.
            self._rollback_quietly(db)
            return None

        finally:
            if owns_session:
                db.close()

583 

584 

# Global security logger instance, created lazily by get_security_logger()
_security_logger: Optional[SecurityLogger] = None


def get_security_logger() -> SecurityLogger:
    """Get or create the global security logger instance.

    The instance is constructed on the first call and the same object is
    returned by every later call.

    Returns:
        Global SecurityLogger instance
    """
    global _security_logger  # pylint: disable=global-statement
    if _security_logger is None:
        _security_logger = SecurityLogger()
    return _security_logger