Coverage for mcpgateway / utils / create_jwt_token.py: 100%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3"""Location: ./mcpgateway/utils/create_jwt_token.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti 

7 

8jwt_cli.py - generate, inspect, **and be imported** for token helpers. 

9* **Run as a script** - friendly CLI (works with *no* flags). 

10* **Import as a library** - drop-in async functions `create_jwt_token` & `get_jwt_token` 

11 kept for backward-compatibility, now delegating to the shared core helper. 

12 

13Quick usage 

14----------- 

15CLI (default secret, default payload): 

16 $ python3 jwt_cli.py 

17 

18Library: 

19 from mcpgateway.utils.create_jwt_token import create_jwt_token, get_jwt_token 

20 

21 # inside async context 

22 jwt = await create_jwt_token({"username": "alice"}) 

23 

24Doctest examples 

25---------------- 

26>>> from mcpgateway.utils import create_jwt_token as jwt_util 

27>>> from mcpgateway.utils.jwt_config_helper import clear_jwt_caches 

28>>> clear_jwt_caches() 

29>>> jwt_util.settings.jwt_secret_key = 'this-is-a-long-test-secret-key-32chars' 

30>>> jwt_util.settings.jwt_algorithm = 'HS256' 

31>>> token = jwt_util._create_jwt_token({'sub': 'alice'}, expires_in_minutes=1, secret='this-is-a-long-test-secret-key-32chars', algorithm='HS256') 

32>>> import jwt 

33>>> jwt.decode(token, 'this-is-a-long-test-secret-key-32chars', algorithms=['HS256'], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'alice' 

34True 

35>>> import asyncio 

36>>> t = asyncio.run(jwt_util.create_jwt_token({'sub': 'bob'}, expires_in_minutes=1, secret='this-is-a-long-test-secret-key-32chars', algorithm='HS256')) 

37>>> jwt.decode(t, 'this-is-a-long-test-secret-key-32chars', algorithms=['HS256'], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'bob' 

38True 

39""" 

40 

41# Future 

42from __future__ import annotations 

43 

44# Standard 

45import argparse 

46import asyncio 

47import datetime as _dt 

48import sys 

49from typing import Any, Dict, List, Optional, Sequence 

50import uuid 

51 

52# Third-Party 

53import jwt # PyJWT 

54import orjson 

55 

56# First-Party 

57from mcpgateway.config import settings 

58 

# Public surface for star-imports; note that it also lists the
# underscore-prefixed synchronous core helper.
__all__: Sequence[str] = ("create_jwt_token", "get_jwt_token", "_create_jwt_token")

# ---------------------------------------------------------------------------
# Defaults & constants
# ---------------------------------------------------------------------------
# The three DEFAULT_* values below are read from ``settings`` once, when this
# module is imported. The signing secret is deliberately NOT captured here: it
# is looked up at call time so configuration changes made later are honoured.
DEFAULT_ALGO: str = settings.jwt_algorithm
DEFAULT_EXP_MINUTES: int = settings.token_expiry
DEFAULT_USERNAME: str = settings.basic_auth_user

# Sentinel meaning "the caller did not pass ``teams``", so that an explicit
# ``None`` can be told apart from an omitted argument.
_TEAMS_UNSET = object()

73 

74 

75# --------------------------------------------------------------------------- 

76# Core sync helper (used by both CLI & async wrappers) 

77# --------------------------------------------------------------------------- 

78 

79 

def _create_jwt_token(
    data: Dict[str, Any],
    expires_in_minutes: int = DEFAULT_EXP_MINUTES,
    secret: str = "",  # nosec B107 - Optional override; uses config if empty
    algorithm: str = "",  # Optional override; uses config if empty
    user_data: Optional[Dict[str, Any]] = None,
    teams: Optional[List[str]] | object = _TEAMS_UNSET,
    scopes: Optional[Dict[str, Any]] = None,
) -> str:
    """Create a signed JWT from ``data`` plus the standard gateway claims.

    A shallow copy of ``data`` is extended with ``iat``, ``iss``, ``aud`` and
    ``jti`` (an existing truthy ``jti`` is kept), optionally ``env``, and the
    rich ``user`` / ``teams`` / ``scopes`` claims when supplied. A legacy
    ``username`` entry is mirrored into ``sub`` when ``sub`` is absent.

    Expiration handling:
        * An ``exp`` already present in ``data`` is kept when it is a positive
          number or a ``datetime`` (PyJWT converts datetimes on encode).
        * Otherwise ``exp`` is set to now + ``expires_in_minutes`` when that
          value is positive.
        * Otherwise no expiry is added, a warning is written to stderr, and an
          ``exp`` of ``None`` is removed so it is not serialized as ``null``.

    Args:
        data: Dictionary containing payload data to encode in the token.
        expires_in_minutes: Token lifetime in minutes. ``0`` disables expiration.
        secret: Optional signing key. If empty, the JWT configuration is
            validated and the key is obtained from the config helper.
        algorithm: Optional signing algorithm. If empty, ``settings.jwt_algorithm`` is used.
        user_data: Optional user information dict (email, full_name, is_admin,
            auth_provider). Only embedded when truthy.
        teams: Optional list of team IDs the token is scoped to.
            Pass ``None`` explicitly to serialize ``"teams": null``.
            Omit the argument to leave the claim unchanged/absent.
        scopes: Optional scopes dict (server_id, permissions, ip_restrictions,
            time_restrictions). Embedded whenever it is not ``None``.

    Returns:
        str: The signed JWT token string.

    Raises:
        JWTConfigurationError: If JWT configuration is invalid or keys are missing.
        FileNotFoundError: If asymmetric key files don't exist.

    Note:
        This is an internal function. Use create_jwt_token() for the async interface.
        When secret/algorithm are provided, they override the configuration values.
        When not provided (empty), configuration values are used as defaults.
    """
    # First-Party
    from mcpgateway.utils.jwt_config_helper import get_jwt_private_key_or_secret, validate_jwt_algo_and_keys

    # Use provided secret/algorithm or fall back to configuration
    if not secret:
        validate_jwt_algo_and_keys()
        secret = get_jwt_private_key_or_secret()
    if not algorithm:
        algorithm = settings.jwt_algorithm

    # Work on a copy so the caller's dict is never mutated.
    payload = data.copy()
    now = _dt.datetime.now(_dt.timezone.utc)

    # Add standard JWT claims
    payload["iat"] = int(now.timestamp())  # Issued at
    payload["iss"] = settings.jwt_issuer  # Issuer
    payload["aud"] = settings.jwt_audience  # Audience
    payload["jti"] = payload.get("jti") or str(uuid.uuid4())  # JWT ID for revocation support

    # Optionally embed environment claim for cross-environment isolation
    if settings.embed_environment_in_tokens:
        payload["env"] = settings.environment

    # Handle legacy username format - convert to sub for consistency
    if "username" in payload and "sub" not in payload:
        payload["sub"] = payload["username"]

    # Add rich claims if provided (for API/CLI token parity)
    if user_data:
        payload["user"] = user_data

    # Identity check against the sentinel: an explicit ``None`` must still be written.
    if teams is not _TEAMS_UNSET:
        payload["teams"] = teams

    if scopes is not None:
        payload["scopes"] = scopes

    # Decide whether the caller already supplied a usable expiry. Comparing a
    # datetime or None with ``> 0`` would raise TypeError, so classify first.
    existing_exp = payload.get("exp")
    if isinstance(existing_exp, _dt.datetime):
        has_explicit_exp = True
    elif existing_exp is None:
        has_explicit_exp = False
    else:
        has_explicit_exp = existing_exp > 0

    if has_explicit_exp:
        pass  # The token already has a valid expiration time
    elif expires_in_minutes > 0:
        expire = now + _dt.timedelta(minutes=expires_in_minutes)
        payload["exp"] = int(expire.timestamp())
    else:
        if existing_exp is None:
            # Drop an explicit ``"exp": None`` instead of emitting ``"exp": null``.
            payload.pop("exp", None)
        # Warn about non-expiring token
        print(
            "⚠️ WARNING: Creating token without expiration. This is a security risk!\n"
            " Consider using --exp with a value > 0 for production use.\n"
            " Once JWT API (#425) is available, use it for automatic token renewal.",
            file=sys.stderr,
        )

    return jwt.encode(payload, secret, algorithm=algorithm)

176 

177 

178# --------------------------------------------------------------------------- 

179# **Async** wrappers for backward compatibility 

180# --------------------------------------------------------------------------- 

181 

182 

async def create_jwt_token(
    data: Dict[str, Any],
    expires_in_minutes: int = DEFAULT_EXP_MINUTES,
    *,
    secret: str = None,
    algorithm: str = None,
    user_data: Optional[Dict[str, Any]] = None,
    teams: Optional[List[str]] | object = _TEAMS_UNSET,
    scopes: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Awaitable wrapper around the synchronous ``_create_jwt_token`` helper.

    Nothing is awaited inside; the coroutine exists so older call sites that
    ``await`` token creation keep working.

    Args:
        data: Dictionary containing payload data to encode in the token.
        expires_in_minutes: Token expiration time in minutes. Defaults to the
            ``settings.token_expiry`` value captured at import time.
            Set to 0 to disable expiration.
        secret: Optional secret key for signing. ``None`` or empty means "use the configured key".
        algorithm: Optional signing algorithm. ``None`` or empty means "use the configured algorithm".
        user_data: Optional user information dict with keys: email, full_name, is_admin, auth_provider.
        teams: Optional list of team IDs the token is scoped to.
            Pass ``None`` explicitly to serialize ``"teams": null``.
            Omit to leave teams absent.
        scopes: Optional scopes dict with keys: server_id, permissions, ip_restrictions, time_restrictions.

    Returns:
        The JWT token string.

    Doctest:
        >>> from mcpgateway.utils import create_jwt_token as jwt_util
        >>> from mcpgateway.utils.jwt_config_helper import clear_jwt_caches
        >>> clear_jwt_caches()
        >>> jwt_util.settings.jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        >>> jwt_util.settings.jwt_algorithm = 'HS256'
        >>> import asyncio
        >>> t = asyncio.run(jwt_util.create_jwt_token({'sub': 'bob'}, expires_in_minutes=1))
        >>> import jwt
        >>> jwt.decode(t, jwt_util.settings.jwt_secret_key, algorithms=[jwt_util.settings.jwt_algorithm], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'bob'
        True
    """
    # The core helper treats "" as "fall back to configuration", so fold
    # None (and any other falsy value) into the empty string first.
    signing_secret = secret if secret else ""
    signing_algorithm = algorithm if algorithm else ""
    return _create_jwt_token(data, expires_in_minutes, signing_secret, signing_algorithm, user_data, teams, scopes)

225 

226 

async def get_jwt_token() -> str:
    """Create a token whose payload is ``{"username": DEFAULT_USERNAME}``.

    All other options are left at the defaults of ``create_jwt_token``.

    Returns:
        The JWT token string for the default username.
    """
    return await create_jwt_token({"username": DEFAULT_USERNAME})

235 

236 

237# --------------------------------------------------------------------------- 

# **Decode** helper (verifies signature, audience and issuer) - used by the CLI

239# --------------------------------------------------------------------------- 

240 

241 

def _decode_jwt_token(token: str, algorithms: List[str] | None = None) -> Dict[str, Any]:
    """Decode ``token`` with the configured secret, audience and issuer.

    The token is handed to ``jwt.decode`` together with ``settings.jwt_secret_key``,
    ``settings.jwt_audience`` and ``settings.jwt_issuer``, so the signature,
    audience and issuer are checked by PyJWT. No ``require`` option is passed,
    so a token without an ``exp`` claim is not rejected for that reason.

    Args:
        token: JWT token string to decode.
        algorithms: List of allowed algorithms for decoding. Defaults to [DEFAULT_ALGO].

    Returns:
        Dictionary containing the decoded payload.

    Examples:
        >>> # Test algorithm parameter handling
        >>> algs = ['HS256', 'HS512']
        >>> len(algs)
        2
        >>> 'HS256' in algs
        True
        >>> # Test None algorithms handling
        >>> default_algo = [DEFAULT_ALGO]
        >>> isinstance(default_algo, list)
        True
    """
    verification_key = settings.jwt_secret_key
    # The setting may be a SecretStr-style wrapper; unwrap it to the raw value if so.
    if hasattr(verification_key, "get_secret_value"):
        verification_key = verification_key.get_secret_value()

    # An empty or missing list falls back to the algorithm captured at import time.
    allowed_algorithms = algorithms if algorithms else [DEFAULT_ALGO]

    return jwt.decode(
        token,
        verification_key,
        algorithms=allowed_algorithms,
        audience=settings.jwt_audience,
        issuer=settings.jwt_issuer,
    )

274 

275 

276# --------------------------------------------------------------------------- 

277# CLI Parsing & helpers 

278# --------------------------------------------------------------------------- 

279 

280 

def _parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command line arguments for JWT token operations.

    Sets up an argument parser with mutually exclusive options for:
    - Creating tokens with username (-u/--username)
    - Creating tokens with custom data (-d/--data)
    - Decoding existing tokens (--decode)

    Additional options control expiration, secret key, algorithm, output
    format, and the rich-token claims (admin, teams, scopes, full name).

    Args:
        argv: Argument list to parse, without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``, which is the
            behaviour callers that pass nothing have always had.

    Returns:
        argparse.Namespace: Parsed command line arguments containing:
            - username: Optional username for simple payload
            - data: Optional JSON or key=value pairs for custom payload
            - decode: Optional token string to decode
            - exp: Expiration time in minutes (default: DEFAULT_EXP_MINUTES)
            - secret: Secret key for signing (default: "" - uses JWT_SECRET_KEY from config)
            - algo: Signing algorithm (default: "" - uses JWT_ALGORITHM from config)
            - pretty: Whether to pretty-print payload before encoding
            - admin: Whether to mark the user as admin
            - teams: Optional comma-separated team IDs
            - scopes: Optional JSON scopes object (as a string)
            - full_name: Optional full name for the user claim

    Examples:
        >>> args = _parse_args(['-u', 'alice', '-e', '60'])
        >>> args.username
        'alice'
        >>> args.exp
        60
        >>> args.admin
        False
    """
    p = argparse.ArgumentParser(
        description="Generate or inspect JSON Web Tokens.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # -u, -d and --decode select the operating mode, so at most one may be given.
    group = p.add_mutually_exclusive_group()
    group.add_argument("-u", "--username", help="Add username=<value> to the payload.")
    group.add_argument("-d", "--data", help="Raw JSON payload or comma-separated key=value pairs.")
    # Decoding goes through jwt.decode with the configured key, audience and
    # issuer, so the help text must not promise an unverified decode.
    group.add_argument("--decode", metavar="TOKEN", help="Token string to decode (signature, audience and issuer are verified).")

    p.add_argument(
        "-e",
        "--exp",
        type=int,
        default=DEFAULT_EXP_MINUTES,
        help="Expiration in minutes (0 disables the exp claim).",
    )
    p.add_argument("-s", "--secret", default="", help="Secret key for signing. If not provided, uses JWT_SECRET_KEY from config.")
    p.add_argument("--algo", default="", help="Signing algorithm (e.g., HS256, RS256). If not provided, uses JWT_ALGORITHM from config.")
    p.add_argument("--pretty", action="store_true", help="Pretty-print payload before encoding.")

    # Rich token creation arguments (requires JWT_SECRET_KEY)
    p.add_argument("--admin", action="store_true", help="Mark user as admin (DEV/TEST ONLY - requires JWT_SECRET_KEY)")
    p.add_argument("--teams", help="Comma-separated team IDs (e.g., team-123,team-456). DEV/TEST ONLY")
    p.add_argument("--scopes", help='JSON scopes object for permission restrictions (e.g., \'{"permissions": ["tools.read"]}\'). DEV/TEST ONLY')
    p.add_argument("--full-name", help="User's full name for the token")

    return p.parse_args(argv)

339 

340 

def _payload_from_cli(args) -> Dict[str, Any]:
    """Extract JWT payload from parsed command line arguments.

    Processes arguments in priority order:
    1. If username is specified, creates {"username": <value>}
    2. If data is specified, parses as JSON or key=value pairs
    3. Otherwise, returns default payload with the default username

    The data argument supports two formats:
    - JSON object string: '{"key": "value", "foo": "bar"}'
    - Comma-separated pairs: 'key=value,foo=bar'

    Text that is valid JSON but not an object (a list, number, string,
    boolean or null) is rejected, because the result must be a mapping of
    claims.

    Args:
        args: Parsed command line arguments from argparse containing
            username, data, and other JWT options.

    Returns:
        Dict[str, Any]: The payload dictionary to encode in the JWT.

    Raises:
        ValueError: If data contains invalid key=value pairs (missing '='),
            or if data is valid JSON that is not an object.

    Examples:
        >>> from argparse import Namespace
        >>> args = Namespace(username='alice', data=None)
        >>> _payload_from_cli(args)
        {'username': 'alice'}
        >>> args = Namespace(username=None, data='{"role": "admin", "id": 123}')
        >>> _payload_from_cli(args)
        {'role': 'admin', 'id': 123}
        >>> args = Namespace(username=None, data='name=bob,role=user')
        >>> _payload_from_cli(args)
        {'name': 'bob', 'role': 'user'}
        >>> args = Namespace(username=None, data='invalid_format')
        >>> _payload_from_cli(args)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
        ...
        ValueError: Invalid key=value pair: 'invalid_format'
        >>> args = Namespace(username=None, data='[1, 2]')
        >>> _payload_from_cli(args)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
        ...
        ValueError: JSON payload must be an object, got list
    """
    if args.username is not None:
        return {"username": args.username}

    if args.data is not None:
        # Attempt JSON first
        try:
            parsed = orjson.loads(args.data)
        except orjson.JSONDecodeError:
            # Not JSON: interpret as comma-separated key=value pairs.
            pairs = [kv.strip() for kv in args.data.split(",") if kv.strip()]
            payload: Dict[str, Any] = {}
            for pair in pairs:
                if "=" not in pair:
                    raise ValueError(f"Invalid key=value pair: '{pair}'")
                # Split on the first '=' only so values may themselves contain '='.
                k, v = pair.split("=", 1)
                payload[k.strip()] = v.strip()
            return payload

        # Valid JSON is only usable as a claims set when it is an object;
        # anything else would break later on ``.get()`` / ``.copy()``.
        if not isinstance(parsed, dict):
            raise ValueError(f"JSON payload must be an object, got {type(parsed).__name__}")
        return parsed

    # Fallback default payload
    return {"username": DEFAULT_USERNAME}

399 

400 

401# --------------------------------------------------------------------------- 

402# Entry point for ``python3 jwt_cli.py`` 

403# --------------------------------------------------------------------------- 

404 

405 

def main() -> None:  # pragma: no cover
    """Entry point for JWT command line interface.

    Provides two main modes of operation:
    1. Token creation: Generates a new JWT with specified payload
    2. Token decoding: Decodes an existing JWT via ``_decode_jwt_token`` and
       displays the payload as indented JSON

    In creation mode, supports:
    - Simple username payload (-u/--username)
    - Custom JSON or key=value payload (-d/--data)
    - Rich tokens with admin/team/scope claims (--admin, --teams, --scopes)
    - Configurable expiration, secret, and algorithm
    - Optional pretty-printing of payload before encoding

    The process exits with status 1 when ``--algo`` is given without
    ``--secret`` or when ``--scopes`` is not valid JSON. The function itself
    is synchronous.

    Examples:
        Command line usage::

            # Create token with username
            $ python jwt_cli.py -u alice
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Create admin token (DEV/TEST ONLY)
            $ python jwt_cli.py -u admin@example.com --admin --full-name "Admin User"
            ⚠️ WARNING: Creating token with elevated claims...
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Create team-scoped token (DEV/TEST ONLY)
            $ python jwt_cli.py -u user@example.com --teams team-123,team-456
            ⚠️ WARNING: Creating token with elevated claims...
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Decode existing token
            $ python jwt_cli.py --decode eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...
            {
              "username": "alice",
              "exp": 1234567890
            }
    """
    args = _parse_args()

    # Decode mode takes precedence
    if args.decode:
        decoded = _decode_jwt_token(args.decode, algorithms=[args.algo or DEFAULT_ALGO])
        sys.stdout.write(orjson.dumps(decoded, default=str, option=orjson.OPT_INDENT_2).decode())
        sys.stdout.write("\n")
        return

    # Validate: --algo requires --secret to avoid algorithm/key mismatch
    if args.algo and not args.secret:
        print(
            "ERROR: --algo requires --secret to be specified.\n"
            " Using --algo alone would mix config-based keys with a different algorithm,\n"
            " which can produce invalid tokens. Either:\n"
            " - Provide both --secret and --algo for full override\n"
            " - Omit both to use JWT_SECRET_KEY and JWT_ALGORITHM from config",
            file=sys.stderr,
        )
        sys.exit(1)

    # Security warning for rich tokens
    if args.admin or args.teams or args.scopes:
        print(
            "⚠️ WARNING: Creating token with elevated claims (admin/teams/scopes)\n"
            " This requires JWT_SECRET_KEY and is intended for development/testing only.\n"
            " For production token management, use the /tokens API endpoint.\n",
            file=sys.stderr,
        )

    payload = _payload_from_cli(args)

    # Build rich token parameters if provided
    user_data = None
    teams: object = _TEAMS_UNSET
    scopes_dict = None

    if args.admin or args.teams or args.scopes or args.full_name:
        user_email = payload.get("sub") or payload.get("username", "admin@example.com")

        # Build user data
        user_data = {
            "email": user_email,
            "full_name": args.full_name or "CLI User",
            "is_admin": bool(args.admin),
            "auth_provider": "cli",  # Mark as CLI-generated for auditing
        }

        # Build teams claim. In rich-token mode, explicit null preserves
        # normalize_token_teams semantics for admin bypass when intended.
        teams = None
        if args.teams:
            teams = [t.strip() for t in args.teams.split(",") if t.strip()]

        # Build scopes
        if args.scopes:
            try:
                scopes_dict = orjson.loads(args.scopes)
            except orjson.JSONDecodeError as e:
                print(f"ERROR: Invalid JSON for --scopes: {e}", file=sys.stderr)
                sys.exit(1)

    if args.pretty:
        print("Payload:")
        print(orjson.dumps(payload, default=str, option=orjson.OPT_INDENT_2).decode())
        if user_data:
            print("User Data:")
            print(orjson.dumps(user_data, default=str, option=orjson.OPT_INDENT_2).decode())
        # Outside rich-token mode ``teams`` is still the _TEAMS_UNSET sentinel,
        # a plain object() and therefore truthy; exclude it by identity so the
        # sentinel's repr is never printed as if it were a team list.
        if teams is not _TEAMS_UNSET and teams:
            print(f"Teams: {teams}")
        if scopes_dict:
            print("Scopes:")
            print(orjson.dumps(scopes_dict, default=str, option=orjson.OPT_INDENT_2).decode())
        print("-")

    token = _create_jwt_token(
        payload,
        args.exp,
        args.secret,
        args.algo,
        user_data=user_data,
        teams=teams,
        scopes=scopes_dict,
    )
    print(token)

535 

536 

if __name__ == "__main__":
    # Support being run via ``python3 -m mcpgateway.utils.create_jwt_token`` too
    try:
        # Keep only the loop lookup inside the ``try``. Probing the loop with
        # ``run_until_complete`` would itself raise RuntimeError whenever the
        # loop is already running, which would be caught below and make the
        # executor branch unreachable (and leave a never-awaited coroutine).
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop; we're just a simple CLI call - run main synchronously
        main()
    else:
        # We're inside an active asyncio program - delegate to executor to avoid blocking.
        # The returned future is not awaited here (fire-and-forget).
        loop.run_in_executor(None, main)
548 loop.run_in_executor(None, main)