Coverage for mcpgateway / services / argon2_service.py: 98%
91 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/argon2_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Argon2id Password Hashing Service.
8This module provides secure password hashing and verification using Argon2id,
9the winner of the Password Hashing Competition and recommended by OWASP.
11Examples:
12 >>> from mcpgateway.services.argon2_service import Argon2PasswordService
13 >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024) # Light params for testing
14 >>> hash = service.hash_password("test123")
15 >>> service.verify_password("test123", hash)
16 True
17 >>> service.verify_password("wrong", hash)
18 False
19"""
21# Standard
22import asyncio
23from typing import Optional
25# Third-Party
26from argon2 import PasswordHasher
27from argon2.exceptions import HashingError, InvalidHash, VerifyMismatchError
29# First-Party
30from mcpgateway.config import settings
31from mcpgateway.services.logging_service import LoggingService
# Module-level logger, obtained through the project's LoggingService
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
class Argon2PasswordService:
    """Service for Argon2id password hashing and verification.

    This service provides secure password hashing using Argon2id with
    configurable parameters for time cost, memory cost, and parallelism.
    It follows OWASP recommendations for password storage.

    Attributes:
        hasher (PasswordHasher): Configured Argon2 password hasher
        time_cost (int): Resolved number of iterations
        memory_cost (int): Resolved memory usage in KiB
        parallelism (int): Resolved number of threads

    Examples:
        >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
        >>> password = "secure_password_123"
        >>> hash_value = service.hash_password(password)
        >>> service.verify_password(password, hash_value)
        True
        >>> service.verify_password("wrong_password", hash_value)
        False
    """

    def __init__(self, time_cost: Optional[int] = None, memory_cost: Optional[int] = None, parallelism: Optional[int] = None, hash_len: int = 32, salt_len: int = 16):
        """Initialize the Argon2 password service.

        Args:
            time_cost: Number of iterations (default from settings)
            memory_cost: Memory usage in KiB (default from settings)
            parallelism: Number of threads (default from settings)
            hash_len: Length of the hash in bytes
            salt_len: Length of the salt in bytes

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> isinstance(service.hasher, PasswordHasher)
            True
            >>> custom_service = Argon2PasswordService(time_cost=1, memory_cost=2048)
            >>> isinstance(custom_service.hasher, PasswordHasher)
            True
        """
        # Any falsy argument (None or 0) falls back to the value from settings,
        # and to the hard-coded default when settings lacks the attribute.
        self.time_cost = time_cost or getattr(settings, "argon2id_time_cost", 3)
        self.memory_cost = memory_cost or getattr(settings, "argon2id_memory_cost", 65536)
        self.parallelism = parallelism or getattr(settings, "argon2id_parallelism", 1)

        # Initialize Argon2 password hasher with configured parameters
        self.hasher = PasswordHasher(time_cost=self.time_cost, memory_cost=self.memory_cost, parallelism=self.parallelism, hash_len=hash_len, salt_len=salt_len)

        logger.info(f"Initialized Argon2PasswordService with time_cost={self.time_cost}, memory_cost={self.memory_cost}, parallelism={self.parallelism}")

    def hash_password(self, password: str) -> str:
        """Hash a password using Argon2id.

        Args:
            password: The plain text password to hash

        Returns:
            str: The Argon2id hash string

        Raises:
            ValueError: If password is empty or None
            HashingError: If hashing fails

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> hash_value = service.hash_password("test123")
            >>> hash_value.startswith("$argon2id$")
            True
            >>> len(hash_value) > 50
            True
            >>> service.hash_password("test123") != service.hash_password("test123")
            True
        """
        if not password:
            raise ValueError("Password cannot be empty or None")

        try:
            hash_value = self.hasher.hash(password)
            logger.debug("Successfully hashed password for user authentication")
            return hash_value
        except HashingError as e:
            logger.error(f"Failed to hash password: {e}")
            # Re-raise as the same exception type, chained to the original cause
            raise HashingError(f"Password hashing failed: {e}") from e

    def verify_password(self, password: str, hash_value: str) -> bool:
        """Verify a password against its Argon2id hash.

        This method never raises: every failure mode is logged and reported
        as ``False``.

        Args:
            password: The plain text password to verify
            hash_value: The stored Argon2id hash

        Returns:
            bool: True if password matches hash, False otherwise

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> hash_val = service.hash_password("correct_password")
            >>> service.verify_password("correct_password", hash_val)
            True
            >>> service.verify_password("wrong_password", hash_val)
            False
            >>> service.verify_password("", hash_val)
            False
        """
        # An empty password or empty hash can never match
        if not password or not hash_value:
            return False

        try:
            # verify() raises VerifyMismatchError if password doesn't match
            self.hasher.verify(hash_value, password)
            logger.debug("Password verification successful")
            return True
        except VerifyMismatchError:
            logger.debug("Password verification failed - password mismatch")
            return False
        except (InvalidHash, ValueError) as e:
            logger.warning(f"Invalid hash format during verification: {e}")
            return False
        except Exception as e:
            # Deliberate catch-all: verification reports failure instead of raising
            logger.error(f"Unexpected error during password verification: {e}")
            return False

    async def hash_password_async(self, password: str) -> str:
        """Hash a password using Argon2id in a separate thread.

        Args:
            password: The plain text password to hash

        Returns:
            str: The Argon2id hash string
        """
        return await asyncio.to_thread(self.hash_password, password)

    async def verify_password_async(self, password: str, hash_value: str) -> bool:
        """Verify a password against its Argon2id hash in a separate thread.

        Args:
            password: The plain text password to verify
            hash_value: The stored Argon2id hash

        Returns:
            bool: True if password matches hash, False otherwise
        """
        return await asyncio.to_thread(self.verify_password, password, hash_value)

    def needs_rehash(self, hash_value: str) -> bool:
        """Check if a hash needs to be rehashed due to parameter changes.

        This is useful for gradually updating password hashes when you
        change Argon2 parameters (e.g., increasing time_cost for security).
        An empty, invalid, or otherwise uncheckable hash is reported as
        needing a rehash.

        Args:
            hash_value: The stored Argon2id hash to check

        Returns:
            bool: True if hash should be updated, False otherwise

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> hash_val = service.hash_password("test")
            >>> service.needs_rehash(hash_val)
            False
            >>> service_new = Argon2PasswordService(time_cost=2, memory_cost=1024)
            >>> service_new.needs_rehash(hash_val)
            True
        """
        if not hash_value:
            return True

        try:
            return self.hasher.check_needs_rehash(hash_value)
        except (InvalidHash, ValueError) as e:
            logger.warning(f"Invalid hash format when checking rehash need: {e}")
            return True
        except Exception as e:
            logger.error(f"Unexpected error checking rehash need: {e}")
            return True

    def get_hash_info(self, hash_value: str) -> Optional[dict]:
        """Extract information from an Argon2 hash.

        Only the ``argon2id`` variant is recognised. The ``v=`` version
        segment is treated as optional: when it is missing, the cost
        parameters are read from the segment right after the variant and
        the result carries no ``version`` key.

        Args:
            hash_value: The Argon2id hash to analyze

        Returns:
            dict: Hash parameters (``memory_cost``, ``time_cost``,
            ``parallelism`` as found, plus ``variant`` and, when present,
            ``version``) or None if invalid

        Examples:
            >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
            >>> hash_val = service.hash_password("test")
            >>> info = service.get_hash_info(hash_val)
            >>> info is not None
            True
            >>> 'time_cost' in info
            True
            >>> 'memory_cost' in info
            True
            >>> legacy = service.get_hash_info("$argon2id$m=1024,t=1,p=1$c2FsdA$aGFzaA")
            >>> legacy["memory_cost"], 'version' in legacy
            (1024, False)
        """
        if not hash_value:
            return None

        # Short parameter keys used in the encoded hash -> names in the result
        param_names = {"m": "memory_cost", "t": "time_cost", "p": "parallelism"}

        try:
            # Argon2 hash format: $argon2id$v=19$m=65536,t=3,p=1$salt$hash
            parts = hash_value.split("$")
            if len(parts) < 4 or parts[1] != "argon2id":
                return None

            # Without a "v=" segment the parameters sit one position earlier
            has_version = parts[2].startswith("v=")
            params_part = parts[3] if has_version else parts[2]

            params = {}
            for param in params_part.split(","):
                # Exactly one "=" is required; anything else raises ValueError
                key, value = param.split("=")
                if key in param_names:
                    params[param_names[key]] = int(value)

            params["variant"] = "argon2id"
            if has_version:
                params["version"] = parts[2]

            return params
        except (ValueError, IndexError) as e:
            logger.warning(f"Failed to parse Argon2 hash info: {e}")
            return None

    def __repr__(self) -> str:
        """String representation of the service.

        Returns:
            str: String representation of Argon2PasswordService instance
        """
        return f"Argon2PasswordService(time_cost={self.time_cost}, memory_cost={self.memory_cost}, parallelism={self.parallelism})"
# Shared service instance used by the module-level convenience functions;
# constructed without arguments, so every cost parameter takes its default.
password_service: Argon2PasswordService = Argon2PasswordService()
def hash_password(password: str) -> str:
    """Hash a password with the module-wide Argon2 service.

    Shortcut that forwards to ``password_service.hash_password``.

    Args:
        password: Plain-text password to hash

    Returns:
        str: The hash produced by the global service

    Examples:
        >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
        >>> hash_val = service.hash_password("test123")
        >>> hash_val.startswith("$argon2id$")
        True
    """
    hashed = password_service.hash_password(password)
    return hashed
def verify_password(password: str, hash_value: str) -> bool:
    """Check a password against a stored hash with the module-wide Argon2 service.

    Shortcut that forwards to ``password_service.verify_password``.

    Args:
        password: Plain-text password to check
        hash_value: Previously stored hash to compare against

    Returns:
        bool: The verdict returned by the global service

    Examples:
        >>> service = Argon2PasswordService(time_cost=1, memory_cost=1024)  # Light params for testing
        >>> hash_val = service.hash_password("test123")
        >>> service.verify_password("test123", hash_val)
        True
        >>> service.verify_password("wrong", hash_val)
        False
    """
    matches = password_service.verify_password(password, hash_value)
    return matches
async def hash_password_async(password: str) -> str:
    """Asynchronously hash a password with the module-wide Argon2 service.

    Shortcut that awaits ``password_service.hash_password_async``.

    Args:
        password: Plain-text password to hash

    Returns:
        str: The hash produced by the global service
    """
    hashed = await password_service.hash_password_async(password)
    return hashed
async def verify_password_async(password: str, hash_value: str) -> bool:
    """Asynchronously check a password with the module-wide Argon2 service.

    Shortcut that awaits ``password_service.verify_password_async``.

    Args:
        password: Plain-text password to check
        hash_value: Previously stored hash to compare against

    Returns:
        bool: The verdict returned by the global service
    """
    matches = await password_service.verify_password_async(password, hash_value)
    return matches
def needs_rehash(hash_value: str) -> bool:
    """Ask the module-wide Argon2 service whether a hash should be regenerated.

    Shortcut that forwards to ``password_service.needs_rehash``.

    Args:
        hash_value: Stored hash to inspect

    Returns:
        bool: The answer returned by the global service
    """
    should_rehash = password_service.needs_rehash(hash_value)
    return should_rehash