Coverage for mcpgateway / utils / metadata_capture.py: 100%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/metadata_capture.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Metadata capture utilities for comprehensive audit tracking. 

8This module provides utilities for capturing comprehensive metadata during 

9entity creation and modification operations. It extracts request context 

10information such as authenticated user, IP address, user agent, and source 

11type for audit trail purposes. 

12 

13Examples: 

14 >>> from mcpgateway.utils.metadata_capture import MetadataCapture 

15 >>> from types import SimpleNamespace 

16 >>> # Create mock request for testing 

17 >>> request = SimpleNamespace() 

18 >>> request.client = SimpleNamespace() 

19 >>> request.client.host = "192.168.1.1" 

20 >>> request.headers = {"user-agent": "test/1.0"} 

21 >>> request.url = SimpleNamespace() 

22 >>> request.url.path = "/admin/tools" 

23 >>> # Metadata capture during entity creation 

24 >>> metadata = MetadataCapture.extract_creation_metadata(request, user="admin") 

25 >>> metadata["created_by"] 

26 'admin' 

27 >>> metadata["created_via"] 

28 'ui' 

29""" 

30 

31# Standard 

32from typing import Dict, Optional 

33 

34# Third-Party 

35from fastapi import Request 

36 

37 

class MetadataCapture:
    """Utilities for capturing comprehensive metadata during entity operations.

    All helpers are static: the class is only a namespace and holds no state.
    """

    # Longest user agent kept verbatim; longer values are cut and suffixed with "...".
    _MAX_USER_AGENT_LENGTH = 500

    # Maximum textual length of an IPv6 address.
    _MAX_IP_LENGTH = 45

    # Maps newline, carriage return and tab to a plain space in a single pass.
    _CONTROL_WHITESPACE = str.maketrans({"\n": " ", "\r": " ", "\t": " "})

    @staticmethod
    def extract_request_context(request: Request) -> Dict[str, Optional[str]]:
        """Extract basic request context information.

        Args:
            request: FastAPI request object

        Returns:
            Dict with the keys ``from_ip``, ``user_agent`` and ``via``

        Examples:
            >>> # Mock request for testing
            >>> from types import SimpleNamespace
            >>> mock_request = SimpleNamespace()
            >>> mock_request.client = SimpleNamespace()
            >>> mock_request.client.host = "192.168.1.100"
            >>> mock_request.headers = {"user-agent": "Mozilla/5.0"}
            >>> mock_request.url = SimpleNamespace()
            >>> mock_request.url.path = "/admin/tools"
            >>> context = MetadataCapture.extract_request_context(mock_request)
            >>> context["from_ip"]
            '192.168.1.100'
            >>> context["via"]
            'ui'
            >>> # An empty leading X-Forwarded-For entry does not erase the address
            >>> mock_request.headers = {"x-forwarded-for": " , 203.0.113.7"}
            >>> MetadataCapture.extract_request_context(mock_request)["from_ip"]
            '203.0.113.7'
        """
        headers = request.headers

        # Start from the directly connected peer, when the server reported one.
        peer = getattr(request, "client", None)
        client_ip = peer.host if peer else None

        # Behind a reverse proxy the original client is the first entry of the
        # X-Forwarded-For chain. Blank entries are skipped so that a malformed
        # header such as ", 1.2.3.4" cannot replace the address with "".
        # NOTE(review): this header is client-controlled unless a trusted proxy
        # overwrites it - confirm the deployment before relying on it for audit.
        forwarded_for = headers.get("x-forwarded-for")
        if forwarded_for:
            hops = (hop.strip() for hop in forwarded_for.split(","))
            first_hop = next((hop for hop in hops if hop), None)
            if first_hop:
                client_ip = first_hop

        # Requests whose path contains "/admin/" are attributed to the UI;
        # everything else is attributed to the API.
        via = "api"
        if hasattr(request, "url") and hasattr(request.url, "path"):
            if "/admin/" in str(request.url.path):
                via = "ui"

        return {
            "from_ip": client_ip,
            "user_agent": headers.get("user-agent"),
            "via": via,
        }

    @staticmethod
    def extract_username(user) -> str:
        """Extract username from auth response.

        Args:
            user: Response from require_auth - can be string or dict

        Returns:
            Username string, or ``"unknown"`` when none can be determined

        Examples:
            >>> MetadataCapture.extract_username("admin")
            'admin'
            >>> MetadataCapture.extract_username({"username": "alice", "exp": 123})
            'alice'
            >>> MetadataCapture.extract_username({"sub": "bob", "exp": 123})
            'bob'
            >>> MetadataCapture.extract_username({"email": "user@example.com", "full_name": "User"})
            'user@example.com'
            >>> MetadataCapture.extract_username(None)
            'unknown'
        """
        if isinstance(user, str):
            return user
        if isinstance(user, dict):
            # JWT payload or user context: the first non-empty identifier wins.
            return user.get("username") or user.get("sub") or user.get("email") or "unknown"
        return "unknown"

    @staticmethod
    def extract_creation_metadata(
        request: Request,
        user,  # Can be str or dict from require_auth
        import_batch_id: Optional[str] = None,
        federation_source: Optional[str] = None,
    ) -> Dict[str, Optional[str]]:
        """Extract complete metadata for entity creation.

        Args:
            request: FastAPI request object
            user: Authenticated user (string username or dict JWT payload)
            import_batch_id: Optional UUID for bulk import operations
            federation_source: Optional source gateway for federated entities

        Returns:
            Dict containing all creation metadata fields. Note that ``version``
            is the integer ``1`` even though the annotation only lists strings.

        Examples:
            >>> from types import SimpleNamespace
            >>> mock_request = SimpleNamespace()
            >>> mock_request.client = SimpleNamespace()
            >>> mock_request.client.host = "10.0.0.1"
            >>> mock_request.headers = {"user-agent": "curl/7.68.0"}
            >>> mock_request.url = SimpleNamespace()
            >>> mock_request.url.path = "/tools"
            >>> metadata = MetadataCapture.extract_creation_metadata(mock_request, "admin")
            >>> metadata["created_by"]
            'admin'
            >>> metadata["created_via"]
            'api'
            >>> metadata["created_from_ip"]
            '10.0.0.1'
        """
        context = MetadataCapture.extract_request_context(request)

        return {
            "created_by": MetadataCapture.extract_username(user),
            "created_from_ip": context["from_ip"],
            "created_via": context["via"],
            "created_user_agent": context["user_agent"],
            "import_batch_id": import_batch_id,
            "federation_source": federation_source,
            "version": 1,
        }

    @staticmethod
    def extract_modification_metadata(
        request: Request,
        user,  # Can be str or dict from require_auth
        current_version: Optional[int] = 1,
    ) -> Dict[str, Optional[str]]:
        """Extract metadata for entity modification.

        Args:
            request: FastAPI request object
            user: Authenticated user (string username or dict JWT payload)
            current_version: Current entity version (will be incremented).
                ``None`` is treated as ``0`` so the result is version ``1``.

        Returns:
            Dict containing modification metadata fields. Note that ``version``
            is an integer even though the annotation only lists strings.

        Examples:
            >>> from types import SimpleNamespace
            >>> mock_request = SimpleNamespace()
            >>> mock_request.client = SimpleNamespace()
            >>> mock_request.client.host = "172.16.0.1"
            >>> mock_request.headers = {"user-agent": "HTTPie/2.4.0"}
            >>> mock_request.url = SimpleNamespace()
            >>> mock_request.url.path = "/admin/tools/123/edit"
            >>> metadata = MetadataCapture.extract_modification_metadata(mock_request, "alice", 2)
            >>> metadata["modified_by"]
            'alice'
            >>> metadata["modified_via"]
            'ui'
            >>> metadata["version"]
            3
            >>> MetadataCapture.extract_modification_metadata(mock_request, "alice", None)["version"]
            1
        """
        context = MetadataCapture.extract_request_context(request)

        # A missing version must not raise TypeError on the increment below.
        base_version = current_version or 0

        return {
            "modified_by": MetadataCapture.extract_username(user),
            "modified_from_ip": context["from_ip"],
            "modified_via": context["via"],
            "modified_user_agent": context["user_agent"],
            "version": base_version + 1,
        }

    @staticmethod
    def determine_source_from_context(
        import_batch_id: Optional[str] = None,
        federation_source: Optional[str] = None,
        via: str = "api",
    ) -> str:
        """Determine the source type based on available context.

        Args:
            import_batch_id: UUID for bulk import operations
            federation_source: Source gateway for federated entities
            via: Basic source type (api, ui)

        Returns:
            ``"import"`` when a batch id is given, otherwise ``"federation"``
            when a federation source is given, otherwise ``via`` unchanged

        Examples:
            >>> MetadataCapture.determine_source_from_context(via="ui")
            'ui'
            >>> MetadataCapture.determine_source_from_context(import_batch_id="123", via="api")
            'import'
            >>> MetadataCapture.determine_source_from_context(federation_source="gateway-1", via="api")
            'federation'
        """
        # An import batch takes precedence over a federation source.
        if import_batch_id:
            return "import"
        if federation_source:
            return "federation"
        return via

    @staticmethod
    def sanitize_user_agent(user_agent: Optional[str]) -> Optional[str]:
        """Sanitize user agent string for safe storage and display.

        Args:
            user_agent: Raw user agent string from request headers

        Returns:
            Sanitized user agent string, or None when nothing usable remains

        Examples:
            >>> MetadataCapture.sanitize_user_agent("Mozilla/5.0 (Linux)")
            'Mozilla/5.0 (Linux)'
            >>> MetadataCapture.sanitize_user_agent(None)
            >>> MetadataCapture.sanitize_user_agent("   ")
            >>> len(MetadataCapture.sanitize_user_agent("x" * 2000)) <= 503
            True
        """
        if not user_agent:
            return None

        # Truncate excessively long user agents, marking the cut with "...".
        limit = MetadataCapture._MAX_USER_AGENT_LENGTH
        if len(user_agent) > limit:
            user_agent = user_agent[:limit] + "..."

        # Flatten line breaks and tabs so the value stays on one line.
        flattened = user_agent.translate(MetadataCapture._CONTROL_WHITESPACE).strip()

        # A value that was only whitespace carries no information.
        return flattened or None

    @staticmethod
    def validate_ip_address(ip_address: Optional[str]) -> Optional[str]:
        """Validate and sanitize IP address for storage.

        The value is not parsed as an IP address: it is stripped of surrounding
        whitespace and limited to 45 characters, then stored as-is.

        Args:
            ip_address: IP address string from request

        Returns:
            Trimmed, length-limited address, or None when nothing usable remains

        Examples:
            >>> MetadataCapture.validate_ip_address("192.168.1.1")
            '192.168.1.1'
            >>> MetadataCapture.validate_ip_address("::1")
            '::1'
            >>> MetadataCapture.validate_ip_address(None)
            >>> MetadataCapture.validate_ip_address("invalid-ip")
            'invalid-ip'
            >>> MetadataCapture.validate_ip_address("  10.0.0.1  ")
            '10.0.0.1'
        """
        if not ip_address:
            return None

        # Strip before measuring so padding neither counts toward the limit
        # nor survives in the stored value.
        trimmed = ip_address.strip()
        if not trimmed:
            return None

        return trimmed[: MetadataCapture._MAX_IP_LENGTH]