Coverage for mcpgateway / utils / metadata_capture.py: 100%
55 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/metadata_capture.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Metadata capture utilities for comprehensive audit tracking.
8This module provides utilities for capturing comprehensive metadata during
9entity creation and modification operations. It extracts request context
10information such as authenticated user, IP address, user agent, and source
11type for audit trail purposes.
13Examples:
14 >>> from mcpgateway.utils.metadata_capture import MetadataCapture
15 >>> from types import SimpleNamespace
16 >>> # Create mock request for testing
17 >>> request = SimpleNamespace()
18 >>> request.client = SimpleNamespace()
19 >>> request.client.host = "192.168.1.1"
20 >>> request.headers = {"user-agent": "test/1.0"}
21 >>> request.url = SimpleNamespace()
22 >>> request.url.path = "/admin/tools"
23 >>> # Metadata capture during entity creation
24 >>> metadata = MetadataCapture.extract_creation_metadata(request, user="admin")
25 >>> metadata["created_by"]
26 'admin'
27 >>> metadata["created_via"]
28 'ui'
29"""
31# Standard
32from typing import Dict, Optional
34# Third-Party
35from fastapi import Request
38class MetadataCapture:
39 """Utilities for capturing comprehensive metadata during entity operations."""
41 @staticmethod
42 def extract_request_context(request: Request) -> Dict[str, Optional[str]]:
43 """Extract basic request context information.
45 Args:
46 request: FastAPI request object
48 Returns:
49 Dict containing IP address, user agent, and source type
51 Examples:
52 >>> # Mock request for testing
53 >>> from types import SimpleNamespace
54 >>> mock_request = SimpleNamespace()
55 >>> mock_request.client = SimpleNamespace()
56 >>> mock_request.client.host = "192.168.1.100"
57 >>> mock_request.headers = {"user-agent": "Mozilla/5.0"}
58 >>> mock_request.url = SimpleNamespace()
59 >>> mock_request.url.path = "/admin/tools"
60 >>> context = MetadataCapture.extract_request_context(mock_request)
61 >>> context["from_ip"]
62 '192.168.1.100'
63 >>> context["via"]
64 'ui'
65 """
66 # Extract IP address (handle various proxy scenarios)
67 client_ip = None
68 if request.client:
69 client_ip = request.client.host
71 # Check for forwarded headers (reverse proxy support)
72 forwarded_for = request.headers.get("x-forwarded-for")
73 if forwarded_for:
74 # Take the first IP in the chain (original client)
75 client_ip = forwarded_for.split(",")[0].strip()
77 # Extract user agent
78 user_agent = request.headers.get("user-agent")
80 # Determine source type based on URL path
81 via = "api" # default
82 if hasattr(request, "url") and hasattr(request.url, "path"):
83 path = str(request.url.path)
84 if "/admin/" in path:
85 via = "ui"
87 return {
88 "from_ip": client_ip,
89 "user_agent": user_agent,
90 "via": via,
91 }
93 @staticmethod
94 def extract_username(user) -> str:
95 """Extract username from auth response.
97 Args:
98 user: Response from require_auth - can be string or dict
100 Returns:
101 Username string
103 Examples:
104 >>> MetadataCapture.extract_username("admin")
105 'admin'
106 >>> MetadataCapture.extract_username({"username": "alice", "exp": 123})
107 'alice'
108 >>> MetadataCapture.extract_username({"sub": "bob", "exp": 123})
109 'bob'
110 >>> MetadataCapture.extract_username({"email": "user@example.com", "full_name": "User"})
111 'user@example.com'
112 """
113 if isinstance(user, str):
114 return user
115 elif isinstance(user, dict):
116 # Try to extract username from JWT payload or user context
117 return user.get("username") or user.get("sub") or user.get("email") or "unknown"
118 else:
119 return "unknown"
121 @staticmethod
122 def extract_creation_metadata(
123 request: Request,
124 user, # Can be str or dict from require_auth
125 import_batch_id: Optional[str] = None,
126 federation_source: Optional[str] = None,
127 ) -> Dict[str, Optional[str]]:
128 """Extract complete metadata for entity creation.
130 Args:
131 request: FastAPI request object
132 user: Authenticated user (string username or dict JWT payload)
133 import_batch_id: Optional UUID for bulk import operations
134 federation_source: Optional source gateway for federated entities
136 Returns:
137 Dict containing all creation metadata fields
139 Examples:
140 >>> from types import SimpleNamespace
141 >>> mock_request = SimpleNamespace()
142 >>> mock_request.client = SimpleNamespace()
143 >>> mock_request.client.host = "10.0.0.1"
144 >>> mock_request.headers = {"user-agent": "curl/7.68.0"}
145 >>> mock_request.url = SimpleNamespace()
146 >>> mock_request.url.path = "/tools"
147 >>> metadata = MetadataCapture.extract_creation_metadata(mock_request, "admin")
148 >>> metadata["created_by"]
149 'admin'
150 >>> metadata["created_via"]
151 'api'
152 >>> metadata["created_from_ip"]
153 '10.0.0.1'
154 """
155 context = MetadataCapture.extract_request_context(request)
157 return {
158 "created_by": MetadataCapture.extract_username(user),
159 "created_from_ip": context["from_ip"],
160 "created_via": context["via"],
161 "created_user_agent": context["user_agent"],
162 "import_batch_id": import_batch_id,
163 "federation_source": federation_source,
164 "version": 1,
165 }
167 @staticmethod
168 def extract_modification_metadata(
169 request: Request,
170 user, # Can be str or dict from require_auth
171 current_version: int = 1,
172 ) -> Dict[str, Optional[str]]:
173 """Extract metadata for entity modification.
175 Args:
176 request: FastAPI request object
177 user: Authenticated user (string username or dict JWT payload)
178 current_version: Current entity version (will be incremented)
180 Returns:
181 Dict containing modification metadata fields
183 Examples:
184 >>> from types import SimpleNamespace
185 >>> mock_request = SimpleNamespace()
186 >>> mock_request.client = SimpleNamespace()
187 >>> mock_request.client.host = "172.16.0.1"
188 >>> mock_request.headers = {"user-agent": "HTTPie/2.4.0"}
189 >>> mock_request.url = SimpleNamespace()
190 >>> mock_request.url.path = "/admin/tools/123/edit"
191 >>> metadata = MetadataCapture.extract_modification_metadata(mock_request, "alice", 2)
192 >>> metadata["modified_by"]
193 'alice'
194 >>> metadata["modified_via"]
195 'ui'
196 >>> metadata["version"]
197 3
198 """
199 context = MetadataCapture.extract_request_context(request)
201 return {
202 "modified_by": MetadataCapture.extract_username(user),
203 "modified_from_ip": context["from_ip"],
204 "modified_via": context["via"],
205 "modified_user_agent": context["user_agent"],
206 "version": current_version + 1,
207 }
209 @staticmethod
210 def determine_source_from_context(
211 import_batch_id: Optional[str] = None,
212 federation_source: Optional[str] = None,
213 via: str = "api",
214 ) -> str:
215 """Determine the source type based on available context.
217 Args:
218 import_batch_id: UUID for bulk import operations
219 federation_source: Source gateway for federated entities
220 via: Basic source type (api, ui)
222 Returns:
223 More specific source description
225 Examples:
226 >>> MetadataCapture.determine_source_from_context(via="ui")
227 'ui'
228 >>> MetadataCapture.determine_source_from_context(import_batch_id="123", via="api")
229 'import'
230 >>> MetadataCapture.determine_source_from_context(federation_source="gateway-1", via="api")
231 'federation'
232 """
233 if import_batch_id:
234 return "import"
235 elif federation_source:
236 return "federation"
237 else:
238 return via
240 @staticmethod
241 def sanitize_user_agent(user_agent: Optional[str]) -> Optional[str]:
242 """Sanitize user agent string for safe storage and display.
244 Args:
245 user_agent: Raw user agent string from request headers
247 Returns:
248 Sanitized user agent string or None
250 Examples:
251 >>> MetadataCapture.sanitize_user_agent("Mozilla/5.0 (Linux)")
252 'Mozilla/5.0 (Linux)'
253 >>> MetadataCapture.sanitize_user_agent(None)
254 >>> len(MetadataCapture.sanitize_user_agent("x" * 2000)) <= 503
255 True
256 """
257 if not user_agent:
258 return None
260 # Truncate excessively long user agents
261 if len(user_agent) > 500:
262 user_agent = user_agent[:500] + "..."
264 # Remove any potentially dangerous characters
265 user_agent = user_agent.replace("\n", " ").replace("\r", " ").replace("\t", " ")
267 return user_agent.strip()
269 @staticmethod
270 def validate_ip_address(ip_address: Optional[str]) -> Optional[str]:
271 """Validate and sanitize IP address for storage.
273 Args:
274 ip_address: IP address string from request
276 Returns:
277 Validated IP address or None
279 Examples:
280 >>> MetadataCapture.validate_ip_address("192.168.1.1")
281 '192.168.1.1'
282 >>> MetadataCapture.validate_ip_address("::1")
283 '::1'
284 >>> MetadataCapture.validate_ip_address(None)
285 >>> MetadataCapture.validate_ip_address("invalid-ip")
286 'invalid-ip'
287 """
288 if not ip_address:
289 return None
291 # Basic validation - store as-is but limit length
292 if len(ip_address) > 45: # Max length for IPv6
293 return ip_address[:45]
295 return ip_address.strip()