Coverage for mcpgateway / services / security_logger.py: 99%
144 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/security_logger.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Security Logger Service.
8This module provides specialized logging for security events, threat detection,
9and audit trail management with automated threat analysis and alerting.
10"""
12# Standard
13from datetime import datetime, timedelta, timezone
14from enum import Enum
15import logging
16from typing import Any, Dict, Optional
18# Third-Party
19from sqlalchemy import func, select
20from sqlalchemy.orm import Session
22# First-Party
23from mcpgateway.config import settings
24from mcpgateway.db import AuditTrail, SecurityEvent, SessionLocal
25from mcpgateway.utils.correlation_id import get_correlation_id
# Module-level logger. Security events persisted by SecurityLogger are also
# mirrored to this standard logger (see the log_* methods below).
logger = logging.getLogger(__name__)
class SecuritySeverity(str, Enum):
    """Severity levels attached to a security event.

    Members are plain strings (the class mixes in ``str``), so they compare
    equal to their upper-case value and can be stored or serialized directly.
    Declaration order runs from least to most severe.
    """

    # Routine or informational events.
    LOW = "LOW"
    # Events worth attention, e.g. a few repeated failures.
    MEDIUM = "MEDIUM"
    # Events that crossed a configured threshold.
    HIGH = "HIGH"
    # Highest level; not assigned by this module's own scoring logic.
    CRITICAL = "CRITICAL"
class SecurityEventType(str, Enum):
    """Identifiers for the kinds of security events that can be recorded.

    Members are plain strings (the class mixes in ``str``); each value is the
    lower-case, snake_case form of the member name.
    """

    # --- Authentication and authorization outcomes ---
    AUTHENTICATION_FAILURE = "authentication_failure"
    AUTHENTICATION_SUCCESS = "authentication_success"
    AUTHORIZATION_FAILURE = "authorization_failure"

    # --- Abuse and attack indicators ---
    SUSPICIOUS_ACTIVITY = "suspicious_activity"
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
    BRUTE_FORCE_ATTEMPT = "brute_force_attempt"
    TOKEN_MANIPULATION = "token_manipulation"  # nosec B105 - Not a password, security event type constant
    DATA_EXFILTRATION = "data_exfiltration"
    PRIVILEGE_ESCALATION = "privilege_escalation"
    INJECTION_ATTEMPT = "injection_attempt"
    ANOMALOUS_BEHAVIOR = "anomalous_behavior"
class SecurityLogger:
    """Specialized logger for security events and audit trails.

    Provides threat detection, security event logging, and audit trail
    management with automated analysis and alerting capabilities.

    Every public ``log_*`` method accepts an optional ``db`` session. When it
    is omitted, the helper that touches the database opens its own session via
    ``SessionLocal()`` and always closes it again. When a session is supplied,
    it is used as-is (including ``commit``/``rollback``) and is never closed
    here.

    Writing records is best-effort: failures are logged and reported as a
    ``None`` return value instead of being raised.
    """

    # Classifications whose access is additionally recorded as a security event.
    _SECURITY_EVENT_CLASSIFICATIONS = ("confidential", "restricted", "sensitive")
    # Classifications for which failed actions and deletions are flagged for review.
    _REVIEW_CLASSIFICATIONS = ("confidential", "restricted")
    # Resource types treated as privilege modifications; always flagged for review.
    _PRIVILEGE_RESOURCE_TYPES = ("role", "permission", "team_member")

    def __init__(self):
        """Initialize security logger.

        Thresholds are read from ``settings`` when present, otherwise the
        defaults shown below are used.
        """
        self.failed_auth_threshold = getattr(settings, "security_failed_auth_threshold", 5)
        self.threat_score_alert_threshold = getattr(settings, "security_threat_score_alert", 0.7)
        self.rate_limit_window_minutes = getattr(settings, "security_rate_limit_window", 5)

    def log_authentication_attempt(
        self,
        user_id: str,
        user_email: Optional[str],
        auth_method: str,
        success: bool,
        client_ip: str,
        user_agent: Optional[str] = None,
        failure_reason: Optional[str] = None,
        additional_context: Optional[Dict[str, Any]] = None,
        db: Optional[Session] = None,
    ) -> Optional[SecurityEvent]:
        """Log authentication attempts with security analysis.

        Counts recent failures for the same user and client IP, derives a
        threat score and severity from that count, stores a security event and
        mirrors a summary to the standard logger.

        Args:
            user_id: User identifier
            user_email: User email address
            auth_method: Authentication method used
            success: Whether authentication succeeded
            client_ip: Client IP address
            user_agent: Client user agent
            failure_reason: Reason for failure if applicable
            additional_context: Additional event context; its keys override
                the built-in context keys on collision
            db: Optional database session

        Returns:
            Created SecurityEvent, or None if the event could not be persisted

        Raises:
            Exception: Errors raised while counting recent failures are not
                caught here and propagate to the caller.
        """
        correlation_id = get_correlation_id()
        event_type = SecurityEventType.AUTHENTICATION_SUCCESS if success else SecurityEventType.AUTHENTICATION_FAILURE

        # Count recent failed attempts (the current attempt is not yet stored,
        # so it is not part of this count).
        failed_attempts = self._count_recent_failures(user_id=user_id, client_ip=client_ip, db=db)

        # Calculate threat score
        threat_score = self._calculate_auth_threat_score(success=success, failed_attempts=failed_attempts, auth_method=auth_method)

        # Determine severity. The configurable threshold is checked before the
        # fixed "3 failures" step so a threshold below 3 still yields HIGH.
        if success:
            severity = SecuritySeverity.LOW
        elif failed_attempts >= self.failed_auth_threshold:
            severity = SecuritySeverity.HIGH
        elif failed_attempts >= 3:
            severity = SecuritySeverity.MEDIUM
        else:
            severity = SecuritySeverity.LOW

        # Build event description
        description = f"Authentication {'successful' if success else 'failed'} for user {user_id}"
        if not success and failure_reason:
            description += f": {failure_reason}"

        # Build context
        context = {"auth_method": auth_method, "failed_attempts_recent": failed_attempts, "user_agent": user_agent, **(additional_context or {})}

        # Create security event
        event = self._create_security_event(
            event_type=event_type,
            severity=severity,
            category="authentication",
            user_id=user_id,
            user_email=user_email,
            client_ip=client_ip,
            user_agent=user_agent,
            description=description,
            threat_score=threat_score,
            failed_attempts_count=failed_attempts,
            context=context,
            action_taken="allowed" if success else "denied",
            correlation_id=correlation_id,
            db=db,
        )

        # Log to standard logger as well. The locally known event type is used
        # so the log line keeps its type even when persisting the event failed.
        log_level = logging.WARNING if not success else logging.INFO
        logger.log(
            log_level,
            f"Authentication attempt: {description}",
            extra={
                "security_event": True,
                "event_type": event_type.value,
                "severity": severity.value,
                "threat_score": threat_score,
                "correlation_id": correlation_id,
            },
        )

        return event

    def log_data_access(  # pylint: disable=too-many-positional-arguments
        self,
        action: str,
        resource_type: str,
        resource_id: str,
        resource_name: Optional[str],
        user_id: str,
        user_email: Optional[str],
        team_id: Optional[str],
        client_ip: Optional[str],
        user_agent: Optional[str],
        success: bool,
        data_classification: Optional[str] = None,
        old_values: Optional[Dict[str, Any]] = None,
        new_values: Optional[Dict[str, Any]] = None,
        error_message: Optional[str] = None,
        additional_context: Optional[Dict[str, Any]] = None,
        db: Optional[Session] = None,
    ) -> Optional[AuditTrail]:
        """Log data access for audit trails.

        Always writes an audit trail entry. Access to data classified as
        confidential, restricted or sensitive is additionally recorded as a
        security event.

        Args:
            action: Action performed (create, read, update, delete, execute)
            resource_type: Type of resource accessed
            resource_id: Resource identifier
            resource_name: Resource name
            user_id: User performing the action
            user_email: User email
            team_id: Team context
            client_ip: Client IP address
            user_agent: Client user agent
            success: Whether action succeeded
            data_classification: Data sensitivity classification
            old_values: Previous values (for updates)
            new_values: New values (for updates/creates)
            error_message: Error message if failed
            additional_context: Additional context
            db: Optional database session

        Returns:
            Created AuditTrail entry, or None if it could not be persisted
        """
        correlation_id = get_correlation_id()

        # Determine if audit requires review
        requires_review = self._requires_audit_review(action=action, resource_type=resource_type, data_classification=data_classification, success=success)

        # Calculate changes (only when both snapshots are provided)
        changes = self._compute_changes(old_values, new_values)

        # Create audit trail
        audit = self._create_audit_trail(
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            resource_name=resource_name,
            user_id=user_id,
            user_email=user_email,
            team_id=team_id,
            client_ip=client_ip,
            user_agent=user_agent,
            success=success,
            old_values=old_values,
            new_values=new_values,
            changes=changes,
            data_classification=data_classification,
            requires_review=requires_review,
            error_message=error_message,
            context=additional_context,
            correlation_id=correlation_id,
            db=db,
        )

        # Log sensitive data access as security event. The event type is a raw
        # string because SecurityEventType has no matching member.
        if data_classification in self._SECURITY_EVENT_CLASSIFICATIONS:
            self._create_security_event(
                event_type="data_access",
                severity=SecuritySeverity.MEDIUM if success else SecuritySeverity.HIGH,
                category="data_access",
                user_id=user_id,
                user_email=user_email,
                client_ip=client_ip or "unknown",
                user_agent=user_agent,
                description=f"Access to {data_classification} {resource_type}: {resource_name or resource_id}",
                threat_score=0.3 if success else 0.6,
                context={
                    "action": action,
                    "resource_type": resource_type,
                    "resource_id": resource_id,
                    "data_classification": data_classification,
                },
                correlation_id=correlation_id,
                db=db,
            )

        return audit

    def log_suspicious_activity(
        self,
        activity_type: str,
        description: str,
        user_id: Optional[str],
        user_email: Optional[str],
        client_ip: str,
        user_agent: Optional[str],
        threat_score: float,
        severity: SecuritySeverity,
        threat_indicators: Dict[str, Any],
        action_taken: str,
        additional_context: Optional[Dict[str, Any]] = None,
        db: Optional[Session] = None,
    ) -> Optional[SecurityEvent]:
        """Log suspicious activity with threat analysis.

        Args:
            activity_type: Type of suspicious activity (only included in the
                standard log record, not in the stored event)
            description: Event description
            user_id: User identifier (if known)
            user_email: User email (if known)
            client_ip: Client IP address
            user_agent: Client user agent
            threat_score: Calculated threat score (0.0-1.0)
            severity: Event severity
            threat_indicators: Dictionary of threat indicators
            action_taken: Action taken in response
            additional_context: Additional context
            db: Optional database session

        Returns:
            Created SecurityEvent, or None if it could not be persisted
        """
        correlation_id = get_correlation_id()

        event = self._create_security_event(
            event_type=SecurityEventType.SUSPICIOUS_ACTIVITY,
            severity=severity,
            category="suspicious_activity",
            user_id=user_id,
            user_email=user_email,
            client_ip=client_ip,
            user_agent=user_agent,
            description=description,
            threat_score=threat_score,
            threat_indicators=threat_indicators,
            action_taken=action_taken,
            context=additional_context,
            correlation_id=correlation_id,
            db=db,
        )

        logger.warning(
            f"Suspicious activity detected: {description}",
            extra={
                "security_event": True,
                "activity_type": activity_type,
                "severity": severity.value,
                "threat_score": threat_score,
                "action_taken": action_taken,
                "correlation_id": correlation_id,
            },
        )

        return event

    @staticmethod
    def _compute_changes(old_values: Optional[Dict[str, Any]], new_values: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Build a per-key diff between two value snapshots.

        Args:
            old_values: Previous values
            new_values: New values

        Returns:
            None when either snapshot is missing or empty; otherwise a dict
            mapping each key whose value differs to ``{"old": ..., "new": ...}``
            (a key absent from one side is reported as None on that side).
            Keys keep a deterministic order: keys of ``old_values`` first,
            then keys that only exist in ``new_values``.
        """
        if not old_values or not new_values:
            return None

        # Merging the dicts yields every key exactly once in insertion order;
        # only the keys of the merged dict are used.
        all_keys = {**old_values, **new_values}
        return {key: {"old": old_values.get(key), "new": new_values.get(key)} for key in all_keys if old_values.get(key) != new_values.get(key)}

    @staticmethod
    def _safe_rollback(db: Session) -> None:
        """Roll back ``db`` without letting a rollback failure escape.

        Used from error handlers whose contract is to report failure via
        their return value; a rollback error is logged instead of raised.

        Args:
            db: Session to roll back
        """
        try:
            db.rollback()
        except Exception as rollback_error:  # pylint: disable=broad-exception-caught
            logger.error(f"Failed to roll back security logging transaction: {rollback_error}")

    def _count_recent_failures(self, user_id: Optional[str] = None, client_ip: Optional[str] = None, minutes: Optional[int] = None, db: Optional[Session] = None) -> int:
        """Count recent authentication failures.

        Filters are combined: when both ``user_id`` and ``client_ip`` are
        given, only failures matching both are counted.

        Args:
            user_id: User identifier
            client_ip: Client IP address
            minutes: Time window in minutes; a falsy value (None or 0) selects
                the configured ``rate_limit_window_minutes``
            db: Optional database session

        Returns:
            Count of recent failures (0 when neither filter is given)

        Raises:
            Exception: Query errors are not caught and propagate to the caller.
        """
        if not user_id and not client_ip:
            return 0

        window_minutes = minutes or self.rate_limit_window_minutes
        since = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)

        should_close = db is None
        if should_close:
            db = SessionLocal()

        try:
            stmt = select(func.count(SecurityEvent.id)).where(SecurityEvent.event_type == SecurityEventType.AUTHENTICATION_FAILURE, SecurityEvent.timestamp >= since)  # pylint: disable=not-callable

            if user_id:
                stmt = stmt.where(SecurityEvent.user_id == user_id)
            if client_ip:
                stmt = stmt.where(SecurityEvent.client_ip == client_ip)

            result = db.execute(stmt).scalar()
            if should_close:
                db.commit()  # End read-only transaction cleanly
            return result or 0

        finally:
            # close() is the only statement here so that it runs even when the
            # query or the commit above raised; otherwise the owned session
            # would be leaked.
            if should_close:
                db.close()

    def _calculate_auth_threat_score(self, success: bool, failed_attempts: int, auth_method: str) -> float:  # pylint: disable=unused-argument
        """Calculate threat score for authentication attempt.

        Args:
            success: Whether authentication succeeded
            failed_attempts: Count of recent failures
            auth_method: Authentication method used (currently not used in
                the calculation)

        Returns:
            Threat score from 0.0 to 1.0: 0.0 on success, otherwise 0.3 plus
            0.2 / 0.3 / 0.5 for at least 3 / 5 / 10 recent failures
        """
        if success:
            return 0.0

        # Base score for failure
        score = 0.3

        # Increase based on failed attempts
        if failed_attempts >= 10:
            score += 0.5
        elif failed_attempts >= 5:
            score += 0.3
        elif failed_attempts >= 3:
            score += 0.2

        # Cap at 1.0
        return min(score, 1.0)

    def _requires_audit_review(self, action: str, resource_type: str, data_classification: Optional[str], success: bool) -> bool:
        """Determine if audit entry requires manual review.

        Args:
            action: Action performed
            resource_type: Resource type
            data_classification: Data classification
            success: Whether action succeeded

        Returns:
            True if review required
        """
        is_review_classification = data_classification in self._REVIEW_CLASSIFICATIONS

        # Failed actions on sensitive data require review
        if not success and is_review_classification:
            return True

        # Deletions of sensitive data require review
        if action == "delete" and is_review_classification:
            return True

        # Privilege modifications require review
        return resource_type in self._PRIVILEGE_RESOURCE_TYPES

    def _create_security_event(
        self,
        event_type: str,
        severity: SecuritySeverity,
        category: str,
        client_ip: str,
        description: str,
        threat_score: float,
        user_id: Optional[str] = None,
        user_email: Optional[str] = None,
        user_agent: Optional[str] = None,
        action_taken: Optional[str] = None,
        failed_attempts_count: int = 0,
        threat_indicators: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        correlation_id: Optional[str] = None,
        db: Optional[Session] = None,
    ) -> Optional[SecurityEvent]:
        """Create a security event record.

        Args:
            event_type: Type of security event
            severity: Event severity
            category: Event category
            client_ip: Client IP address
            description: Event description
            threat_score: Threat score (0.0-1.0)
            user_id: User identifier
            user_email: User email
            user_agent: User agent string
            action_taken: Action taken
            failed_attempts_count: Failed attempts count
            threat_indicators: Threat indicators (stored as ``{}`` when omitted)
            context: Additional context
            correlation_id: Correlation ID
            db: Optional database session

        Returns:
            Created SecurityEvent, or None if building or persisting it failed
        """
        should_close = db is None
        if should_close:
            db = SessionLocal()

        try:
            event = SecurityEvent(
                event_type=event_type,
                severity=severity.value,
                category=category,
                user_id=user_id,
                user_email=user_email,
                client_ip=client_ip,
                user_agent=user_agent,
                description=description,
                action_taken=action_taken,
                threat_score=threat_score,
                threat_indicators=threat_indicators or {},
                failed_attempts_count=failed_attempts_count,
                context=context,
                correlation_id=correlation_id,
            )

            db.add(event)
            db.commit()
            # Reload the row so its attributes are populated on the returned object.
            db.refresh(event)

            return event

        except Exception as e:  # pylint: disable=broad-exception-caught
            # Best-effort: never let a logging failure reach the caller.
            logger.error(f"Failed to create security event: {e}", exc_info=True)
            self._safe_rollback(db)
            return None

        finally:
            if should_close:
                db.close()

    def _create_audit_trail(  # pylint: disable=too-many-positional-arguments
        self,
        action: str,
        resource_type: str,
        user_id: str,
        success: bool,
        resource_id: Optional[str] = None,
        resource_name: Optional[str] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        client_ip: Optional[str] = None,
        user_agent: Optional[str] = None,
        old_values: Optional[Dict[str, Any]] = None,
        new_values: Optional[Dict[str, Any]] = None,
        changes: Optional[Dict[str, Any]] = None,
        data_classification: Optional[str] = None,
        requires_review: bool = False,
        error_message: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
        correlation_id: Optional[str] = None,
        db: Optional[Session] = None,
    ) -> Optional[AuditTrail]:
        """Create an audit trail record.

        Args:
            action: Action performed
            resource_type: Resource type
            user_id: User performing action
            success: Whether action succeeded
            resource_id: Resource identifier
            resource_name: Resource name
            user_email: User email
            team_id: Team context
            client_ip: Client IP
            user_agent: User agent
            old_values: Previous values
            new_values: New values
            changes: Calculated changes
            data_classification: Data classification
            requires_review: Whether manual review needed
            error_message: Error message if failed
            context: Additional context
            correlation_id: Correlation ID
            db: Optional database session

        Returns:
            Created AuditTrail, or None if building or persisting it failed
        """
        should_close = db is None
        if should_close:
            db = SessionLocal()

        try:
            audit = AuditTrail(
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                resource_name=resource_name,
                user_id=user_id,
                user_email=user_email,
                team_id=team_id,
                client_ip=client_ip,
                user_agent=user_agent,
                old_values=old_values,
                new_values=new_values,
                changes=changes,
                data_classification=data_classification,
                requires_review=requires_review,
                success=success,
                error_message=error_message,
                context=context,
                correlation_id=correlation_id,
            )

            db.add(audit)
            db.commit()
            # Reload the row so its attributes are populated on the returned object.
            db.refresh(audit)

            return audit

        except Exception as e:  # pylint: disable=broad-exception-caught
            # Best-effort: never let a logging failure reach the caller.
            logger.error(f"Failed to create audit trail: {e}", exc_info=True)
            self._safe_rollback(db)
            return None

        finally:
            if should_close:
                db.close()
# Lazily created, process-wide SecurityLogger (None until first requested).
_security_logger: Optional[SecurityLogger] = None


def get_security_logger() -> SecurityLogger:
    """Return the shared SecurityLogger, creating it on first use.

    Returns:
        Global SecurityLogger instance
    """
    global _security_logger  # pylint: disable=global-statement
    instance = _security_logger
    if instance is None:
        # First call: build the instance and remember it for later callers.
        instance = SecurityLogger()
        _security_logger = instance
    return instance