Coverage for mcpgateway / services / audit_trail_service.py: 97%
124 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/audit_trail_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6Audit Trail Service.
8This module provides audit trail management for CRUD operations,
9data access tracking, and compliance logging.
10"""
12# Standard
13from datetime import datetime, timezone
14from enum import Enum
15import logging
16from typing import Any, Dict, Optional
18# Third-Party
19from sqlalchemy import select
20from sqlalchemy.orm import Session
22# First-Party
23from mcpgateway.config import settings
24from mcpgateway.db import AuditTrail, SessionLocal
25from mcpgateway.utils.correlation_id import get_or_generate_correlation_id
27logger = logging.getLogger(__name__)
30class AuditAction(str, Enum):
31 """Audit trail action types."""
33 CREATE = "CREATE"
34 READ = "READ"
35 UPDATE = "UPDATE"
36 DELETE = "DELETE"
37 EXECUTE = "EXECUTE"
38 ACCESS = "ACCESS"
39 EXPORT = "EXPORT"
40 IMPORT = "IMPORT"
43class DataClassification(str, Enum):
44 """Data classification levels."""
46 PUBLIC = "public"
47 INTERNAL = "internal"
48 CONFIDENTIAL = "confidential"
49 RESTRICTED = "restricted"
52REVIEW_REQUIRED_ACTIONS = {
53 "delete_server",
54 "delete_tool",
55 "delete_resource",
56 "delete_gateway",
57 "update_sensitive_config",
58 "bulk_delete",
59}
62class AuditTrailService:
63 """Service for managing audit trails and compliance logging.
65 Provides comprehensive audit trail management with data classification,
66 change tracking, and compliance reporting capabilities.
67 """
69 def __init__(self):
70 """Initialize audit trail service."""
72 def log_action( # pylint: disable=too-many-positional-arguments
73 self,
74 action: str,
75 resource_type: str,
76 resource_id: str,
77 user_id: str,
78 user_email: Optional[str] = None,
79 team_id: Optional[str] = None,
80 resource_name: Optional[str] = None,
81 client_ip: Optional[str] = None,
82 user_agent: Optional[str] = None,
83 request_path: Optional[str] = None,
84 request_method: Optional[str] = None,
85 old_values: Optional[Dict[str, Any]] = None,
86 new_values: Optional[Dict[str, Any]] = None,
87 changes: Optional[Dict[str, Any]] = None,
88 data_classification: Optional[str] = None,
89 requires_review: Optional[bool] = None,
90 success: bool = True,
91 error_message: Optional[str] = None,
92 context: Optional[Dict[str, Any]] = None,
93 details: Optional[Dict[str, Any]] = None,
94 metadata: Optional[Dict[str, Any]] = None,
95 db: Optional[Session] = None,
96 ) -> Optional[AuditTrail]:
97 """Log an audit trail entry.
99 Args:
100 action: Action performed (CREATE, READ, UPDATE, DELETE, etc.)
101 resource_type: Type of resource (tool, server, prompt, etc.)
102 resource_id: ID of the resource
103 user_id: User who performed the action
104 user_email: User's email address
105 team_id: Team ID if applicable
106 resource_name: Name of the resource
107 client_ip: Client IP address
108 user_agent: Client user agent
109 request_path: HTTP request path
110 request_method: HTTP request method
111 old_values: Previous values before change
112 new_values: New values after change
113 changes: Specific changes made
114 data_classification: Data classification level
115 requires_review: Whether this action requires review (None = auto)
116 success: Whether the action succeeded
117 error_message: Error message if failed
118 context: Additional context
119 details: Extra key/value payload (stored under context.details)
120 metadata: Extra metadata payload (stored under context.metadata)
121 db: Optional database session
123 Returns:
124 Created AuditTrail entry or None if logging disabled
125 """
126 # Check if audit trail logging is enabled
127 if not settings.audit_trail_enabled:
128 return None
130 correlation_id = get_or_generate_correlation_id()
132 # Use provided session or create new one
133 close_db = False
134 if db is None: 134 ↛ 138line 134 didn't jump to line 138 because the condition on line 134 was always true
135 db = SessionLocal()
136 close_db = True
138 try:
139 context_payload: Dict[str, Any] = dict(context) if context else {}
140 if details:
141 context_payload["details"] = details
142 if metadata:
143 context_payload["metadata"] = metadata
144 context_value = context_payload if context_payload else None
146 requires_review_flag = self._determine_requires_review(
147 action=action,
148 data_classification=data_classification,
149 requires_review_param=requires_review,
150 )
152 # Create audit trail entry
153 audit_entry = AuditTrail(
154 timestamp=datetime.now(timezone.utc),
155 correlation_id=correlation_id,
156 action=action,
157 resource_type=resource_type,
158 resource_id=resource_id,
159 resource_name=resource_name,
160 user_id=user_id,
161 user_email=user_email,
162 team_id=team_id,
163 client_ip=client_ip,
164 user_agent=user_agent,
165 request_path=request_path,
166 request_method=request_method,
167 old_values=old_values,
168 new_values=new_values,
169 changes=changes,
170 data_classification=data_classification,
171 requires_review=requires_review_flag,
172 success=success,
173 error_message=error_message,
174 context=context_value,
175 )
177 db.add(audit_entry)
178 db.commit()
179 db.refresh(audit_entry)
181 logger.debug(
182 f"Audit trail logged: {action} {resource_type}/{resource_id} by {user_id}",
183 extra={"correlation_id": correlation_id, "action": action, "resource_type": resource_type, "resource_id": resource_id, "user_id": user_id, "success": success},
184 )
186 return audit_entry
188 except Exception as e:
189 logger.error(f"Failed to log audit trail: {e}", exc_info=True, extra={"correlation_id": correlation_id, "action": action, "resource_type": resource_type, "resource_id": resource_id})
190 if close_db: 190 ↛ 192line 190 didn't jump to line 192 because the condition on line 190 was always true
191 db.rollback()
192 return None
194 finally:
195 if close_db:
196 db.close()
198 def _determine_requires_review(
199 self,
200 action: Optional[str],
201 data_classification: Optional[str],
202 requires_review_param: Optional[bool],
203 ) -> bool:
204 """Resolve whether an audit entry should require review.
206 Args:
207 action: Action being performed
208 data_classification: Data classification level
209 requires_review_param: Explicit review requirement
211 Returns:
212 bool: Whether the audit entry requires review
213 """
214 if requires_review_param is not None:
215 return requires_review_param
217 if data_classification in {DataClassification.CONFIDENTIAL.value, DataClassification.RESTRICTED.value}:
218 return True
220 normalized_action = (action or "").lower()
221 if normalized_action in REVIEW_REQUIRED_ACTIONS:
222 return True
224 return False
226 def log_crud_operation(
227 self,
228 operation: str,
229 resource_type: str,
230 resource_id: str,
231 user_id: str,
232 user_email: Optional[str] = None,
233 team_id: Optional[str] = None,
234 resource_name: Optional[str] = None,
235 old_values: Optional[Dict[str, Any]] = None,
236 new_values: Optional[Dict[str, Any]] = None,
237 success: bool = True,
238 error_message: Optional[str] = None,
239 db: Optional[Session] = None,
240 **kwargs,
241 ) -> Optional[AuditTrail]:
242 """Log a CRUD operation with change tracking.
244 Args:
245 operation: CRUD operation (CREATE, READ, UPDATE, DELETE)
246 resource_type: Type of resource
247 resource_id: ID of the resource
248 user_id: User who performed the operation
249 user_email: User's email
250 team_id: Team ID if applicable
251 resource_name: Name of the resource
252 old_values: Previous values (for UPDATE/DELETE)
253 new_values: New values (for CREATE/UPDATE)
254 success: Whether the operation succeeded
255 error_message: Error message if failed
256 db: Optional database session
257 **kwargs: Additional arguments passed to log_action
259 Returns:
260 Created AuditTrail entry
261 """
262 # Calculate changes for UPDATE operations
263 changes = None
264 if operation == "UPDATE" and old_values and new_values:
265 changes = {}
266 for key in set(old_values.keys()) | set(new_values.keys()):
267 old_val = old_values.get(key)
268 new_val = new_values.get(key)
269 if old_val != new_val:
270 changes[key] = {"old": old_val, "new": new_val}
272 # Determine data classification based on resource type
273 data_classification = None
274 if resource_type in ["user", "team", "token", "credential"]:
275 data_classification = DataClassification.CONFIDENTIAL.value
276 elif resource_type in ["tool", "server", "prompt", "resource"]: 276 ↛ 280line 276 didn't jump to line 280 because the condition on line 276 was always true
277 data_classification = DataClassification.INTERNAL.value
279 # Determine if review is required
280 requires_review = False
281 if data_classification == DataClassification.CONFIDENTIAL.value:
282 requires_review = True
283 if operation == "DELETE" and resource_type in ["tool", "server", "gateway"]:
284 requires_review = True
286 return self.log_action(
287 action=operation,
288 resource_type=resource_type,
289 resource_id=resource_id,
290 user_id=user_id,
291 user_email=user_email,
292 team_id=team_id,
293 resource_name=resource_name,
294 old_values=old_values,
295 new_values=new_values,
296 changes=changes,
297 data_classification=data_classification,
298 requires_review=requires_review,
299 success=success,
300 error_message=error_message,
301 db=db,
302 **kwargs,
303 )
305 def log_data_access(
306 self,
307 resource_type: str,
308 resource_id: str,
309 user_id: str,
310 access_type: str = "READ",
311 user_email: Optional[str] = None,
312 team_id: Optional[str] = None,
313 resource_name: Optional[str] = None,
314 data_classification: Optional[str] = None,
315 db: Optional[Session] = None,
316 **kwargs,
317 ) -> Optional[AuditTrail]:
318 """Log data access for compliance tracking.
320 Args:
321 resource_type: Type of resource accessed
322 resource_id: ID of the resource
323 user_id: User who accessed the data
324 access_type: Type of access (READ, EXPORT, etc.)
325 user_email: User's email
326 team_id: Team ID if applicable
327 resource_name: Name of the resource
328 data_classification: Data classification level
329 db: Optional database session
330 **kwargs: Additional arguments passed to log_action
332 Returns:
333 Created AuditTrail entry
334 """
335 requires_review = data_classification in [DataClassification.CONFIDENTIAL.value, DataClassification.RESTRICTED.value]
337 return self.log_action(
338 action=access_type,
339 resource_type=resource_type,
340 resource_id=resource_id,
341 user_id=user_id,
342 user_email=user_email,
343 team_id=team_id,
344 resource_name=resource_name,
345 data_classification=data_classification,
346 requires_review=requires_review,
347 success=True,
348 db=db,
349 **kwargs,
350 )
352 def log_audit(
353 self, user_id: str, resource_type: str, resource_id: str, action: str, user_email: Optional[str] = None, description: Optional[str] = None, db: Optional[Session] = None, **kwargs
354 ) -> Optional[AuditTrail]:
355 """Convenience method for simple audit logging.
357 Args:
358 user_id: User who performed the action
359 resource_type: Type of resource
360 resource_id: ID of the resource
361 action: Action performed
362 user_email: User's email
363 description: Description of the action
364 db: Optional database session
365 **kwargs: Additional arguments passed to log_action
367 Returns:
368 Created AuditTrail entry
369 """
370 # Build context if description provided
371 context = kwargs.pop("context", {})
372 if description: 372 ↛ 375line 372 didn't jump to line 375 because the condition on line 372 was always true
373 context["description"] = description
375 return self.log_action(action=action, resource_type=resource_type, resource_id=resource_id, user_id=user_id, user_email=user_email, context=context if context else None, db=db, **kwargs)
377 def get_audit_trail(
378 self,
379 resource_type: Optional[str] = None,
380 resource_id: Optional[str] = None,
381 user_id: Optional[str] = None,
382 action: Optional[str] = None,
383 start_time: Optional[datetime] = None,
384 end_time: Optional[datetime] = None,
385 limit: int = 100,
386 offset: int = 0,
387 db: Optional[Session] = None,
388 ) -> list[AuditTrail]:
389 """Query audit trail entries.
391 Args:
392 resource_type: Filter by resource type
393 resource_id: Filter by resource ID
394 user_id: Filter by user ID
395 action: Filter by action
396 start_time: Filter by start time
397 end_time: Filter by end time
398 limit: Maximum number of results
399 offset: Offset for pagination
400 db: Optional database session
402 Returns:
403 List of AuditTrail entries
404 """
405 close_db = False
406 if db is None: 406 ↛ 410line 406 didn't jump to line 410 because the condition on line 406 was always true
407 db = SessionLocal()
408 close_db = True
410 try:
411 query = select(AuditTrail)
413 if resource_type: 413 ↛ 415line 413 didn't jump to line 415 because the condition on line 413 was always true
414 query = query.where(AuditTrail.resource_type == resource_type)
415 if resource_id:
416 query = query.where(AuditTrail.resource_id == resource_id)
417 if user_id:
418 query = query.where(AuditTrail.user_id == user_id)
419 if action:
420 query = query.where(AuditTrail.action == action)
421 if start_time:
422 query = query.where(AuditTrail.timestamp >= start_time)
423 if end_time:
424 query = query.where(AuditTrail.timestamp <= end_time)
426 query = query.order_by(AuditTrail.timestamp.desc())
427 query = query.limit(limit).offset(offset)
429 result = db.execute(query)
430 return list(result.scalars().all())
432 finally:
433 if close_db:
434 db.commit() # End read-only transaction cleanly
435 db.close()
438# Singleton instance
439_audit_trail_service: Optional[AuditTrailService] = None
442def get_audit_trail_service() -> AuditTrailService:
443 """Get or create the singleton audit trail service instance.
445 Returns:
446 AuditTrailService instance
447 """
448 global _audit_trail_service # pylint: disable=global-statement
449 if _audit_trail_service is None:
450 _audit_trail_service = AuditTrailService()
451 return _audit_trail_service