Coverage for mcpgateway / utils / create_jwt_token.py: 100%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3"""Location: ./mcpgateway/utils/create_jwt_token.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti 

7 

8jwt_cli.py - generate, inspect, **and be imported** for token helpers. 

9* **Run as a script** - friendly CLI (works with *no* flags). 

10* **Import as a library** - drop-in async functions `create_jwt_token` & `get_jwt_token` 

11 kept for backward-compatibility, now delegating to the shared core helper. 

12 

13Quick usage 

14----------- 

15CLI (default secret, default payload): 

16 $ python3 jwt_cli.py 

17 

18Library: 

19 from mcpgateway.utils.create_jwt_token import create_jwt_token, get_jwt_token 

20 

21 # inside async context 

22 jwt = await create_jwt_token({"username": "alice"}) 

23 

24Doctest examples 

25---------------- 

26>>> from mcpgateway.utils import create_jwt_token as jwt_util 

27>>> from mcpgateway.utils.jwt_config_helper import clear_jwt_caches 

28>>> clear_jwt_caches() 

29>>> jwt_util.settings.jwt_secret_key = 'secret' 

30>>> jwt_util.settings.jwt_algorithm = 'HS256' 

31>>> token = jwt_util._create_jwt_token({'sub': 'alice'}, expires_in_minutes=1, secret='secret', algorithm='HS256') 

32>>> import jwt 

33>>> jwt.decode(token, 'secret', algorithms=['HS256'], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'alice' 

34True 

35>>> import asyncio 

36>>> t = asyncio.run(jwt_util.create_jwt_token({'sub': 'bob'}, expires_in_minutes=1, secret='secret', algorithm='HS256')) 

37>>> jwt.decode(t, 'secret', algorithms=['HS256'], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'bob' 

38True 

39""" 

40 

41# Future 

42from __future__ import annotations 

43 

44# Standard 

45import argparse 

46import asyncio 

47import datetime as _dt 

48import sys 

49from typing import Any, Dict, List, Optional, Sequence 

50import uuid 

51 

52# Third-Party 

53import jwt # PyJWT 

54import orjson 

55 

56# First-Party 

57from mcpgateway.config import settings 

58 

59__all__: Sequence[str] = ( 

60 "create_jwt_token", 

61 "get_jwt_token", 

62 "_create_jwt_token", 

63) 

64 

65# --------------------------------------------------------------------------- 

66# Defaults & constants 

67# --------------------------------------------------------------------------- 

68# Note: DEFAULT_SECRET is retrieved at runtime to support dynamic configuration changes 

69DEFAULT_ALGO: str = settings.jwt_algorithm 

70DEFAULT_EXP_MINUTES: int = settings.token_expiry 

71DEFAULT_USERNAME: str = settings.basic_auth_user 

72 

73 

74# --------------------------------------------------------------------------- 

75# Core sync helper (used by both CLI & async wrappers) 

76# --------------------------------------------------------------------------- 

77 

78 

79def _create_jwt_token( 

80 data: Dict[str, Any], 

81 expires_in_minutes: int = DEFAULT_EXP_MINUTES, 

82 secret: str = "", # nosec B107 - Optional override; uses config if empty 

83 algorithm: str = "", # Optional override; uses config if empty 

84 user_data: Optional[Dict[str, Any]] = None, 

85 teams: Optional[List[str]] = None, 

86 scopes: Optional[Dict[str, Any]] = None, 

87) -> str: 

88 """Create a signed JWT token with automatic key selection and validation. 

89 

90 This internal function handles JWT token creation with both symmetric (HMAC) and 

91 asymmetric (RSA/ECDSA) algorithms. It automatically validates the JWT configuration, 

92 selects the appropriate signing key based on the configured algorithm, and creates 

93 a properly formatted JWT token with standard claims. 

94 

95 Supports both simple tokens (minimal claims) and rich tokens (with user, teams, 

96 and scopes). This enables consistent token format across CLI and API 

97 token creation paths. 

98 

99 Args: 

100 data: Dictionary containing payload data to encode in the token. 

101 expires_in_minutes: Token expiration time in minutes. Set to 0 to disable expiration. 

102 secret: Optional secret key for signing. If empty, uses JWT_SECRET_KEY from config. 

103 algorithm: Optional signing algorithm. If empty, uses JWT_ALGORITHM from config. 

104 user_data: Optional user information dict with keys: email, full_name, is_admin, auth_provider. 

105 teams: Optional list of team IDs the token is scoped to. 

106 scopes: Optional scopes dict with keys: server_id, permissions, ip_restrictions, time_restrictions. 

107 

108 Returns: 

109 str: The signed JWT token string. 

110 

111 Raises: 

112 JWTConfigurationError: If JWT configuration is invalid or keys are missing. 

113 FileNotFoundError: If asymmetric key files don't exist. 

114 

115 Note: 

116 This is an internal function. Use create_jwt_token() for the async interface. 

117 When secret/algorithm are provided, they override the configuration values. 

118 When not provided (empty), configuration values are used as defaults. 

119 """ 

120 # First-Party 

121 from mcpgateway.utils.jwt_config_helper import get_jwt_private_key_or_secret, validate_jwt_algo_and_keys 

122 

123 # Use provided secret/algorithm or fall back to configuration 

124 if not secret: 

125 validate_jwt_algo_and_keys() 

126 secret = get_jwt_private_key_or_secret() 

127 if not algorithm: 

128 algorithm = settings.jwt_algorithm 

129 

130 payload = data.copy() 

131 now = _dt.datetime.now(_dt.timezone.utc) 

132 

133 # Add standard JWT claims 

134 payload["iat"] = int(now.timestamp()) # Issued at 

135 payload["iss"] = settings.jwt_issuer # Issuer 

136 payload["aud"] = settings.jwt_audience # Audience 

137 payload["jti"] = payload.get("jti") or str(uuid.uuid4()) # JWT ID for revocation support 

138 

139 # Optionally embed environment claim for cross-environment isolation 

140 if settings.embed_environment_in_tokens: 

141 payload["env"] = settings.environment 

142 

143 # Handle legacy username format - convert to sub for consistency 

144 if "username" in payload and "sub" not in payload: 

145 payload["sub"] = payload["username"] 

146 

147 # Add rich claims if provided (for API/CLI token parity) 

148 if user_data: 

149 payload["user"] = user_data 

150 

151 if teams is not None: 

152 payload["teams"] = teams 

153 

154 if scopes is not None: 

155 payload["scopes"] = scopes 

156 

157 payload_exp = payload.get("exp", 0) 

158 if payload_exp > 0: 

159 pass # The token already has a valid expiration time 

160 elif expires_in_minutes > 0: 

161 expire = now + _dt.timedelta(minutes=expires_in_minutes) 

162 payload["exp"] = int(expire.timestamp()) 

163 else: 

164 # Warn about non-expiring token 

165 print( 

166 "⚠️ WARNING: Creating token without expiration. This is a security risk!\n" 

167 " Consider using --exp with a value > 0 for production use.\n" 

168 " Once JWT API (#425) is available, use it for automatic token renewal.", 

169 file=sys.stderr, 

170 ) 

171 

172 return jwt.encode(payload, secret, algorithm=algorithm) 

173 

174 

175# --------------------------------------------------------------------------- 

176# **Async** wrappers for backward compatibility 

177# --------------------------------------------------------------------------- 

178 

179 

180async def create_jwt_token( 

181 data: Dict[str, Any], 

182 expires_in_minutes: int = DEFAULT_EXP_MINUTES, 

183 *, 

184 secret: str = None, 

185 algorithm: str = None, 

186 user_data: Optional[Dict[str, Any]] = None, 

187 teams: Optional[List[str]] = None, 

188 scopes: Optional[Dict[str, Any]] = None, 

189) -> str: 

190 """ 

191 Async facade for historic code. Internally synchronous-almost instant. 

192 

193 Args: 

194 data: Dictionary containing payload data to encode in the token. 

195 expires_in_minutes: Token expiration time in minutes. Default is 7 days. 

196 Set to 0 to disable expiration. 

197 secret: Optional secret key for signing. If None/empty, uses JWT_SECRET_KEY from config. 

198 algorithm: Optional signing algorithm. If None/empty, uses JWT_ALGORITHM from config. 

199 user_data: Optional user information dict with keys: email, full_name, is_admin, auth_provider. 

200 teams: Optional list of team IDs the token is scoped to. 

201 scopes: Optional scopes dict with keys: server_id, permissions, ip_restrictions, time_restrictions. 

202 

203 Returns: 

204 The JWT token string. 

205 

206 Doctest: 

207 >>> from mcpgateway.utils import create_jwt_token as jwt_util 

208 >>> from mcpgateway.utils.jwt_config_helper import clear_jwt_caches 

209 >>> clear_jwt_caches() 

210 >>> jwt_util.settings.jwt_secret_key = 'secret' 

211 >>> jwt_util.settings.jwt_algorithm = 'HS256' 

212 >>> import asyncio 

213 >>> t = asyncio.run(jwt_util.create_jwt_token({'sub': 'bob'}, expires_in_minutes=1)) 

214 >>> import jwt 

215 >>> jwt.decode(t, jwt_util.settings.jwt_secret_key, algorithms=[jwt_util.settings.jwt_algorithm], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'bob' 

216 True 

217 """ 

218 # Pass through secret/algorithm; _create_jwt_token will use config as fallback 

219 return _create_jwt_token(data, expires_in_minutes, secret or "", algorithm or "", user_data, teams, scopes) 

220 

221 

222async def get_jwt_token() -> str: 

223 """Return a token for ``{"username": "admin"}``, mirroring old behaviour. 

224 

225 Returns: 

226 The JWT token string with default admin username. 

227 """ 

228 user_data = {"username": DEFAULT_USERNAME} 

229 return await create_jwt_token(user_data) 

230 

231 

232# --------------------------------------------------------------------------- 

233# **Decode** helper (non-verifying) - used by the CLI 

234# --------------------------------------------------------------------------- 

235 

236 

237def _decode_jwt_token(token: str, algorithms: List[str] | None = None) -> Dict[str, Any]: 

238 """Decode with proper audience and issuer verification. 

239 

240 Args: 

241 token: JWT token string to decode. 

242 algorithms: List of allowed algorithms for decoding. Defaults to [DEFAULT_ALGO]. 

243 

244 Returns: 

245 Dictionary containing the decoded payload. 

246 

247 Examples: 

248 >>> # Test algorithm parameter handling 

249 >>> algs = ['HS256', 'HS512'] 

250 >>> len(algs) 

251 2 

252 >>> 'HS256' in algs 

253 True 

254 >>> # Test None algorithms handling 

255 >>> default_algo = [DEFAULT_ALGO] 

256 >>> isinstance(default_algo, list) 

257 True 

258 """ 

259 # Get the actual string value from SecretStr if needed 

260 secret_key = settings.jwt_secret_key.get_secret_value() if hasattr(settings.jwt_secret_key, "get_secret_value") else settings.jwt_secret_key 

261 return jwt.decode( 

262 token, 

263 secret_key, 

264 algorithms=algorithms or [DEFAULT_ALGO], 

265 audience=settings.jwt_audience, 

266 issuer=settings.jwt_issuer, 

267 # options={"require": ["exp"]}, # Require expiration 

268 ) 

269 

270 

271# --------------------------------------------------------------------------- 

272# CLI Parsing & helpers 

273# --------------------------------------------------------------------------- 

274 

275 

276def _parse_args(): 

277 """Parse command line arguments for JWT token operations. 

278 

279 Sets up an argument parser with mutually exclusive options for: 

280 - Creating tokens with username (-u/--username) 

281 - Creating tokens with custom data (-d/--data) 

282 - Decoding existing tokens (--decode) 

283 

284 Additional options control expiration, secret key, algorithm, and output format. 

285 

286 Returns: 

287 argparse.Namespace: Parsed command line arguments containing: 

288 - username: Optional username for simple payload 

289 - data: Optional JSON or key=value pairs for custom payload 

290 - decode: Optional token string to decode 

291 - exp: Expiration time in minutes (default: DEFAULT_EXP_MINUTES) 

292 - secret: Secret key for signing (default: "" - uses JWT_SECRET_KEY from config) 

293 - algo: Signing algorithm (default: "" - uses JWT_ALGORITHM from config) 

294 - pretty: Whether to pretty-print payload before encoding 

295 

296 Examples: 

297 >>> # Simulating command line args 

298 >>> import sys 

299 >>> sys.argv = ['jwt_cli.py', '-u', 'alice', '-e', '60'] 

300 >>> args = _parse_args() # doctest: +SKIP 

301 >>> args.username # doctest: +SKIP 

302 'alice' 

303 >>> args.exp # doctest: +SKIP 

304 60 

305 """ 

306 p = argparse.ArgumentParser( 

307 description="Generate or inspect JSON Web Tokens.", 

308 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 

309 ) 

310 

311 group = p.add_mutually_exclusive_group() 

312 group.add_argument("-u", "--username", help="Add username=<value> to the payload.") 

313 group.add_argument("-d", "--data", help="Raw JSON payload or comma-separated key=value pairs.") 

314 group.add_argument("--decode", metavar="TOKEN", help="Token string to decode (no verification).") 

315 

316 p.add_argument( 

317 "-e", 

318 "--exp", 

319 type=int, 

320 default=DEFAULT_EXP_MINUTES, 

321 help="Expiration in minutes (0 disables the exp claim).", 

322 ) 

323 p.add_argument("-s", "--secret", default="", help="Secret key for signing. If not provided, uses JWT_SECRET_KEY from config.") 

324 p.add_argument("--algo", default="", help="Signing algorithm (e.g., HS256, RS256). If not provided, uses JWT_ALGORITHM from config.") 

325 p.add_argument("--pretty", action="store_true", help="Pretty-print payload before encoding.") 

326 

327 # Rich token creation arguments (requires JWT_SECRET_KEY) 

328 p.add_argument("--admin", action="store_true", help="Mark user as admin (DEV/TEST ONLY - requires JWT_SECRET_KEY)") 

329 p.add_argument("--teams", help="Comma-separated team IDs (e.g., team-123,team-456). DEV/TEST ONLY") 

330 p.add_argument("--scopes", help='JSON scopes object for permission restrictions (e.g., \'{"permissions": ["tools.read"]}\'). DEV/TEST ONLY') 

331 p.add_argument("--full-name", help="User's full name for the token") 

332 

333 return p.parse_args() 

334 

335 

336def _payload_from_cli(args) -> Dict[str, Any]: 

337 """Extract JWT payload from parsed command line arguments. 

338 

339 Processes arguments in priority order: 

340 1. If username is specified, creates {"username": <value>} 

341 2. If data is specified, parses as JSON or key=value pairs 

342 3. Otherwise, returns default payload with admin username 

343 

344 The data argument supports two formats: 

345 - JSON string: '{"key": "value", "foo": "bar"}' 

346 - Comma-separated pairs: 'key=value,foo=bar' 

347 

348 Args: 

349 args: Parsed command line arguments from argparse containing 

350 username, data, and other JWT options. 

351 

352 Returns: 

353 Dict[str, Any]: The payload dictionary to encode in the JWT. 

354 

355 Raises: 

356 ValueError: If data contains invalid key=value pairs (missing '='). 

357 

358 Examples: 

359 >>> from argparse import Namespace 

360 >>> args = Namespace(username='alice', data=None) 

361 >>> _payload_from_cli(args) 

362 {'username': 'alice'} 

363 >>> args = Namespace(username=None, data='{"role": "admin", "id": 123}') 

364 >>> _payload_from_cli(args) 

365 {'role': 'admin', 'id': 123} 

366 >>> args = Namespace(username=None, data='name=bob,role=user') 

367 >>> _payload_from_cli(args) 

368 {'name': 'bob', 'role': 'user'} 

369 >>> args = Namespace(username=None, data='invalid_format') 

370 >>> _payload_from_cli(args) # doctest: +ELLIPSIS 

371 Traceback (most recent call last): 

372 ... 

373 ValueError: Invalid key=value pair: 'invalid_format' 

374 """ 

375 if args.username is not None: 

376 return {"username": args.username} 

377 

378 if args.data is not None: 

379 # Attempt JSON first 

380 try: 

381 return orjson.loads(args.data) 

382 except orjson.JSONDecodeError: 

383 pairs = [kv.strip() for kv in args.data.split(",") if kv.strip()] 

384 payload: Dict[str, Any] = {} 

385 for pair in pairs: 

386 if "=" not in pair: 

387 raise ValueError(f"Invalid key=value pair: '{pair}'") 

388 k, v = pair.split("=", 1) 

389 payload[k.strip()] = v.strip() 

390 return payload 

391 

392 # Fallback default payload 

393 return {"username": DEFAULT_USERNAME} 

394 

395 

396# --------------------------------------------------------------------------- 

397# Entry point for ``python3 jwt_cli.py`` 

398# --------------------------------------------------------------------------- 

399 

400 

401def main() -> None: # pragma: no cover 

402 """Entry point for JWT command line interface. 

403 

404 Provides two main modes of operation: 

405 1. Token creation: Generates a new JWT with specified payload 

406 2. Token decoding: Decodes and displays an existing JWT (without verification) 

407 

408 In creation mode, supports: 

409 - Simple username payload (-u/--username) 

410 - Custom JSON or key=value payload (-d/--data) 

411 - Rich tokens with admin/team/scope claims (--admin, --teams, --scopes) 

412 - Configurable expiration, secret, and algorithm 

413 - Optional pretty-printing of payload before encoding 

414 

415 In decode mode, displays the decoded payload as formatted JSON. 

416 

417 The function handles being run in different contexts: 

418 - Direct script execution: Runs synchronously 

419 - Within existing asyncio loop: Delegates to executor to avoid blocking 

420 

421 Examples: 

422 Command line usage:: 

423 

424 # Create token with username 

425 $ python jwt_cli.py -u alice 

426 eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9... 

427 

428 # Create admin token (DEV/TEST ONLY) 

429 $ python jwt_cli.py -u admin@example.com --admin --full-name "Admin User" 

430 ⚠️ WARNING: Creating token with elevated claims... 

431 eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9... 

432 

433 # Create team-scoped token (DEV/TEST ONLY) 

434 $ python jwt_cli.py -u user@example.com --teams team-123,team-456 

435 ⚠️ WARNING: Creating token with elevated claims... 

436 eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9... 

437 

438 # Decode existing token 

439 $ python jwt_cli.py --decode eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9... 

440 { 

441 "username": "alice", 

442 "exp": 1234567890 

443 } 

444 """ 

445 args = _parse_args() 

446 

447 # Decode mode takes precedence 

448 if args.decode: 

449 decoded = _decode_jwt_token(args.decode, algorithms=[args.algo or DEFAULT_ALGO]) 

450 sys.stdout.write(orjson.dumps(decoded, default=str, option=orjson.OPT_INDENT_2).decode()) 

451 sys.stdout.write("\n") 

452 return 

453 

454 # Validate: --algo requires --secret to avoid algorithm/key mismatch 

455 if args.algo and not args.secret: 

456 print( 

457 "ERROR: --algo requires --secret to be specified.\n" 

458 " Using --algo alone would mix config-based keys with a different algorithm,\n" 

459 " which can produce invalid tokens. Either:\n" 

460 " - Provide both --secret and --algo for full override\n" 

461 " - Omit both to use JWT_SECRET_KEY and JWT_ALGORITHM from config", 

462 file=sys.stderr, 

463 ) 

464 sys.exit(1) 

465 

466 # Security warning for rich tokens 

467 if args.admin or args.teams or args.scopes: 

468 print( 

469 "⚠️ WARNING: Creating token with elevated claims (admin/teams/scopes)\n" 

470 " This requires JWT_SECRET_KEY and is intended for development/testing only.\n" 

471 " For production token management, use the /tokens API endpoint.\n", 

472 file=sys.stderr, 

473 ) 

474 

475 payload = _payload_from_cli(args) 

476 

477 # Build rich token parameters if provided 

478 user_data = None 

479 teams = None 

480 scopes_dict = None 

481 

482 if args.admin or args.teams or args.scopes or args.full_name: 

483 user_email = payload.get("sub") or payload.get("username", "admin@example.com") 

484 

485 # Build user data 

486 user_data = { 

487 "email": user_email, 

488 "full_name": args.full_name or "CLI User", 

489 "is_admin": bool(args.admin), 

490 "auth_provider": "cli", # Mark as CLI-generated for auditing 

491 } 

492 

493 # Build teams list 

494 if args.teams: 

495 teams = [t.strip() for t in args.teams.split(",") if t.strip()] 

496 

497 # Build scopes 

498 if args.scopes: 

499 try: 

500 scopes_dict = orjson.loads(args.scopes) 

501 except orjson.JSONDecodeError as e: 

502 print(f"ERROR: Invalid JSON for --scopes: {e}", file=sys.stderr) 

503 sys.exit(1) 

504 

505 if args.pretty: 

506 print("Payload:") 

507 print(orjson.dumps(payload, default=str, option=orjson.OPT_INDENT_2).decode()) 

508 if user_data: 

509 print("User Data:") 

510 print(orjson.dumps(user_data, default=str, option=orjson.OPT_INDENT_2).decode()) 

511 if teams: 

512 print(f"Teams: {teams}") 

513 if scopes_dict: 

514 print("Scopes:") 

515 print(orjson.dumps(scopes_dict, default=str, option=orjson.OPT_INDENT_2).decode()) 

516 print("-") 

517 

518 token = _create_jwt_token(payload, args.exp, args.secret, args.algo, user_data, teams, scopes_dict) 

519 print(token) 

520 

521 

522if __name__ == "__main__": 

523 # Support being run via ``python3 -m mcpgateway.utils.create_jwt_token`` too 

524 try: 

525 # Respect existing asyncio loop if present (e.g. inside uvicorn dev server) 

526 loop = asyncio.get_running_loop() 

527 loop.run_until_complete(asyncio.sleep(0)) # no-op to ensure loop alive 

528 except RuntimeError: 

529 # No loop; we're just a simple CLI call - run main synchronously 

530 main() 

531 else: 

532 # We're inside an active asyncio program - delegate to executor to avoid blocking 

533 loop.run_in_executor(None, main)