Coverage for mcpgateway / utils / services_auth.py: 100%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/services_auth.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7mcpgateway.utils.services_auth - Authentication utilities for ContextForge 

8Doctest examples 

9---------------- 

10>>> import os 

11>>> from mcpgateway.utils import services_auth 

12>>> os.environ['AUTH_ENCRYPTION_SECRET'] = 'doctest-secret' 

13>>> services_auth.settings.auth_encryption_secret = 'doctest-secret' 

14>>> key = services_auth.get_key() 

15>>> isinstance(key, bytes) 

16True 

17>>> d = {'user': 'alice'} 

18>>> token = services_auth.encode_auth(d) 

19>>> isinstance(token, str) 

20True 

21>>> services_auth.decode_auth(token) == d 

22True 

23>>> services_auth.encode_auth(None) is None 

24True 

25>>> services_auth.decode_auth(None) == {} 

26True 

27>>> services_auth.settings.auth_encryption_secret = '' 

28>>> try: 

29... services_auth.get_key() 

30... except ValueError as e: 

31... print('error') 

32error 

33""" 

34 

35# Standard 

36import base64 

37import hashlib 

38import os 

39from typing import Optional, Tuple 

40 

41# Third-Party 

42from cryptography.hazmat.primitives.ciphers.aead import AESGCM 

43import orjson 

44from pydantic import SecretStr 

45 

46# First-Party 

47from mcpgateway.config import settings 

48 

# Memo of the derived key material for the global passphrase.
# Maps passphrase value -> (derived key bytes, AESGCM cipher built from that key).
_crypto_cache: dict[str, Tuple[bytes, AESGCM]] = {}

52 

53 

def _get_passphrase() -> str:
    """Return the configured encryption passphrase as a plain value.

    Reads ``settings.auth_encryption_secret`` and unwraps it when it is a
    pydantic ``SecretStr``; any other truthy value is returned unchanged.

    Returns:
        str: The passphrase value

    Raises:
        ValueError: If the passphrase is not set or empty
    """
    configured = settings.auth_encryption_secret
    if configured:
        # SecretStr hides its payload behind get_secret_value().
        return configured.get_secret_value() if isinstance(configured, SecretStr) else configured
    raise ValueError("AUTH_ENCRYPTION_SECRET not set in environment.")

71 

72 

def get_key() -> bytes:
    """
    Generate a 32-byte AES encryption key derived from a passphrase.

    The key is the SHA-256 digest of the UTF-8 encoded passphrase. It is
    cached, together with an AESGCM instance built from it, under the
    passphrase value. When a different passphrase is seen, the cache is
    cleared before the new entry is stored.

    Returns:
        bytes: A 32-byte encryption key.

    Raises:
        ValueError: If the passphrase is not set or empty.

    Doctest:
        >>> import os
        >>> from mcpgateway.utils import services_auth
        >>> os.environ['AUTH_ENCRYPTION_SECRET'] = 'doctest-secret'
        >>> services_auth.settings.auth_encryption_secret = 'doctest-secret'
        >>> key = services_auth.get_key()
        >>> isinstance(key, bytes)
        True
        >>> services_auth.settings.auth_encryption_secret = ''
        >>> try:
        ...     services_auth.get_key()
        ... except ValueError as e:
        ...     print('error')
        error
    """
    passphrase = _get_passphrase()

    # Single lookup on purpose: an "in" test followed by indexing could raise
    # KeyError if the cache is cleared between the two steps.
    cached = _crypto_cache.get(passphrase)
    if cached is not None:
        return cached[0]

    # Derive key
    key = hashlib.sha256(passphrase.encode()).digest()  # 32-byte key

    # Cache key and AESGCM together so the cipher lookup can reuse this entry
    aesgcm = AESGCM(key)
    _crypto_cache.clear()  # Drop the entry for any previous passphrase
    _crypto_cache[passphrase] = (key, aesgcm)

    return key

116 

117 

def _get_aesgcm() -> AESGCM:
    """Get cached AESGCM instance, creating if needed.

    The instance is keyed with the SHA-256 digest of the configured
    passphrase and stored in ``_crypto_cache`` next to that key. When a
    different passphrase is seen, the cache is cleared before the new
    entry is stored.

    Returns:
        AESGCM: Cached AESGCM cipher instance

    Raises:
        ValueError: If the passphrase is not set or empty
    """
    passphrase = _get_passphrase()

    # Single lookup on purpose: an "in" test followed by indexing could raise
    # KeyError if the cache is cleared between the two steps.
    cached = _crypto_cache.get(passphrase)
    if cached is not None:
        return cached[1]

    # Derive key and create AESGCM
    key = hashlib.sha256(passphrase.encode()).digest()
    aesgcm = AESGCM(key)

    # Cache both
    _crypto_cache.clear()  # Drop the entry for any previous passphrase
    _crypto_cache[passphrase] = (key, aesgcm)

    return aesgcm

142 

143 

def clear_crypto_cache() -> None:
    """Empty the module-level crypto cache.

    Intended call sites:
    - Test fixtures, to keep tests isolated from each other
    - After a passphrase rotation (if supported at runtime)
    """
    _crypto_cache.clear()

152 

153 

def _aesgcm_for_secret(secret: str) -> AESGCM:
    """Build a fresh AESGCM cipher for an explicit secret.

    The result is not stored in the global crypto cache.

    Args:
        secret: The encryption passphrase.

    Returns:
        AESGCM: A cipher instance keyed with the SHA-256 digest of *secret*.
    """
    return AESGCM(hashlib.sha256(secret.encode()).digest())

165 

166 

def encode_auth(auth_value: Optional[dict], *, secret: Optional[str] = None) -> Optional[str]:
    """
    Encrypt and encode an authentication dictionary into a compact base64-url string.

    The dictionary is serialized with orjson and encrypted with AES-GCM under
    a random 12-byte nonce. The output is the URL-safe base64 encoding of
    ``nonce + ciphertext`` with trailing ``=`` padding stripped.

    Args:
        auth_value (dict, optional): The authentication dictionary to encrypt
            and encode. ``None`` and an empty dict are both treated as
            "nothing to encode".
        secret (str, optional): Explicit encryption secret. When provided the
            global ``settings.auth_encryption_secret`` is **not** read, which
            avoids mutating shared state during concurrent re-keying. An empty
            string counts as not provided and falls back to the global secret.

    Returns:
        Optional[str]: A base64-url-safe encrypted string representing the
        dictionary, or None if the input is None or empty.

    Raises:
        ValueError: If no explicit secret is given and the global passphrase
            is not set or empty.

    Doctest:
        >>> import os
        >>> from mcpgateway.utils import services_auth
        >>> os.environ['AUTH_ENCRYPTION_SECRET'] = 'doctest-secret'
        >>> services_auth.settings.auth_encryption_secret = 'doctest-secret'
        >>> token = services_auth.encode_auth({'user': 'alice'})
        >>> isinstance(token, str)
        True
        >>> services_auth.encode_auth(None) is None
        True
    """
    if not auth_value:
        return None

    plaintext = orjson.dumps(auth_value)
    aesgcm = _aesgcm_for_secret(secret) if secret else _get_aesgcm()

    # Fresh random nonce per message; it is prepended so decode_auth can recover it.
    nonce = os.urandom(12)
    ciphertext = aesgcm.encrypt(nonce, plaintext, None)

    # Padding is stripped for compactness; decode_auth restores it.
    return base64.urlsafe_b64encode(nonce + ciphertext).rstrip(b"=").decode()

200 

201 

def decode_auth(encoded_value: Optional[str], *, secret: Optional[str] = None) -> dict:
    """
    Decode and decrypt a base64-url-safe encrypted string back into the authentication dictionary.

    The input is expected to be URL-safe base64 (padding optional) of a
    12-byte nonce followed by the AES-GCM ciphertext, as produced by
    ``encode_auth``.

    Args:
        encoded_value (str, optional): The encrypted base64-url string to
            decode and decrypt. ``None`` and the empty string are both treated
            as "nothing to decode".
        secret (str, optional): Explicit encryption secret. When provided the
            global ``settings.auth_encryption_secret`` is **not** read. An
            empty string counts as not provided and falls back to the global
            secret.

    Returns:
        dict: The decrypted authentication dictionary, or empty dict if the
        input is None or empty.

    Raises:
        ValueError: If no explicit secret is given and the global passphrase
            is not set or empty.

    Errors raised by base64 decoding, AES-GCM decryption or JSON parsing are
    not caught here and propagate to the caller.

    Doctest:
        >>> import os
        >>> from mcpgateway.utils import services_auth
        >>> os.environ['AUTH_ENCRYPTION_SECRET'] = 'doctest-secret'
        >>> services_auth.settings.auth_encryption_secret = 'doctest-secret'
        >>> d = {'user': 'alice'}
        >>> token = services_auth.encode_auth(d)
        >>> services_auth.decode_auth(token) == d
        True
        >>> services_auth.decode_auth(None) == {}
        True
    """
    if not encoded_value:
        return {}

    aesgcm = _aesgcm_for_secret(secret) if secret else _get_aesgcm()

    # Restore the "=" padding that encode_auth strips (length must be a multiple of 4).
    padded = encoded_value + "=" * (-len(encoded_value) % 4)
    combined = base64.urlsafe_b64decode(padded)

    # Layout: first 12 bytes are the nonce, the remainder is the ciphertext.
    nonce, ciphertext = combined[:12], combined[12:]
    plaintext = aesgcm.decrypt(nonce, ciphertext, None)
    return orjson.loads(plaintext)