Coverage for mcpgateway / utils / create_jwt_token.py: 100%
84 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""Location: ./mcpgateway/utils/create_jwt_token.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti
8jwt_cli.py - generate, inspect, **and be imported** for token helpers.
9* **Run as a script** - friendly CLI (works with *no* flags).
10* **Import as a library** - drop-in async functions `create_jwt_token` & `get_jwt_token`
11 kept for backward-compatibility, now delegating to the shared core helper.
13Quick usage
14-----------
15CLI (default secret, default payload):
16 $ python3 jwt_cli.py
18Library:
19 from mcpgateway.utils.create_jwt_token import create_jwt_token, get_jwt_token
21 # inside async context
22 jwt = await create_jwt_token({"username": "alice"})
24Doctest examples
25----------------
26>>> from mcpgateway.utils import create_jwt_token as jwt_util
27>>> from mcpgateway.utils.jwt_config_helper import clear_jwt_caches
28>>> clear_jwt_caches()
29>>> jwt_util.settings.jwt_secret_key = 'secret'
30>>> jwt_util.settings.jwt_algorithm = 'HS256'
31>>> token = jwt_util._create_jwt_token({'sub': 'alice'}, expires_in_minutes=1, secret='secret', algorithm='HS256')
32>>> import jwt
33>>> jwt.decode(token, 'secret', algorithms=['HS256'], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'alice'
34True
35>>> import asyncio
36>>> t = asyncio.run(jwt_util.create_jwt_token({'sub': 'bob'}, expires_in_minutes=1, secret='secret', algorithm='HS256'))
37>>> jwt.decode(t, 'secret', algorithms=['HS256'], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'bob'
38True
39"""
41# Future
42from __future__ import annotations
44# Standard
45import argparse
46import asyncio
47import datetime as _dt
48import sys
49from typing import Any, Dict, List, Optional, Sequence
50import uuid
52# Third-Party
53import jwt # PyJWT
54import orjson
56# First-Party
57from mcpgateway.config import settings
# Names exported by ``from mcpgateway.utils.create_jwt_token import *``.
# The underscore-prefixed sync helper is listed as well, so star-imports expose it.
__all__: Sequence[str] = (
    "create_jwt_token",
    "get_jwt_token",
    "_create_jwt_token",
)

# ---------------------------------------------------------------------------
# Defaults & constants
# ---------------------------------------------------------------------------
# Note: the signing secret is deliberately NOT cached in a module constant; it is
# looked up at call time so that dynamic configuration changes take effect.
# The three values below, by contrast, are read from ``settings`` once, when this
# module is imported, and do not follow later changes to ``settings``.
DEFAULT_ALGO: str = settings.jwt_algorithm  # fallback algorithm when decoding
DEFAULT_EXP_MINUTES: int = settings.token_expiry  # default token lifetime, in minutes
DEFAULT_USERNAME: str = settings.basic_auth_user  # username placed in the default payload
74# ---------------------------------------------------------------------------
75# Core sync helper (used by both CLI & async wrappers)
76# ---------------------------------------------------------------------------
def _create_jwt_token(
    data: Dict[str, Any],
    expires_in_minutes: int = DEFAULT_EXP_MINUTES,
    secret: str = "",  # nosec B107 - Optional override; uses config if empty
    algorithm: str = "",  # Optional override; uses config if empty
    user_data: Optional[Dict[str, Any]] = None,
    teams: Optional[List[str]] = None,
    scopes: Optional[Dict[str, Any]] = None,
) -> str:
    """Build and sign a JWT from ``data`` plus standard and optional rich claims.

    ``data`` is shallow-copied, so the caller's mapping is never modified. The copy
    always receives ``iat``, ``iss``, ``aud`` and ``jti`` (an existing truthy ``jti``
    is kept), gets ``env`` when ``settings.embed_environment_in_tokens`` is set, and
    gets ``sub`` copied from a legacy ``username`` key when ``sub`` is absent.

    Args:
        data: Payload claims to encode in the token.
        expires_in_minutes: Lifetime in minutes. Ignored when ``data`` already carries
            a positive ``exp``. A value of 0 (or less) produces a token without
            ``exp`` and prints a warning to stderr.
        secret: Signing key override. When empty, the JWT configuration is validated
            and the key is obtained from ``get_jwt_private_key_or_secret()``.
        algorithm: Signing algorithm override. When empty, ``settings.jwt_algorithm``
            is read at call time.
        user_data: Optional user information stored under the ``user`` claim
            (skipped when empty or ``None``).
        teams: Optional list of team IDs stored under ``teams`` (an empty list is kept).
        scopes: Optional scopes mapping stored under ``scopes`` (an empty dict is kept).

    Returns:
        str: The signed JWT token string.

    Raises:
        Exception: Errors from the configuration helpers propagate unchanged -
            presumably ``JWTConfigurationError`` for an invalid configuration and
            ``FileNotFoundError`` for missing asymmetric key files; confirm against
            ``mcpgateway.utils.jwt_config_helper``.

    Note:
        This is the synchronous core; ``create_jwt_token()`` is the async interface.
    """
    # First-Party
    from mcpgateway.utils.jwt_config_helper import get_jwt_private_key_or_secret, validate_jwt_algo_and_keys

    # Resolve signing material: explicit overrides win, configuration is the fallback
    signing_key = secret
    if not signing_key:
        validate_jwt_algo_and_keys()
        signing_key = get_jwt_private_key_or_secret()
    signing_algorithm = algorithm or settings.jwt_algorithm

    claims = data.copy()
    issued_at = _dt.datetime.now(_dt.timezone.utc)

    # Standard claims; a caller-supplied jti is preserved for revocation support
    claims.update(
        {
            "iat": int(issued_at.timestamp()),
            "iss": settings.jwt_issuer,
            "aud": settings.jwt_audience,
            "jti": claims.get("jti") or str(uuid.uuid4()),
        }
    )

    # Optional environment claim for cross-environment isolation
    if settings.embed_environment_in_tokens:
        claims["env"] = settings.environment

    # Legacy payloads identify the user via "username"; mirror it into "sub"
    if "sub" not in claims and "username" in claims:
        claims["sub"] = claims["username"]

    # Rich claims (API/CLI token parity)
    if user_data:
        claims["user"] = user_data
    for claim_name, claim_value in (("teams", teams), ("scopes", scopes)):
        if claim_value is not None:
            claims[claim_name] = claim_value

    # A positive exp supplied by the caller is left untouched
    if not (claims.get("exp", 0) > 0):
        if expires_in_minutes > 0:
            expires_at = issued_at + _dt.timedelta(minutes=expires_in_minutes)
            claims["exp"] = int(expires_at.timestamp())
        else:
            # Warn about non-expiring token
            print(
                "⚠️ WARNING: Creating token without expiration. This is a security risk!\n"
                " Consider using --exp with a value > 0 for production use.\n"
                " Once JWT API (#425) is available, use it for automatic token renewal.",
                file=sys.stderr,
            )

    return jwt.encode(claims, signing_key, algorithm=signing_algorithm)
175# ---------------------------------------------------------------------------
176# **Async** wrappers for backward compatibility
177# ---------------------------------------------------------------------------
async def create_jwt_token(
    data: Dict[str, Any],
    expires_in_minutes: int = DEFAULT_EXP_MINUTES,
    *,
    secret: Optional[str] = None,
    algorithm: Optional[str] = None,
    user_data: Optional[Dict[str, Any]] = None,
    teams: Optional[List[str]] = None,
    scopes: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Async facade for historic code. Internally synchronous - it simply calls
    ``_create_jwt_token`` and returns its result.

    Args:
        data: Dictionary containing payload data to encode in the token.
        expires_in_minutes: Token expiration time in minutes. Defaults to
            ``DEFAULT_EXP_MINUTES`` (taken from ``settings.token_expiry`` when the
            module is imported). Set to 0 to disable expiration.
        secret: Optional secret key for signing. ``None`` and ``""`` are equivalent:
            both make the core helper fall back to the configured key.
        algorithm: Optional signing algorithm. ``None`` and ``""`` are equivalent:
            both make the core helper fall back to the configured algorithm.
        user_data: Optional user information dict with keys: email, full_name, is_admin, auth_provider.
        teams: Optional list of team IDs the token is scoped to.
        scopes: Optional scopes dict with keys: server_id, permissions, ip_restrictions, time_restrictions.

    Returns:
        The JWT token string.

    Doctest:
        >>> from mcpgateway.utils import create_jwt_token as jwt_util
        >>> from mcpgateway.utils.jwt_config_helper import clear_jwt_caches
        >>> clear_jwt_caches()
        >>> jwt_util.settings.jwt_secret_key = 'secret'
        >>> jwt_util.settings.jwt_algorithm = 'HS256'
        >>> import asyncio
        >>> t = asyncio.run(jwt_util.create_jwt_token({'sub': 'bob'}, expires_in_minutes=1))
        >>> import jwt
        >>> jwt.decode(t, jwt_util.settings.jwt_secret_key, algorithms=[jwt_util.settings.jwt_algorithm], audience=jwt_util.settings.jwt_audience, issuer=jwt_util.settings.jwt_issuer)['sub'] == 'bob'
        True
    """
    # Normalise None to "" so the core helper applies its configuration fallback
    return _create_jwt_token(data, expires_in_minutes, secret or "", algorithm or "", user_data, teams, scopes)
async def get_jwt_token() -> str:
    """Create a token for the default user, mirroring the legacy helper.

    Returns:
        The JWT token string whose input payload is ``{"username": DEFAULT_USERNAME}``;
        all other options of ``create_jwt_token`` keep their defaults.
    """
    return await create_jwt_token({"username": DEFAULT_USERNAME})
232# ---------------------------------------------------------------------------
# **Decode** helper (verifies signature, audience and issuer) - used by the CLI
234# ---------------------------------------------------------------------------
def _decode_jwt_token(token: str, algorithms: List[str] | None = None) -> Dict[str, Any]:
    """Decode ``token`` with the configured secret, audience and issuer.

    The key is always ``settings.jwt_secret_key``; the expected audience and issuer
    are ``settings.jwt_audience`` and ``settings.jwt_issuer``.

    Args:
        token: JWT token string to decode.
        algorithms: Algorithms accepted for decoding. When ``None`` or empty,
            ``[DEFAULT_ALGO]`` is used.

    Returns:
        Dictionary containing the decoded payload.
    """
    configured_key = settings.jwt_secret_key
    # Unwrap SecretStr-like values; plain strings are used as they are
    if hasattr(configured_key, "get_secret_value"):
        configured_key = configured_key.get_secret_value()

    accepted_algorithms = algorithms or [DEFAULT_ALGO]

    # No "require" option is passed, so a token without an exp claim is not
    # rejected for that reason alone.
    return jwt.decode(
        token,
        configured_key,
        algorithms=accepted_algorithms,
        audience=settings.jwt_audience,
        issuer=settings.jwt_issuer,
    )
271# ---------------------------------------------------------------------------
272# CLI Parsing & helpers
273# ---------------------------------------------------------------------------
def _parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command line arguments for JWT token operations.

    Sets up an argument parser with mutually exclusive options for:
    - Creating tokens with username (-u/--username)
    - Creating tokens with custom data (-d/--data)
    - Decoding existing tokens (--decode)

    Additional options control expiration, secret key, algorithm, output format
    and the rich claims (--admin, --teams, --scopes, --full-name).

    Args:
        argv: Argument list to parse, without the program name. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is the behaviour
            callers passing no argument have always had.

    Returns:
        argparse.Namespace: Parsed command line arguments containing:
            - username: Optional username for simple payload
            - data: Optional JSON or key=value pairs for custom payload
            - decode: Optional token string to decode
            - exp: Expiration time in minutes (default: DEFAULT_EXP_MINUTES)
            - secret: Secret key for signing (default: "" - uses JWT_SECRET_KEY from config)
            - algo: Signing algorithm (default: "" - uses JWT_ALGORITHM from config)
            - pretty: Whether to pretty-print payload before encoding
            - admin: Whether to mark the user as admin
            - teams: Optional comma-separated team IDs
            - scopes: Optional JSON scopes string
            - full_name: Optional full name of the user

    Examples:
        >>> args = _parse_args(['-u', 'alice', '-e', '60'])
        >>> args.username
        'alice'
        >>> args.exp
        60
    """
    parser = argparse.ArgumentParser(
        description="Generate or inspect JSON Web Tokens.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Exactly one payload source / mode may be chosen
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("-u", "--username", help="Add username=<value> to the payload.")
    mode.add_argument("-d", "--data", help="Raw JSON payload or comma-separated key=value pairs.")
    # The decode path checks signature, audience and issuer, so the help says so
    mode.add_argument("--decode", metavar="TOKEN", help="Token string to decode (signature, audience and issuer are verified).")

    parser.add_argument(
        "-e",
        "--exp",
        type=int,
        default=DEFAULT_EXP_MINUTES,
        help="Expiration in minutes (0 disables the exp claim).",
    )
    parser.add_argument("-s", "--secret", default="", help="Secret key for signing. If not provided, uses JWT_SECRET_KEY from config.")
    parser.add_argument("--algo", default="", help="Signing algorithm (e.g., HS256, RS256). If not provided, uses JWT_ALGORITHM from config.")
    parser.add_argument("--pretty", action="store_true", help="Pretty-print payload before encoding.")

    # Rich token creation arguments (requires JWT_SECRET_KEY)
    parser.add_argument("--admin", action="store_true", help="Mark user as admin (DEV/TEST ONLY - requires JWT_SECRET_KEY)")
    parser.add_argument("--teams", help="Comma-separated team IDs (e.g., team-123,team-456). DEV/TEST ONLY")
    parser.add_argument("--scopes", help='JSON scopes object for permission restrictions (e.g., \'{"permissions": ["tools.read"]}\'). DEV/TEST ONLY')
    parser.add_argument("--full-name", help="User's full name for the token")

    return parser.parse_args(argv)
def _payload_from_cli(args) -> Dict[str, Any]:
    """Extract JWT payload from parsed command line arguments.

    Processes arguments in priority order:
    1. If username is specified, creates {"username": <value>}
    2. If data is specified, parses as JSON or key=value pairs
    3. Otherwise, returns default payload with the default username

    The data argument supports two formats:
    - JSON object string: '{"key": "value", "foo": "bar"}'
    - Comma-separated pairs: 'key=value,foo=bar'

    Args:
        args: Parsed command line arguments from argparse; only the ``username``
            and ``data`` attributes are read.

    Returns:
        Dict[str, Any]: The payload dictionary to encode in the JWT.

    Raises:
        ValueError: If data contains invalid key=value pairs (missing '='), or if
            data is valid JSON that is not an object (number, string, list, ...).

    Examples:
        >>> from argparse import Namespace
        >>> args = Namespace(username='alice', data=None)
        >>> _payload_from_cli(args)
        {'username': 'alice'}
        >>> args = Namespace(username=None, data='{"role": "admin", "id": 123}')
        >>> _payload_from_cli(args)
        {'role': 'admin', 'id': 123}
        >>> args = Namespace(username=None, data='name=bob,role=user')
        >>> _payload_from_cli(args)
        {'name': 'bob', 'role': 'user'}
        >>> args = Namespace(username=None, data='invalid_format')
        >>> _payload_from_cli(args)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        ValueError: Invalid key=value pair: 'invalid_format'
        >>> args = Namespace(username=None, data='[1, 2]')
        >>> _payload_from_cli(args)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        ValueError: JSON payload must be an object, got list
    """
    if args.username is not None:
        return {"username": args.username}

    if args.data is None:
        # Fallback default payload
        return {"username": DEFAULT_USERNAME}

    # Attempt JSON first
    try:
        parsed = orjson.loads(args.data)
    except orjson.JSONDecodeError:
        # Not JSON: interpret as comma-separated key=value pairs, skipping blanks
        payload: Dict[str, Any] = {}
        for pair in (chunk.strip() for chunk in args.data.split(",")):
            if not pair:
                continue
            key, separator, value = pair.partition("=")
            if not separator:
                raise ValueError(f"Invalid key=value pair: '{pair}'")
            payload[key.strip()] = value.strip()
        return payload

    # JSON scalars and arrays parse fine but cannot serve as a claims mapping;
    # reject them here instead of failing later with an unrelated error.
    if not isinstance(parsed, dict):
        raise ValueError(f"JSON payload must be an object, got {type(parsed).__name__}")
    return parsed
396# ---------------------------------------------------------------------------
397# Entry point for ``python3 jwt_cli.py``
398# ---------------------------------------------------------------------------
def main() -> None:  # pragma: no cover
    """Entry point for JWT command line interface.

    Provides two main modes of operation:
    1. Token creation: Generates a new JWT with specified payload
    2. Token decoding: Decodes an existing JWT through ``_decode_jwt_token`` and
       prints the payload as indented JSON

    In creation mode, supports:
    - Simple username payload (-u/--username)
    - Custom JSON or key=value payload (-d/--data)
    - Rich tokens with admin/team/scope claims (--admin, --teams, --scopes)
    - Configurable expiration, secret, and algorithm
    - Optional pretty-printing of payload before encoding

    User errors (token that cannot be decoded, ``--algo`` without ``--secret``,
    malformed ``--data`` or ``--scopes``) are reported as a one-line ``ERROR:``
    message on stderr followed by exit status 1.

    Examples:
        Command line usage::

            # Create token with username
            $ python jwt_cli.py -u alice
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Create admin token (DEV/TEST ONLY)
            $ python jwt_cli.py -u admin@example.com --admin --full-name "Admin User"
            ⚠️ WARNING: Creating token with elevated claims...
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Create team-scoped token (DEV/TEST ONLY)
            $ python jwt_cli.py -u user@example.com --teams team-123,team-456
            ⚠️ WARNING: Creating token with elevated claims...
            eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

            # Decode existing token
            $ python jwt_cli.py --decode eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...
            {
              "username": "alice",
              "exp": 1234567890
            }
    """

    def pretty_json(obj: Any) -> str:
        """Render ``obj`` as indented JSON text, stringifying unsupported types."""
        return orjson.dumps(obj, default=str, option=orjson.OPT_INDENT_2).decode()

    args = _parse_args()

    # Decode mode takes precedence
    if args.decode:
        try:
            decoded = _decode_jwt_token(args.decode, algorithms=[args.algo or DEFAULT_ALGO])
        except jwt.PyJWTError as e:
            # Same reporting style as the other user errors below, not a traceback
            print(f"ERROR: Unable to decode token: {e}", file=sys.stderr)
            sys.exit(1)
        sys.stdout.write(pretty_json(decoded))
        sys.stdout.write("\n")
        return

    # Validate: --algo requires --secret to avoid algorithm/key mismatch
    if args.algo and not args.secret:
        print(
            "ERROR: --algo requires --secret to be specified.\n"
            " Using --algo alone would mix config-based keys with a different algorithm,\n"
            " which can produce invalid tokens. Either:\n"
            " - Provide both --secret and --algo for full override\n"
            " - Omit both to use JWT_SECRET_KEY and JWT_ALGORITHM from config",
            file=sys.stderr,
        )
        sys.exit(1)

    wants_rich_claims = bool(args.admin or args.teams or args.scopes)

    # Security warning for rich tokens
    if wants_rich_claims:
        print(
            "⚠️ WARNING: Creating token with elevated claims (admin/teams/scopes)\n"
            " This requires JWT_SECRET_KEY and is intended for development/testing only.\n"
            " For production token management, use the /tokens API endpoint.\n",
            file=sys.stderr,
        )

    try:
        payload = _payload_from_cli(args)
    except ValueError as e:
        print(f"ERROR: Invalid --data payload: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(payload, dict):
        # JSON scalars/arrays parse successfully but cannot hold claims
        print("ERROR: Invalid --data payload: expected a JSON object or key=value pairs", file=sys.stderr)
        sys.exit(1)

    # Build rich token parameters if provided
    user_data = None
    teams = None
    scopes_dict = None

    if wants_rich_claims or args.full_name:
        user_email = payload.get("sub") or payload.get("username", "admin@example.com")

        # Build user data
        user_data = {
            "email": user_email,
            "full_name": args.full_name or "CLI User",
            "is_admin": bool(args.admin),
            "auth_provider": "cli",  # Mark as CLI-generated for auditing
        }

        # Build teams list, dropping empty entries such as "a,,b"
        if args.teams:
            teams = [t.strip() for t in args.teams.split(",") if t.strip()]

        # Build scopes
        if args.scopes:
            try:
                scopes_dict = orjson.loads(args.scopes)
            except orjson.JSONDecodeError as e:
                print(f"ERROR: Invalid JSON for --scopes: {e}", file=sys.stderr)
                sys.exit(1)

    if args.pretty:
        print("Payload:")
        print(pretty_json(payload))
        if user_data:
            print("User Data:")
            print(pretty_json(user_data))
        if teams:
            print(f"Teams: {teams}")
        if scopes_dict:
            print("Scopes:")
            print(pretty_json(scopes_dict))
        print("-")

    token = _create_jwt_token(payload, args.exp, args.secret, args.algo, user_data, teams, scopes_dict)
    print(token)
if __name__ == "__main__":
    # Support being run via ``python3 -m mcpgateway.utils.create_jwt_token`` too
    try:
        # Respect existing asyncio loop if present (e.g. inside uvicorn dev server).
        # get_running_loop() alone is the probe: it raises RuntimeError when this
        # thread has no running loop. Calling run_until_complete() on a loop that
        # is already running would itself raise RuntimeError, which would make the
        # ``else`` branch below unreachable and leak an un-awaited coroutine.
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop; we're just a simple CLI call - run main synchronously
        main()
    else:
        # We're inside an active asyncio program - delegate to executor to avoid
        # blocking. The returned future is intentionally not awaited here.
        loop.run_in_executor(None, main)