Coverage for mcpgateway / services / audit_trail_service.py: 97%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/audit_trail_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6Audit Trail Service. 

7 

8This module provides audit trail management for CRUD operations, 

9data access tracking, and compliance logging. 

10""" 

11 

12# Standard 

13from datetime import datetime, timezone 

14from enum import Enum 

15import logging 

16from typing import Any, Dict, Optional 

17 

18# Third-Party 

19from sqlalchemy import select 

20from sqlalchemy.orm import Session 

21 

22# First-Party 

23from mcpgateway.config import settings 

24from mcpgateway.db import AuditTrail, SessionLocal 

25from mcpgateway.utils.correlation_id import get_or_generate_correlation_id 

26 

# Module-level logger named after this module; used by AuditTrailService to
# report both successfully written entries (debug) and persistence failures (error).
logger = logging.getLogger(__name__)

28 

29 

class AuditAction(str, Enum):
    """Audit trail action types.

    Members also inherit from ``str``, so each member compares equal to its
    plain string value (e.g. ``AuditAction.CREATE == "CREATE"``). Every value
    is the upper-case spelling of the member name.
    """

    # Create / read / update / delete operations on a resource.
    CREATE = "CREATE"
    READ = "READ"
    UPDATE = "UPDATE"
    DELETE = "DELETE"
    # Remaining action names; the service methods in this module accept the
    # action as a plain string, so these members are a vocabulary for callers.
    EXECUTE = "EXECUTE"
    ACCESS = "ACCESS"
    EXPORT = "EXPORT"
    IMPORT = "IMPORT"

41 

42 

class DataClassification(str, Enum):
    """Data classification levels.

    Members also inherit from ``str``; the service code in this module
    compares and stores the lower-case ``.value`` strings.
    """

    # Not referenced by the service code visible in this module.
    PUBLIC = "public"
    # Assigned by AuditTrailService.log_crud_operation to tool / server /
    # prompt / resource entries.
    INTERNAL = "internal"
    # Assigned by AuditTrailService.log_crud_operation to user / team /
    # token / credential entries; entries at this level are flagged for review.
    CONFIDENTIAL = "confidential"
    # Entries at this level are flagged for review by
    # AuditTrailService._determine_requires_review and log_data_access.
    RESTRICTED = "restricted"

50 

51 

# Action names that force ``requires_review=True`` when the caller does not
# set the flag explicitly. AuditTrailService._determine_requires_review
# lower-cases the incoming action before testing membership, so every entry
# here must be lower-case.
REVIEW_REQUIRED_ACTIONS = {
    "delete_server",
    "delete_tool",
    "delete_resource",
    "delete_gateway",
    "update_sensitive_config",
    "bulk_delete",
}

60 

61 

class AuditTrailService:
    """Service for managing audit trails and compliance logging.

    Provides comprehensive audit trail management with data classification,
    change tracking, and compliance reporting capabilities.

    The service holds no instance state. Every method accepts an optional
    SQLAlchemy session; when none is supplied a session is created from
    ``SessionLocal`` and closed again before the method returns.
    """

    def __init__(self):
        """Initialize audit trail service."""

    def log_action(  # pylint: disable=too-many-positional-arguments
        self,
        action: str,
        resource_type: str,
        resource_id: str,
        user_id: str,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        resource_name: Optional[str] = None,
        client_ip: Optional[str] = None,
        user_agent: Optional[str] = None,
        request_path: Optional[str] = None,
        request_method: Optional[str] = None,
        old_values: Optional[Dict[str, Any]] = None,
        new_values: Optional[Dict[str, Any]] = None,
        changes: Optional[Dict[str, Any]] = None,
        data_classification: Optional[str] = None,
        requires_review: Optional[bool] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
        details: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        db: Optional[Session] = None,
    ) -> Optional[AuditTrail]:
        """Log an audit trail entry.

        Logging is best-effort: a failure while building or persisting the
        entry is logged and reported as ``None`` instead of being raised.
        The entry is committed immediately, including when a caller-supplied
        session is used.

        Args:
            action: Action performed (CREATE, READ, UPDATE, DELETE, etc.)
            resource_type: Type of resource (tool, server, prompt, etc.)
            resource_id: ID of the resource
            user_id: User who performed the action
            user_email: User's email address
            team_id: Team ID if applicable
            resource_name: Name of the resource
            client_ip: Client IP address
            user_agent: Client user agent
            request_path: HTTP request path
            request_method: HTTP request method
            old_values: Previous values before change
            new_values: New values after change
            changes: Specific changes made
            data_classification: Data classification level
            requires_review: Whether this action requires review (None = auto)
            success: Whether the action succeeded
            error_message: Error message if failed
            context: Additional context (copied, never mutated)
            details: Extra key/value payload (stored under context.details)
            metadata: Extra metadata payload (stored under context.metadata)
            db: Optional database session

        Returns:
            Created AuditTrail entry, or None if audit trail logging is
            disabled or the entry could not be persisted
        """
        # Check if audit trail logging is enabled
        if not settings.audit_trail_enabled:
            return None

        correlation_id = get_or_generate_correlation_id()

        # Use provided session or create new one
        close_db = False
        if db is None:
            db = SessionLocal()
            close_db = True

        try:
            # Work on a copy so the caller's context dict is never modified.
            context_payload: Dict[str, Any] = dict(context) if context else {}
            if details:
                context_payload["details"] = details
            if metadata:
                context_payload["metadata"] = metadata
            # Store None rather than an empty dict when there is no context.
            context_value = context_payload if context_payload else None

            requires_review_flag = self._determine_requires_review(
                action=action,
                data_classification=data_classification,
                requires_review_param=requires_review,
            )

            # Create audit trail entry
            audit_entry = AuditTrail(
                timestamp=datetime.now(timezone.utc),
                correlation_id=correlation_id,
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                resource_name=resource_name,
                user_id=user_id,
                user_email=user_email,
                team_id=team_id,
                client_ip=client_ip,
                user_agent=user_agent,
                request_path=request_path,
                request_method=request_method,
                old_values=old_values,
                new_values=new_values,
                changes=changes,
                data_classification=data_classification,
                requires_review=requires_review_flag,
                success=success,
                error_message=error_message,
                context=context_value,
            )

            db.add(audit_entry)
            db.commit()
            db.refresh(audit_entry)

            logger.debug(
                f"Audit trail logged: {action} {resource_type}/{resource_id} by {user_id}",
                extra={"correlation_id": correlation_id, "action": action, "resource_type": resource_type, "resource_id": resource_id, "user_id": user_id, "success": success},
            )

            return audit_entry

        except Exception as e:
            logger.error(f"Failed to log audit trail: {e}", exc_info=True, extra={"correlation_id": correlation_id, "action": action, "resource_type": resource_type, "resource_id": resource_id})
            # Only roll back a session this method created; a caller-supplied
            # session is left for the caller to manage.
            if close_db:
                try:
                    db.rollback()
                except Exception as rollback_error:
                    # Keep the best-effort contract: a failing rollback must
                    # not escape; the session is still closed below.
                    logger.warning(f"Rollback after audit trail failure also failed: {rollback_error}", extra={"correlation_id": correlation_id})
            return None

        finally:
            if close_db:
                db.close()

    def _determine_requires_review(
        self,
        action: Optional[str],
        data_classification: Optional[str],
        requires_review_param: Optional[bool],
    ) -> bool:
        """Resolve whether an audit entry should require review.

        An explicit ``requires_review_param`` always wins. Otherwise the entry
        requires review when it is classified confidential or restricted, or
        when the lower-cased action is listed in ``REVIEW_REQUIRED_ACTIONS``.

        Args:
            action: Action being performed
            data_classification: Data classification level
            requires_review_param: Explicit review requirement

        Returns:
            bool: Whether the audit entry requires review
        """
        if requires_review_param is not None:
            return requires_review_param

        if data_classification in {DataClassification.CONFIDENTIAL.value, DataClassification.RESTRICTED.value}:
            return True

        # REVIEW_REQUIRED_ACTIONS holds lower-case names; a missing action
        # is treated as the empty string.
        normalized_action = (action or "").lower()
        return normalized_action in REVIEW_REQUIRED_ACTIONS

    def log_crud_operation(
        self,
        operation: str,
        resource_type: str,
        resource_id: str,
        user_id: str,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        resource_name: Optional[str] = None,
        old_values: Optional[Dict[str, Any]] = None,
        new_values: Optional[Dict[str, Any]] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        db: Optional[Session] = None,
        **kwargs,
    ) -> Optional[AuditTrail]:
        """Log a CRUD operation with change tracking.

        Unless overridden through ``**kwargs``, the method derives:

        * ``changes``: for UPDATE operations with non-empty ``old_values``
          and ``new_values``, a per-key ``{"old": ..., "new": ...}`` diff;
        * ``data_classification``: confidential for user/team/token/credential
          resources, internal for tool/server/prompt/resource, else None;
        * ``requires_review``: True for confidential or restricted data and
          for DELETE on a tool, server or gateway, otherwise False.

        Args:
            operation: CRUD operation (CREATE, READ, UPDATE, DELETE)
            resource_type: Type of resource
            resource_id: ID of the resource
            user_id: User who performed the operation
            user_email: User's email
            team_id: Team ID if applicable
            resource_name: Name of the resource
            old_values: Previous values (for UPDATE/DELETE)
            new_values: New values (for CREATE/UPDATE)
            success: Whether the operation succeeded
            error_message: Error message if failed
            db: Optional database session
            **kwargs: Additional arguments passed to log_action. ``changes``,
                ``data_classification`` and ``requires_review`` may be given
                here to override the derived values.

        Returns:
            Created AuditTrail entry, or None if nothing was logged
        """
        # Pop the overridable fields so they are not handed to log_action
        # twice (which would raise "got multiple values for keyword argument").
        changes = kwargs.pop("changes", None)
        data_classification = kwargs.pop("data_classification", None)

        # Calculate changes for UPDATE operations
        if changes is None and operation == "UPDATE" and old_values and new_values:
            changes = {}
            # dict.fromkeys de-duplicates while keeping insertion order, so
            # the recorded diff has a stable key order.
            for key in dict.fromkeys([*old_values, *new_values]):
                old_val = old_values.get(key)
                new_val = new_values.get(key)
                if old_val != new_val:
                    changes[key] = {"old": old_val, "new": new_val}

        # Determine data classification based on resource type
        if data_classification is None:
            if resource_type in ("user", "team", "token", "credential"):
                data_classification = DataClassification.CONFIDENTIAL.value
            elif resource_type in ("tool", "server", "prompt", "resource"):
                data_classification = DataClassification.INTERNAL.value

        # Determine if review is required
        sensitive_data = data_classification in (DataClassification.CONFIDENTIAL.value, DataClassification.RESTRICTED.value)
        destructive_delete = operation == "DELETE" and resource_type in ("tool", "server", "gateway")
        requires_review = kwargs.pop("requires_review", sensitive_data or destructive_delete)

        return self.log_action(
            action=operation,
            resource_type=resource_type,
            resource_id=resource_id,
            user_id=user_id,
            user_email=user_email,
            team_id=team_id,
            resource_name=resource_name,
            old_values=old_values,
            new_values=new_values,
            changes=changes,
            data_classification=data_classification,
            requires_review=requires_review,
            success=success,
            error_message=error_message,
            db=db,
            **kwargs,
        )

    def log_data_access(
        self,
        resource_type: str,
        resource_id: str,
        user_id: str,
        access_type: str = "READ",
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        resource_name: Optional[str] = None,
        data_classification: Optional[str] = None,
        db: Optional[Session] = None,
        **kwargs,
    ) -> Optional[AuditTrail]:
        """Log data access for compliance tracking.

        Args:
            resource_type: Type of resource accessed
            resource_id: ID of the resource
            user_id: User who accessed the data
            access_type: Type of access (READ, EXPORT, etc.)
            user_email: User's email
            team_id: Team ID if applicable
            resource_name: Name of the resource
            data_classification: Data classification level
            db: Optional database session
            **kwargs: Additional arguments passed to log_action. ``success``
                (default True) and ``requires_review`` (default: True for
                confidential or restricted data) may be overridden here.

        Returns:
            Created AuditTrail entry, or None if nothing was logged
        """
        # Pop the overridable fields so they are not handed to log_action
        # twice (which would raise "got multiple values for keyword argument").
        success = kwargs.pop("success", True)
        requires_review = kwargs.pop(
            "requires_review",
            data_classification in [DataClassification.CONFIDENTIAL.value, DataClassification.RESTRICTED.value],
        )

        return self.log_action(
            action=access_type,
            resource_type=resource_type,
            resource_id=resource_id,
            user_id=user_id,
            user_email=user_email,
            team_id=team_id,
            resource_name=resource_name,
            data_classification=data_classification,
            requires_review=requires_review,
            success=success,
            db=db,
            **kwargs,
        )

    def log_audit(
        self, user_id: str, resource_type: str, resource_id: str, action: str, user_email: Optional[str] = None, description: Optional[str] = None, db: Optional[Session] = None, **kwargs
    ) -> Optional[AuditTrail]:
        """Convenience method for simple audit logging.

        Args:
            user_id: User who performed the action
            resource_type: Type of resource
            resource_id: ID of the resource
            action: Action performed
            user_email: User's email
            description: Description of the action (stored under
                context["description"] when non-empty)
            db: Optional database session
            **kwargs: Additional arguments passed to log_action; a ``context``
                mapping given here is copied, never mutated

        Returns:
            Created AuditTrail entry, or None if nothing was logged
        """
        # Copy the caller's context (tolerating an explicit None) so adding
        # the description never modifies the caller's dict.
        context = dict(kwargs.pop("context", None) or {})
        if description:
            context["description"] = description

        return self.log_action(action=action, resource_type=resource_type, resource_id=resource_id, user_id=user_id, user_email=user_email, context=context if context else None, db=db, **kwargs)

    def get_audit_trail(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        user_id: Optional[str] = None,
        action: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 100,
        offset: int = 0,
        db: Optional[Session] = None,
    ) -> list[AuditTrail]:
        """Query audit trail entries.

        Only truthy filter arguments are applied; results are ordered newest
        first by timestamp. Unlike log_action, query errors propagate to the
        caller.

        Args:
            resource_type: Filter by resource type
            resource_id: Filter by resource ID
            user_id: Filter by user ID
            action: Filter by action
            start_time: Filter by start time (inclusive)
            end_time: Filter by end time (inclusive)
            limit: Maximum number of results
            offset: Offset for pagination
            db: Optional database session

        Returns:
            List of AuditTrail entries
        """
        close_db = False
        if db is None:
            db = SessionLocal()
            close_db = True

        try:
            query = select(AuditTrail)

            if resource_type:
                query = query.where(AuditTrail.resource_type == resource_type)
            if resource_id:
                query = query.where(AuditTrail.resource_id == resource_id)
            if user_id:
                query = query.where(AuditTrail.user_id == user_id)
            if action:
                query = query.where(AuditTrail.action == action)
            if start_time:
                query = query.where(AuditTrail.timestamp >= start_time)
            if end_time:
                query = query.where(AuditTrail.timestamp <= end_time)

            query = query.order_by(AuditTrail.timestamp.desc())
            query = query.limit(limit).offset(offset)

            result = db.execute(query)
            return list(result.scalars().all())

        finally:
            if close_db:
                try:
                    db.commit()  # End read-only transaction cleanly
                finally:
                    # Always release the session, even if the commit fails.
                    db.close()

436 

437 

# Lazily created, process-wide service instance (see get_audit_trail_service)
_audit_trail_service: Optional[AuditTrailService] = None


def get_audit_trail_service() -> AuditTrailService:
    """Return the shared audit trail service, creating it on first use.

    Returns:
        The module-level AuditTrailService instance
    """
    global _audit_trail_service  # pylint: disable=global-statement
    instance = _audit_trail_service
    if instance is None:
        # First call: build the service and remember it for later callers.
        instance = AuditTrailService()
        _audit_trail_service = instance
    return instance