Skip to content

src.llm.adapters.watsonx.ibm_token_manager.IBMTokenManager

Manages IBM Cloud OAuth2 token lifecycle for WatsonX API access.

This class handles authentication token management for IBM Cloud services, including automatic token refresh and coroutine-safe token access (refreshes are serialized with an asyncio lock). The class shown below does not enforce a single instance itself; to maintain one token across the application, create one manager and share it.

Attributes:

Name Type Description
api_key str

IBM Cloud API key for authentication.

token_url str

IBM IAM authentication endpoint URL.

refresh_buffer int

Time buffer in seconds before token expiry to trigger refresh.

access_token Optional[str]

Current valid access token.

expiry_time float

Unix timestamp when the current token expires.

lock Lock

Async lock that serializes token refresh operations across coroutines.

Source code in src/llm/adapters/watsonx/ibm_token_manager.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class IBMTokenManager:
    """Manages IBM Cloud OAuth2 token lifecycle for WatsonX API access.

    The manager exchanges an IBM Cloud API key for an IAM access token, caches
    the token together with its expiry time, and fetches a new one once the
    cached token is missing or within ``refresh_buffer`` seconds of expiring.

    Refreshes are serialized with an ``asyncio.Lock``, so concurrent coroutines
    trigger at most one refresh request at a time. The lock is an asyncio
    primitive, not a threading one: it gives no protection across OS threads.

    The class does not enforce a single instance itself; to reuse one token
    across the application, create one manager and share it.

    Attributes:
        REQUEST_TIMEOUT_SECONDS (float): Upper bound, in seconds, for one token
            request to the IAM endpoint.
        api_key (str): IBM Cloud API key for authentication.
        token_url (str): IBM IAM authentication endpoint URL, read from the
            ``IBM_AUTH_URL`` environment variable.
        refresh_buffer (int): Time buffer in seconds before token expiry to trigger refresh.
        access_token (Optional[str]): Current access token, or None before the
            first successful refresh and after a failed one.
        expiry_time (float): Unix timestamp when the current token expires.
        lock (asyncio.Lock): Async lock that serializes token refresh operations.
    """

    # The refresh lock is held for the whole HTTP request, so an unbounded (or
    # very long) request would stall every coroutine waiting in get_token().
    # aiohttp's default total timeout is 5 minutes; use a tighter explicit bound.
    REQUEST_TIMEOUT_SECONDS: float = 30.0

    def __init__(self, api_key: str, refresh_buffer: int = 60):
        """Initialize the IBM Token Manager.

        No network request is made here; the first token is fetched lazily by
        the first call to ``get_token``.

        Args:
            api_key (str): IBM WatsonX API key for authentication.
            refresh_buffer (int, optional): Buffer time in seconds before token expiry
                to trigger a refresh. Defaults to 60 seconds.

        Raises:
            ValueError: If api_key is empty or None.
            EnvironmentError: If IBM_AUTH_URL environment variable is not set.
        """
        if not api_key:
            raise ValueError("API key cannot be empty or None")

        self.api_key = api_key
        self.token_url = os.getenv("IBM_AUTH_URL")
        if not self.token_url:
            raise EnvironmentError("IBM_AUTH_URL environment variable not set")

        self.refresh_buffer = refresh_buffer
        # access_token=None makes _is_token_expired() report True, so the first
        # get_token() call always performs a fetch.
        self.access_token: Optional[str] = None
        self.expiry_time: float = 0
        self.lock = asyncio.Lock()

        logger.debug("Initialized IBMTokenManager with refresh buffer of %d seconds", refresh_buffer)

    async def _is_token_expired(self) -> bool:
        """Check if the current token is missing, expired, or approaching expiry.

        Returns:
            bool: True if there is no token or the current time is past
                ``expiry_time - refresh_buffer``, False otherwise.
        """
        current_time = time.time()
        is_expired = self.access_token is None or current_time > (self.expiry_time - self.refresh_buffer)

        if is_expired:
            logger.debug("Token is expired or approaching expiry")
        return is_expired

    async def _refresh_token(self) -> None:
        """Fetch a new OAuth token from IBM IAM and cache it.

        The whole operation runs under ``self.lock``. If the token turns out to
        be valid once the lock is acquired, no request is made. On any failure
        the cached token is cleared and the exception is re-raised.

        Raises:
            aiohttp.ClientError: If the token refresh request fails.
            asyncio.TimeoutError: If the request takes longer than
                ``REQUEST_TIMEOUT_SECONDS``.
            ValueError: If the response doesn't contain expected token information.
            Exception: For any other unexpected errors during token refresh.
        """
        async with self.lock:
            # Re-check under the lock: a coroutine that held the lock before us
            # may already have refreshed the token while we were waiting.
            if not await self._is_token_expired():
                logger.debug("Token refresh skipped - current token still valid")
                return

            logger.debug("Starting token refresh process")
            try:
                headers = {"Content-Type": "application/x-www-form-urlencoded"}
                payload = {
                    "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
                    "apikey": self.api_key,
                }
                timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)

                async with aiohttp.ClientSession(timeout=timeout) as session:
                    logger.debug("Making token refresh request to IBM IAM")
                    async with session.post(self.token_url, headers=headers, data=payload) as response:
                        response.raise_for_status()
                        token_info = await response.json()

                        if "access_token" not in token_info or "expires_in" not in token_info:
                            raise ValueError("Invalid token response from IBM IAM")

                        self.access_token = token_info["access_token"]
                        expires_in = int(token_info["expires_in"])
                        self.expiry_time = time.time() + expires_in

                        logger.info("Successfully refreshed IBM token. Expires in %d seconds", expires_in)
                        logger.debug("Token expiry time set to %s", time.ctime(self.expiry_time))

            except aiohttp.ClientError as e:
                logger.error("HTTP error during token refresh: %s", e)
                self.access_token = None
                raise
            except asyncio.TimeoutError:
                # A bare TimeoutError has an empty message, so log the bound instead.
                logger.error("Token refresh timed out after %s seconds", self.REQUEST_TIMEOUT_SECONDS)
                self.access_token = None
                raise
            except Exception as e:
                logger.error("Unexpected error during token refresh: %s", e)
                self.access_token = None
                raise

    async def get_token(self) -> Optional[str]:
        """Retrieve a valid access token, refreshing if necessary.

        Returns:
            Optional[str]: The cached access token, or a newly fetched one when
                the cached token was missing or close to expiry. A failed
                refresh raises instead of returning None.

        Raises:
            Exception: If token refresh fails when attempted (the error from
                the refresh is propagated unchanged).
        """
        logger.debug("Token requested")
        # Cheap unlocked check first; _refresh_token() repeats it under the lock.
        if await self._is_token_expired():
            logger.debug("Token refresh needed before returning")
            await self._refresh_token()
        return self.access_token

__init__(api_key, refresh_buffer=60)

Initialize the IBM Token Manager.

Parameters:

Name Type Description Default
api_key str

IBM WatsonX API key for authentication.

required
refresh_buffer int

Buffer time in seconds before token expiry to trigger a refresh. Defaults to 60 seconds.

60

Raises:

Type Description
ValueError

If api_key is empty or None.

EnvironmentError

If IBM_AUTH_URL environment variable is not set.

Source code in src/llm/adapters/watsonx/ibm_token_manager.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, api_key: str, refresh_buffer: int = 60):
    """Set up a token manager with no token cached yet.

    The IAM endpoint URL is taken from the ``IBM_AUTH_URL`` environment
    variable. No network request is made during construction.

    Args:
        api_key (str): IBM WatsonX API key for authentication.
        refresh_buffer (int, optional): Number of seconds before token expiry
            at which a refresh should be triggered. Defaults to 60 seconds.

    Raises:
        ValueError: If api_key is empty or None.
        EnvironmentError: If IBM_AUTH_URL environment variable is not set.
    """
    if not api_key:
        raise ValueError("API key cannot be empty or None")

    auth_url = os.environ.get("IBM_AUTH_URL")
    self.api_key = api_key
    self.token_url = auth_url
    if not auth_url:
        raise EnvironmentError("IBM_AUTH_URL environment variable not set")

    # Start without a token; expiry of 0 is a placeholder until one is fetched.
    self.access_token: Optional[str] = None
    self.expiry_time: float = 0
    self.refresh_buffer = refresh_buffer
    self.lock = asyncio.Lock()

    logger.debug("Initialized IBMTokenManager with refresh buffer of %d seconds", refresh_buffer)

get_token() async

Retrieve a valid access token, refreshing if necessary.

Returns:

Type Description
Optional[str]

Valid access token, either cached or newly fetched. A failed refresh is raised as an exception (see Raises) rather than reported by returning None.

Raises:

Type Description
Exception

If token refresh fails when attempted.

Source code in src/llm/adapters/watsonx/ibm_token_manager.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def get_token(self) -> Optional[str]:
    """Return the current access token, refreshing it first when needed.

    Returns:
        Optional[str]: The value of ``self.access_token`` after an optional
            refresh. A failed refresh surfaces as an exception rather than
            as a None return value.

    Raises:
        Exception: Whatever ``_refresh_token`` raises when a refresh is
            attempted and fails.
    """
    logger.debug("Token requested")

    needs_refresh = await self._is_token_expired()
    if not needs_refresh:
        # Cached token is still usable; hand it back without any request.
        return self.access_token

    logger.debug("Token refresh needed before returning")
    await self._refresh_token()
    return self.access_token