Coverage for mcpgateway / utils / create_jwt_token.py: 100%
85 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""Location: ./mcpgateway/utils/create_jwt_token.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti
8jwt_cli.py - generate, inspect, **and be imported** for token helpers.
9* **Run as a script** - friendly CLI (works with *no* flags).
10* **Import as a library** - drop-in async functions `create_jwt_token` & `get_jwt_token`
11 kept for backward-compatibility, now delegating to the shared core helper.
13Quick usage
14-----------
15CLI (default secret, default payload):
16 $ python3 jwt_cli.py
18Library:
19 from mcpgateway.utils.create_jwt_token import create_jwt_token, get_jwt_token
21 # inside async context
22 jwt = await create_jwt_token({"username": "alice"})
24Doctest examples
25----------------
26>>> from mcpgateway.utils import create_jwt_token as jwt_util
27>>> from mcpgateway.utils.jwt_config_helper import clear_jwt_caches
28>>> clear_jwt_caches()
29>>> jwt_util.settings.jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
30>>> jwt_util.settings.jwt_algorithm = 'HS256'
31>>> token = jwt_util._create_jwt_token({'sub': 'alice'}, expires_in_minutes=1, secret='this-is-a-long-test-secret-key-32chars', algorithm='HS256')
32>>> import jwt
33>>> jwt.decode(token, 'this-is-a-long-test-secret-key-32chars', algorithms=['HS256'], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'alice'
34True
35>>> import asyncio
36>>> t = asyncio.run(jwt_util.create_jwt_token({'sub': 'bob'}, expires_in_minutes=1, secret='this-is-a-long-test-secret-key-32chars', algorithm='HS256'))
37>>> jwt.decode(t, 'this-is-a-long-test-secret-key-32chars', algorithms=['HS256'], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'bob'
38True
39"""
41# Future
42from __future__ import annotations
44# Standard
45import argparse
46import asyncio
47import datetime as _dt
48import sys
49from typing import Any, Dict, List, Optional, Sequence
50import uuid
52# Third-Party
53import jwt # PyJWT
54import orjson
56# First-Party
57from mcpgateway.config import settings
59__all__: Sequence[str] = (
60 "create_jwt_token",
61 "get_jwt_token",
62 "_create_jwt_token",
63)
65# ---------------------------------------------------------------------------
66# Defaults & constants
67# ---------------------------------------------------------------------------
68# Note: DEFAULT_SECRET is retrieved at runtime to support dynamic configuration changes
# Import-time snapshots of the configuration. They are used below as parameter
# and CLI defaults, so changes made to ``settings`` after this module has been
# imported are not reflected in them.
DEFAULT_ALGO: str = settings.jwt_algorithm
DEFAULT_EXP_MINUTES: int = settings.token_expiry
DEFAULT_USERNAME: str = settings.basic_auth_user
# Sentinel meaning "the teams argument was omitted". It lets callers pass
# ``None`` explicitly (serialized as ``"teams": null``) while omission leaves
# the claim out of the payload entirely.
_TEAMS_UNSET = object()
75# ---------------------------------------------------------------------------
76# Core sync helper (used by both CLI & async wrappers)
77# ---------------------------------------------------------------------------
def _create_jwt_token(
    data: Dict[str, Any],
    expires_in_minutes: int = DEFAULT_EXP_MINUTES,
    secret: str = "",  # nosec B107 - Optional override; uses config if empty
    algorithm: str = "",  # Optional override; uses config if empty
    user_data: Optional[Dict[str, Any]] = None,
    teams: Optional[List[str]] | object = _TEAMS_UNSET,
    scopes: Optional[Dict[str, Any]] = None,
) -> str:
    """Create a signed JWT from ``data`` plus the standard gateway claims.

    The payload is a shallow copy of ``data`` extended with ``iat``, ``iss``,
    ``aud`` and ``jti`` (an existing truthy ``jti`` is kept). When
    ``settings.embed_environment_in_tokens`` is set, an ``env`` claim is added.
    A legacy ``username`` key is mirrored into ``sub`` when ``sub`` is absent.
    Optional rich claims (``user``, ``teams``, ``scopes``) are attached so CLI
    and API tokens share one format.

    Args:
        data: Dictionary containing payload data to encode in the token.
            It is copied, never mutated.
        expires_in_minutes: Token lifetime in minutes. A value <= 0 disables
            the automatic ``exp`` claim (a warning is printed to stderr).
            Ignored when ``data`` already carries a positive ``exp``.
        secret: Optional signing key. If empty, the configuration is validated
            and the key is obtained from ``get_jwt_private_key_or_secret()``.
        algorithm: Optional signing algorithm. If empty, uses
            ``settings.jwt_algorithm`` read at call time.
        user_data: Optional user information dict stored under ``user``.
            An empty dict is treated like ``None`` (claim omitted).
        teams: Optional list of team IDs the token is scoped to.
            Pass ``None`` explicitly to serialize ``"teams": null``.
            Omit the argument to leave the claim unchanged/absent.
        scopes: Optional scopes dict stored under ``scopes`` when not ``None``.

    Returns:
        str: The signed JWT token string.

    Raises:
        JWTConfigurationError: Presumably raised by the ``jwt_config_helper``
            functions when the configuration is invalid (helpers are defined
            outside this module - confirm there).
        FileNotFoundError: Presumably raised by the same helpers when
            asymmetric key files are missing.
        TypeError: If ``data["exp"]`` is neither ``None`` nor comparable to 0.

    Note:
        This is an internal function. Use create_jwt_token() for the async interface.
        An explicit ``"exp": None`` in ``data`` is treated as "no expiration
        supplied" and is dropped instead of being encoded as ``null``.
    """
    # First-Party
    from mcpgateway.utils.jwt_config_helper import get_jwt_private_key_or_secret, validate_jwt_algo_and_keys

    # Use provided secret/algorithm or fall back to configuration
    if not secret:
        validate_jwt_algo_and_keys()
        secret = get_jwt_private_key_or_secret()
    if not algorithm:
        algorithm = settings.jwt_algorithm

    payload = data.copy()
    now = _dt.datetime.now(_dt.timezone.utc)

    # Add standard JWT claims
    payload["iat"] = int(now.timestamp())  # Issued at
    payload["iss"] = settings.jwt_issuer  # Issuer
    payload["aud"] = settings.jwt_audience  # Audience
    payload["jti"] = payload.get("jti") or str(uuid.uuid4())  # JWT ID for revocation support

    # Optionally embed environment claim for cross-environment isolation
    if settings.embed_environment_in_tokens:
        payload["env"] = settings.environment

    # Handle legacy username format - convert to sub for consistency
    if "username" in payload and "sub" not in payload:
        payload["sub"] = payload["username"]

    # Add rich claims if provided (for API/CLI token parity)
    if user_data:
        payload["user"] = user_data

    if teams is not _TEAMS_UNSET:
        payload["teams"] = teams

    if scopes is not None:
        payload["scopes"] = scopes

    # A JSON ``null`` exp (e.g. ``-d '{"exp": null}'``) used to crash the
    # comparison below with a TypeError. Treat it as "not supplied" and remove
    # it so the token never carries an undecodable ``"exp": null`` claim.
    payload_exp = payload.get("exp")
    if payload_exp is None:
        payload.pop("exp", None)
        payload_exp = 0

    # A positive caller-supplied exp is kept untouched.
    if not payload_exp > 0:
        if expires_in_minutes > 0:
            expire = now + _dt.timedelta(minutes=expires_in_minutes)
            payload["exp"] = int(expire.timestamp())
        else:
            # Warn about non-expiring token
            print(
                "⚠️  WARNING: Creating token without expiration. This is a security risk!\n"
                "   Consider using --exp with a value > 0 for production use.\n"
                "   Once JWT API (#425) is available, use it for automatic token renewal.",
                file=sys.stderr,
            )

    return jwt.encode(payload, secret, algorithm=algorithm)
178# ---------------------------------------------------------------------------
179# **Async** wrappers for backward compatibility
180# ---------------------------------------------------------------------------
async def create_jwt_token(
    data: Dict[str, Any],
    expires_in_minutes: int = DEFAULT_EXP_MINUTES,
    *,
    secret: Optional[str] = None,
    algorithm: Optional[str] = None,
    user_data: Optional[Dict[str, Any]] = None,
    teams: Optional[List[str]] | object = _TEAMS_UNSET,
    scopes: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Async facade for historic code. Internally synchronous: it awaits nothing
    and simply delegates to ``_create_jwt_token``.

    Args:
        data: Dictionary containing payload data to encode in the token.
        expires_in_minutes: Token expiration time in minutes. Defaults to
            ``DEFAULT_EXP_MINUTES`` (``settings.token_expiry`` as read when
            this module was imported). Set to 0 to disable expiration.
        secret: Optional secret key for signing. ``None`` or an empty string
            is forwarded as ``""`` so the core helper falls back to config.
        algorithm: Optional signing algorithm. ``None`` or an empty string
            is forwarded as ``""`` so the core helper falls back to config.
        user_data: Optional user information dict with keys: email, full_name, is_admin, auth_provider.
        teams: Optional list of team IDs the token is scoped to.
            Pass ``None`` explicitly to serialize ``"teams": null``.
            Omit to leave teams absent.
        scopes: Optional scopes dict with keys: server_id, permissions, ip_restrictions, time_restrictions.

    Returns:
        The JWT token string.

    Doctest:
        >>> from mcpgateway.utils import create_jwt_token as jwt_util
        >>> from mcpgateway.utils.jwt_config_helper import clear_jwt_caches
        >>> clear_jwt_caches()
        >>> jwt_util.settings.jwt_secret_key = 'this-is-a-long-test-secret-key-32chars'
        >>> jwt_util.settings.jwt_algorithm = 'HS256'
        >>> import asyncio
        >>> t = asyncio.run(jwt_util.create_jwt_token({'sub': 'bob'}, expires_in_minutes=1))
        >>> import jwt
        >>> jwt.decode(t, jwt_util.settings.jwt_secret_key, algorithms=[jwt_util.settings.jwt_algorithm], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'bob'
        True
    """
    # Normalise None to "" - the core helper treats an empty value as
    # "use the configured secret/algorithm".
    return _create_jwt_token(data, expires_in_minutes, secret or "", algorithm or "", user_data, teams, scopes)
async def get_jwt_token() -> str:
    """Create a token for the default user, mirroring the old behaviour.

    The payload is ``{"username": DEFAULT_USERNAME}``; every other option of
    ``create_jwt_token`` is left at its default.

    Returns:
        The JWT token string for the default username.
    """
    return await create_jwt_token({"username": DEFAULT_USERNAME})
# ---------------------------------------------------------------------------
# **Decode** helper (verifies signature, audience and issuer) - used by the CLI
# ---------------------------------------------------------------------------
def _decode_jwt_token(token: str, algorithms: List[str] | None = None) -> Dict[str, Any]:
    """Decode a token against the configured secret, audience and issuer.

    The key is ``settings.jwt_secret_key``; if that object exposes
    ``get_secret_value()`` (e.g. a ``SecretStr``) the plain value is used.
    An ``exp`` claim is not required to be present.

    Args:
        token: JWT token string to decode.
        algorithms: List of allowed algorithms for decoding. ``None`` or an
            empty list falls back to ``[DEFAULT_ALGO]``.

    Returns:
        Dictionary containing the decoded payload.

    Examples:
        >>> # The fallback applied to the ``algorithms`` argument
        >>> (None or [DEFAULT_ALGO]) == [DEFAULT_ALGO]
        True
        >>> (['HS256', 'HS512'] or [DEFAULT_ALGO])
        ['HS256', 'HS512']
    """
    key = settings.jwt_secret_key
    # Unwrap secret containers so PyJWT receives the raw key material.
    if hasattr(key, "get_secret_value"):
        key = key.get_secret_value()

    allowed = algorithms if algorithms else [DEFAULT_ALGO]
    return jwt.decode(
        token,
        key,
        algorithms=allowed,
        audience=settings.jwt_audience,
        issuer=settings.jwt_issuer,
    )
276# ---------------------------------------------------------------------------
277# CLI Parsing & helpers
278# ---------------------------------------------------------------------------
def _parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command line arguments for JWT token operations.

    Sets up an argument parser with mutually exclusive options for:
    - Creating tokens with username (-u/--username)
    - Creating tokens with custom data (-d/--data)
    - Decoding existing tokens (--decode)

    Additional options control expiration, secret key, algorithm, output
    format and the rich-token claims (--admin, --teams, --scopes, --full-name).

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the behaviour existing callers get.

    Returns:
        argparse.Namespace: Parsed command line arguments containing:
            - username: Optional username for simple payload
            - data: Optional JSON or key=value pairs for custom payload
            - decode: Optional token string to decode
            - exp: Expiration time in minutes (default: DEFAULT_EXP_MINUTES)
            - secret: Secret key for signing (default: "" - uses JWT_SECRET_KEY from config)
            - algo: Signing algorithm (default: "" - uses JWT_ALGORITHM from config)
            - pretty: Whether to pretty-print payload before encoding
            - admin, teams, scopes, full_name: Rich-token options

    Examples:
        >>> args = _parse_args(['-u', 'alice', '-e', '60'])
        >>> args.username
        'alice'
        >>> args.exp
        60
    """
    p = argparse.ArgumentParser(
        description="Generate or inspect JSON Web Tokens.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    group = p.add_mutually_exclusive_group()
    group.add_argument("-u", "--username", help="Add username=<value> to the payload.")
    group.add_argument("-d", "--data", help="Raw JSON payload or comma-separated key=value pairs.")
    # The decode path hands a key, audience and issuer to jwt.decode, so the
    # token IS verified; the help text must not promise otherwise.
    group.add_argument("--decode", metavar="TOKEN", help="Token string to decode (signature, audience and issuer are verified).")

    p.add_argument(
        "-e",
        "--exp",
        type=int,
        default=DEFAULT_EXP_MINUTES,
        help="Expiration in minutes (0 disables the exp claim).",
    )
    p.add_argument("-s", "--secret", default="", help="Secret key for signing. If not provided, uses JWT_SECRET_KEY from config.")
    p.add_argument("--algo", default="", help="Signing algorithm (e.g., HS256, RS256). If not provided, uses JWT_ALGORITHM from config.")
    p.add_argument("--pretty", action="store_true", help="Pretty-print payload before encoding.")

    # Rich token creation arguments (requires JWT_SECRET_KEY)
    p.add_argument("--admin", action="store_true", help="Mark user as admin (DEV/TEST ONLY - requires JWT_SECRET_KEY)")
    p.add_argument("--teams", help="Comma-separated team IDs (e.g., team-123,team-456). DEV/TEST ONLY")
    p.add_argument("--scopes", help='JSON scopes object for permission restrictions (e.g., \'{"permissions": ["tools.read"]}\'). DEV/TEST ONLY')
    p.add_argument("--full-name", help="User's full name for the token")

    return p.parse_args(argv)
def _payload_from_cli(args) -> Dict[str, Any]:
    """Extract JWT payload from parsed command line arguments.

    Processes arguments in priority order:
    1. If username is specified, creates {"username": <value>}
    2. If data is specified, parses as JSON or key=value pairs
    3. Otherwise, returns default payload with the default username

    The data argument supports two formats:
    - JSON object string: '{"key": "value", "foo": "bar"}'
    - Comma-separated pairs: 'key=value,foo=bar'

    Text that is valid JSON but not an object (a list, number, string,
    boolean or null) is rejected, because the token payload must be a mapping.

    Args:
        args: Parsed command line arguments from argparse containing
              ``username`` and ``data`` attributes.

    Returns:
        Dict[str, Any]: The payload dictionary to encode in the JWT.

    Raises:
        ValueError: If data contains invalid key=value pairs (missing '='),
            or is valid JSON that is not an object.

    Examples:
        >>> from argparse import Namespace
        >>> args = Namespace(username='alice', data=None)
        >>> _payload_from_cli(args)
        {'username': 'alice'}
        >>> args = Namespace(username=None, data='{"role": "admin", "id": 123}')
        >>> _payload_from_cli(args)
        {'role': 'admin', 'id': 123}
        >>> args = Namespace(username=None, data='name=bob,role=user')
        >>> _payload_from_cli(args)
        {'name': 'bob', 'role': 'user'}
        >>> args = Namespace(username=None, data='invalid_format')
        >>> _payload_from_cli(args)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        ValueError: Invalid key=value pair: 'invalid_format'
        >>> args = Namespace(username=None, data='[1, 2]')
        >>> _payload_from_cli(args)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        ValueError: JSON payload must be an object, got list
    """
    if args.username is not None:
        return {"username": args.username}

    if args.data is None:
        # Fallback default payload
        return {"username": DEFAULT_USERNAME}

    # Attempt JSON first
    try:
        parsed = orjson.loads(args.data)
    except orjson.JSONDecodeError:
        pass  # Not JSON - fall through to key=value parsing
    else:
        # Valid JSON that is not an object would only fail later with an
        # opaque AttributeError (``data.copy()``); reject it here instead.
        if not isinstance(parsed, dict):
            raise ValueError(f"JSON payload must be an object, got {type(parsed).__name__}")
        return parsed

    payload: Dict[str, Any] = {}
    for pair in (kv.strip() for kv in args.data.split(",")):
        if not pair:
            continue  # tolerate empty segments such as a trailing comma
        if "=" not in pair:
            raise ValueError(f"Invalid key=value pair: '{pair}'")
        k, v = pair.split("=", 1)
        payload[k.strip()] = v.strip()
    return payload
401# ---------------------------------------------------------------------------
402# Entry point for ``python3 jwt_cli.py``
403# ---------------------------------------------------------------------------
def main() -> None:  # pragma: no cover
    """Entry point for JWT command line interface.

    Provides two main modes of operation:
    1. Token creation: Generates a new JWT with specified payload
    2. Token decoding: Decodes an existing JWT, verifying its signature,
       audience and issuer, and prints the payload as indented JSON

    In creation mode, supports:
    - Simple username payload (-u/--username)
    - Custom JSON or key=value payload (-d/--data)
    - Rich tokens with admin/team/scope claims (--admin, --teams, --scopes)
    - Configurable expiration, secret, and algorithm
    - Optional pretty-printing of payload before encoding

    In decode mode, ``--secret`` (when given) is used as the verification key;
    otherwise the configured ``JWT_SECRET_KEY`` is used. ``--algo`` selects the
    accepted algorithm, defaulting to ``DEFAULT_ALGO``.

    Exits with status 1 (message on stderr) when ``--algo`` is given without
    ``--secret`` in creation mode, when ``--data`` cannot be parsed, or when
    ``--scopes`` is not valid JSON.

    Examples:
        Command line usage::

            # Create token with username
            $ python jwt_cli.py -u alice
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Create admin token (DEV/TEST ONLY)
            $ python jwt_cli.py -u admin@example.com --admin --full-name "Admin User"
            ⚠️  WARNING: Creating token with elevated claims...
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Create team-scoped token (DEV/TEST ONLY)
            $ python jwt_cli.py -u user@example.com --teams team-123,team-456
            ⚠️  WARNING: Creating token with elevated claims...
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Decode existing token
            $ python jwt_cli.py --decode eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...
            {
              "username": "alice",
              "exp": 1234567890
            }
    """
    args = _parse_args()

    # Decode mode takes precedence
    if args.decode:
        algorithms = [args.algo or DEFAULT_ALGO]
        if args.secret:
            # Honour an explicit --secret so tokens minted with a custom
            # secret can be inspected too (it used to be silently ignored).
            decoded = jwt.decode(
                args.decode,
                args.secret,
                algorithms=algorithms,
                audience=settings.jwt_audience,
                issuer=settings.jwt_issuer,
            )
        else:
            decoded = _decode_jwt_token(args.decode, algorithms=algorithms)
        sys.stdout.write(orjson.dumps(decoded, default=str, option=orjson.OPT_INDENT_2).decode())
        sys.stdout.write("\n")
        return

    # Validate: --algo requires --secret to avoid algorithm/key mismatch
    if args.algo and not args.secret:
        print(
            "ERROR: --algo requires --secret to be specified.\n"
            "       Using --algo alone would mix config-based keys with a different algorithm,\n"
            "       which can produce invalid tokens. Either:\n"
            "       - Provide both --secret and --algo for full override\n"
            "       - Omit both to use JWT_SECRET_KEY and JWT_ALGORITHM from config",
            file=sys.stderr,
        )
        sys.exit(1)

    # Security warning for rich tokens
    if args.admin or args.teams or args.scopes:
        print(
            "⚠️  WARNING: Creating token with elevated claims (admin/teams/scopes)\n"
            "   This requires JWT_SECRET_KEY and is intended for development/testing only.\n"
            "   For production token management, use the /tokens API endpoint.\n",
            file=sys.stderr,
        )

    # Report malformed --data the same way as malformed --scopes below,
    # instead of surfacing a raw traceback.
    try:
        payload = _payload_from_cli(args)
    except ValueError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)

    # Build rich token parameters if provided
    user_data = None
    teams: object = _TEAMS_UNSET
    scopes_dict = None

    if args.admin or args.teams or args.scopes or args.full_name:
        user_email = payload.get("sub") or payload.get("username", "admin@example.com")

        # Build user data
        user_data = {
            "email": user_email,
            "full_name": args.full_name or "CLI User",
            "is_admin": bool(args.admin),
            "auth_provider": "cli",  # Mark as CLI-generated for auditing
        }

        # Build teams claim. In rich-token mode, explicit null preserves
        # normalize_token_teams semantics for admin bypass when intended.
        teams = None
        if args.teams:
            teams = [t.strip() for t in args.teams.split(",") if t.strip()]

        # Build scopes
        if args.scopes:
            try:
                scopes_dict = orjson.loads(args.scopes)
            except orjson.JSONDecodeError as e:
                print(f"ERROR: Invalid JSON for --scopes: {e}", file=sys.stderr)
                sys.exit(1)

    if args.pretty:
        print("Payload:")
        print(orjson.dumps(payload, default=str, option=orjson.OPT_INDENT_2).decode())
        if user_data:
            print("User Data:")
            print(orjson.dumps(user_data, default=str, option=orjson.OPT_INDENT_2).decode())
        # The _TEAMS_UNSET sentinel is a truthy object(); without the identity
        # check a plain token would print "Teams: <object object at 0x...>".
        if teams is not _TEAMS_UNSET and teams:
            print(f"Teams: {teams}")
        if scopes_dict:
            print("Scopes:")
            print(orjson.dumps(scopes_dict, default=str, option=orjson.OPT_INDENT_2).decode())
        print("-")

    token = _create_jwt_token(
        payload,
        args.exp,
        args.secret,
        args.algo,
        user_data=user_data,
        teams=teams,
        scopes=scopes_dict,
    )
    print(token)
if __name__ == "__main__":
    # Support being run via ``python3 -m mcpgateway.utils.create_jwt_token`` too.
    #
    # main() is plain synchronous code, so it is simply called. The previous
    # event-loop probing always ended up here anyway: with no running loop
    # ``asyncio.get_running_loop()`` raised RuntimeError, and with a running
    # loop ``run_until_complete()`` raised RuntimeError as well, so the
    # executor branch could never execute.
    main()