Coverage for mcpgateway / services / argon2_service.py: 98%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/argon2_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Argon2id Password Hashing Service. 

8This module provides secure password hashing and verification using Argon2id, 

9the winner of the Password Hashing Competition and recommended by OWASP. 

10 

11Examples: 

12 >>> from mcpgateway.services.argon2_service import Argon2PasswordService 

13 >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024) # Light params for testing 

14 >>> hash = service.hash_password("test123") 

15 >>> service.verify_password("test123", hash) 

16 True 

17 >>> service.verify_password("wrong", hash) 

18 False 

19""" 

20 

21# Standard 

22import asyncio 

23from typing import Optional 

24 

25# Third-Party 

26from argon2 import PasswordHasher 

27from argon2.exceptions import HashingError, InvalidHash, VerifyMismatchError 

28 

29# First-Party 

30from mcpgateway.config import settings 

31from mcpgateway.services.logging_service import LoggingService 

32 

# Module-level logger, obtained through the project's LoggingService so that
# this module logs under its own dotted name.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

36 

37 

class Argon2PasswordService:
    """Service for Argon2id password hashing and verification.

    Wraps an ``argon2.PasswordHasher`` whose time cost, memory cost and
    parallelism come from the constructor arguments or, when those are
    omitted, from the application ``settings``.

    Attributes:
        time_cost (int): Number of iterations in effect for new hashes.
        memory_cost (int): Memory usage in KiB in effect for new hashes.
        parallelism (int): Number of threads in effect for new hashes.
        hasher (PasswordHasher): Configured Argon2 password hasher.

    Examples:
        >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
        >>> password = "secure_password_123"
        >>> hash_value = service.hash_password(password)
        >>> service.verify_password(password, hash_value)
        True
        >>> service.verify_password("wrong_password", hash_value)
        False
    """

    def __init__(self, time_cost: Optional[int] = None, memory_cost: Optional[int] = None, parallelism: Optional[int] = None, hash_len: int = 32, salt_len: int = 16):
        """Initialize the Argon2 password service.

        Args:
            time_cost: Number of iterations (default from settings)
            memory_cost: Memory usage in KiB (default from settings)
            parallelism: Number of threads (default from settings)
            hash_len: Length of the hash in bytes
            salt_len: Length of the salt in bytes

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> isinstance(service.hasher, PasswordHasher)
            True
            >>> custom_service = Argon2PasswordService(time_cost=1, memory_cost=2048)
            >>> isinstance(custom_service.hasher, PasswordHasher)
            True
        """
        # Any falsy argument (None or 0) falls back to the settings attribute,
        # and to the hard-coded value when settings does not define it.
        self.time_cost = time_cost or getattr(settings, "argon2id_time_cost", 3)
        self.memory_cost = memory_cost or getattr(settings, "argon2id_memory_cost", 65536)
        self.parallelism = parallelism or getattr(settings, "argon2id_parallelism", 1)

        # Initialize Argon2 password hasher with configured parameters
        self.hasher = PasswordHasher(time_cost=self.time_cost, memory_cost=self.memory_cost, parallelism=self.parallelism, hash_len=hash_len, salt_len=salt_len)

        logger.info(f"Initialized Argon2PasswordService with time_cost={self.time_cost}, memory_cost={self.memory_cost}, parallelism={self.parallelism}")

    def hash_password(self, password: str) -> str:
        """Hash a password using Argon2id.

        Args:
            password: The plain text password to hash

        Returns:
            str: The Argon2id hash string

        Raises:
            ValueError: If password is empty or None
            HashingError: If hashing fails

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> hash_value = service.hash_password("test123")
            >>> hash_value.startswith("$argon2id$")
            True
            >>> len(hash_value) > 50
            True
            >>> service.hash_password("test123") != service.hash_password("test123")
            True
        """
        if not password:
            raise ValueError("Password cannot be empty or None")

        try:
            hash_value = self.hasher.hash(password)
            logger.debug("Successfully hashed password for user authentication")
            return hash_value
        except HashingError as e:
            # Log, then re-raise the same exception type with added context.
            logger.error(f"Failed to hash password: {e}")
            raise HashingError(f"Password hashing failed: {e}") from e

    def verify_password(self, password: str, hash_value: str) -> bool:
        """Verify a password against its Argon2id hash.

        Never raises: every failure (mismatch, malformed hash, unexpected
        error) is logged and reported as ``False``.

        Args:
            password: The plain text password to verify
            hash_value: The stored Argon2id hash

        Returns:
            bool: True if password matches hash, False otherwise

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> hash_val = service.hash_password("correct_password")
            >>> service.verify_password("correct_password", hash_val)
            True
            >>> service.verify_password("wrong_password", hash_val)
            False
            >>> service.verify_password("", hash_val)
            False
        """
        if not password or not hash_value:
            return False

        try:
            # verify() raises VerifyMismatchError if password doesn't match
            self.hasher.verify(hash_value, password)
            logger.debug("Password verification successful")
            return True
        except VerifyMismatchError:
            logger.debug("Password verification failed - password mismatch")
            return False
        except (InvalidHash, ValueError) as e:
            logger.warning(f"Invalid hash format during verification: {e}")
            return False
        except Exception as e:
            # Deliberately broad: any unexpected failure is treated as
            # "not verified" rather than propagated to the caller.
            logger.error(f"Unexpected error during password verification: {e}")
            return False

    async def hash_password_async(self, password: str) -> str:
        """Hash a password using Argon2id in a separate thread.

        Delegates to :meth:`hash_password` via ``asyncio.to_thread``.

        Args:
            password: The plain text password to hash

        Returns:
            str: The Argon2id hash string
        """
        return await asyncio.to_thread(self.hash_password, password)

    async def verify_password_async(self, password: str, hash_value: str) -> bool:
        """Verify a password against its Argon2id hash in a separate thread.

        Delegates to :meth:`verify_password` via ``asyncio.to_thread``.

        Args:
            password: The plain text password to verify
            hash_value: The stored Argon2id hash

        Returns:
            bool: True if password matches hash, False otherwise
        """
        return await asyncio.to_thread(self.verify_password, password, hash_value)

    def needs_rehash(self, hash_value: str) -> bool:
        """Check if a hash needs to be rehashed due to parameter changes.

        This is useful for gradually updating password hashes when you
        change Argon2 parameters (e.g., increasing time_cost for security).
        An empty, malformed or otherwise uncheckable hash is reported as
        needing a rehash.

        Args:
            hash_value: The stored Argon2id hash to check

        Returns:
            bool: True if hash should be updated, False otherwise

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> hash_val = service.hash_password("test")
            >>> service.needs_rehash(hash_val)
            False
            >>> service_new = Argon2PasswordService(time_cost=2, memory_cost=1024)
            >>> service_new.needs_rehash(hash_val)
            True
        """
        if not hash_value:
            return True

        try:
            return self.hasher.check_needs_rehash(hash_value)
        except (InvalidHash, ValueError) as e:
            logger.warning(f"Invalid hash format when checking rehash need: {e}")
            return True
        except Exception as e:
            # Deliberately broad: when in doubt, ask for a rehash.
            logger.error(f"Unexpected error checking rehash need: {e}")
            return True

    def get_hash_info(self, hash_value: str) -> Optional[dict]:
        """Extract the cost parameters encoded in an Argon2id hash string.

        Only the textual layout is inspected; the hash is not verified.

        Args:
            hash_value: The Argon2id hash to analyze

        Returns:
            dict: ``memory_cost``, ``time_cost`` and ``parallelism`` (ints, for
            whichever of ``m``/``t``/``p`` are present), plus ``variant``
            (always ``"argon2id"``) and ``version`` (the raw third field, e.g.
            ``"v=19"``). None if the value is empty, not a string, not an
            argon2id hash, or its parameter field cannot be parsed.

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> hash_val = service.hash_password("test")
            >>> info = service.get_hash_info(hash_val)
            >>> info is not None
            True
            >>> 'time_cost' in info
            True
            >>> 'memory_cost' in info
            True
            >>> service.get_hash_info("not-a-hash") is None
            True
        """
        # Non-string input (bytes, ints, ...) has no usable "$"-split; report
        # it as invalid instead of letting TypeError/AttributeError escape.
        if not hash_value or not isinstance(hash_value, str):
            return None

        # Argon2 hash format: $argon2id$v=19$m=65536,t=3,p=1$salt$hash
        # Splitting on "$" yields ["", variant, version, params, ...].
        parts = hash_value.split("$")
        if len(parts) < 4 or parts[1] != "argon2id":
            return None

        # Short parameter keys used in the hash -> names exposed to callers.
        param_names = {"m": "memory_cost", "t": "time_cost", "p": "parallelism"}
        params = {}

        try:
            for param in parts[3].split(","):
                # Raises ValueError unless the item has exactly one "=".
                key, value = param.split("=")
                if key in param_names:
                    params[param_names[key]] = int(value)
        except ValueError as e:
            logger.warning(f"Failed to parse Argon2 hash info: {e}")
            return None

        params["variant"] = "argon2id"
        # parts[2] is guaranteed to exist by the length check above.
        params["version"] = parts[2]
        return params

    def __repr__(self) -> str:
        """String representation of the service.

        Returns:
            str: String representation of Argon2PasswordService instance
        """
        return f"Argon2PasswordService(time_cost={self.time_cost}, memory_cost={self.memory_cost}, parallelism={self.parallelism})"

272 

273 

# Shared module-level service instance, built with no arguments so its
# parameters come from settings; the convenience functions below delegate to it.
password_service = Argon2PasswordService()

276 

277 

def hash_password(password: str) -> str:
    """Hash a password with the module-level ``password_service``.

    Thin convenience wrapper around ``password_service.hash_password``.

    Args:
        password: The password to hash

    Returns:
        str: The hashed password

    Examples:
        >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
        >>> hash_val = service.hash_password("test123")
        >>> hash_val.startswith("$argon2id$")
        True
    """
    hashed = password_service.hash_password(password)
    return hashed

296 

297 

def verify_password(password: str, hash_value: str) -> bool:
    """Verify a password with the module-level ``password_service``.

    Thin convenience wrapper around ``password_service.verify_password``.

    Args:
        password: The password to verify
        hash_value: The stored hash

    Returns:
        bool: True if password matches

    Examples:
        >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
        >>> hash_val = service.hash_password("test123")
        >>> service.verify_password("test123", hash_val)
        True
        >>> service.verify_password("wrong", hash_val)
        False
    """
    matches = password_service.verify_password(password, hash_value)
    return matches

319 

320 

async def hash_password_async(password: str) -> str:
    """Asynchronously hash a password with the module-level ``password_service``.

    Thin convenience wrapper that awaits ``password_service.hash_password_async``.

    Args:
        password: The password to hash

    Returns:
        str: The hashed password
    """
    hashed = await password_service.hash_password_async(password)
    return hashed

333 

334 

async def verify_password_async(password: str, hash_value: str) -> bool:
    """Asynchronously verify a password with the module-level ``password_service``.

    Thin convenience wrapper that awaits ``password_service.verify_password_async``.

    Args:
        password: The password to verify
        hash_value: The stored hash

    Returns:
        bool: True if password matches
    """
    matches = await password_service.verify_password_async(password, hash_value)
    return matches

348 

349 

def needs_rehash(hash_value: str) -> bool:
    """Report whether a hash should be regenerated, per the global service.

    Thin convenience wrapper around ``password_service.needs_rehash``.

    Args:
        hash_value: The hash to check

    Returns:
        bool: True if rehash is needed
    """
    outdated = password_service.needs_rehash(hash_value)
    return outdated