Source code for ibm_watson_machine_learning.instance_new_plan

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2020-2024.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------

import ibm_watson_machine_learning._wrappers.requests as requests
import json
import base64
import logging
from datetime import datetime, timedelta
from ibm_watson_machine_learning.wml_client_error import NoWMLCredentialsProvided, ApiRequestFailure, WMLClientError, \
                                                         CannotAutogenerateBedrockUrl
from ibm_watson_machine_learning.href_definitions import HrefDefinitions


[docs] class ServiceInstanceNewPlan: """Connect, get details and check usage of Watson Machine Learning service instance.""" def __init__(self, client): self._logger = logging.getLogger(__name__) self._client = client self._wml_credentials = client.wml_credentials self._expiration_datetime = None self._min_expiration_time = timedelta(minutes=15) if self._client.ICP_PLATFORM_SPACES: if self.get_instance_id() == 'openshift' or self.get_instance_id() == 'wml_local': self._wml_credentials[u'url'] = self.get_url() else: self._wml_credentials[u'url'] = self.get_url() + ':31843' # TODO: Check if this is used anywhere.. from initial searches, doesn't seem like # self._wml_credentials[u'instance_id'] = "999" # This is used in connections.py self._href_definitions = HrefDefinitions(self._client, self._client.CLOUD_PLATFORM_SPACES, self._client.PLATFORM_URL, self._client.CAMS_URL, self._client.ICP_PLATFORM_SPACES) self._client.wml_token = self._get_token() if not self._client.proceed: # there is no 'token' in wml_credentials delta = self._get_expiration_datetime() - datetime.now() if delta < self._min_expiration_time: self._min_expiration_time = delta - timedelta(minutes=1) if delta > timedelta(minutes=1) else delta # self._logger.info(u'Successfully prepared token: ' + self._client.wml_token) # ml_repository_client is initialized in repo self.details = None
[docs] def get_instance_id(self): """Get instance id of Watson Machine Learning service. :return: instance id :rtype: str **Example** .. code-block:: python instance_details = client.service_instance.get_instance_id() """ if self._wml_credentials['instance_id'] == 'invalid': raise WMLClientError('instance_id for this plan is picked up from the space with which' 'this instance_id is associated with. Set the space with associated' 'instance_id to be able to use this function') return self._wml_credentials['instance_id']
[docs] def get_api_key(self): """Get api key of Watson Machine Learning service. :return: api key :rtype: str **Example** .. code-block:: python instance_details = client.service_instance.get_api_key() """ return self._wml_credentials['apikey']
[docs] def get_url(self): """Get instance url of Watson Machine Learning service. :return: instance url :rtype: str **Example** .. code-block:: python instance_details = client.service_instance.get_url() """ return self._wml_credentials['url']
[docs] def get_username(self): """Get username for Watson Machine Learning service. Applicable only for IBM Cloud PakĀ® for Data. :return: username :rtype: str **Example** .. code-block:: python instance_details = client.service_instance.get_username() """ if self._client.ICP_PLATFORM_SPACES: try: return self._wml_credentials['username'] except: raise WMLClientError('`username` missing in wml_credentials.') else: raise WMLClientError("Not applicable for Cloud")
[docs] def get_password(self): """Get password for Watson Machine Learning service. Applicable only for IBM Cloud PakĀ® for Data. :return: password :rtype: str **Example** .. code-block:: python instance_details = client.service_instance.get_password() """ if self._client.ICP_PLATFORM_SPACES: try: return self._wml_credentials['password'] except: raise WMLClientError('`password` missing in wml_credentials.') else: raise WMLClientError("Not applicable for Cloud")
[docs] def get_details(self): """Get information about Watson Machine Learning instance. :return: metadata of service instance :rtype: dict **Example** .. code-block:: python instance_details = client.service_instance.get_details() """ if not self._client.ICP: if self._wml_credentials is not None: if self._wml_credentials['instance_id'] == 'invalid': raise WMLClientError('instance_id for this plan is picked up from the space with which ' 'this instance_id is associated with. Set the space with associated ' 'instance_id to be able to use this function') # /ml/v4/instances will need either space_id or project_id as mandatory params # We will enable this service instance class only during create space or # set space/project. So, space_id/project_id would have been populated at this point headers = self._client._get_headers() del headers[u'X-WML-User-Client'] if 'ML-Instance-ID' in headers: headers.pop('ML-Instance-ID') headers.pop(u'x-wml-internal-switch-to-new-v4') # params = {'version': self._client.version_param} response_get_instance = requests.get( self._href_definitions.get_v4_instance_id_href(), params=self._client._params(skip_space_project_chk=True), # params={'version': self._client.version_param}, headers=headers ) if response_get_instance.status_code == 200: return response_get_instance.json() else: raise ApiRequestFailure(u'Getting instance details failed.', response_get_instance) else: raise NoWMLCredentialsProvided else: return {}
def _get_token(self): if self._client.wml_token is None: return self._create_token() if self._is_token_refresh_possible(): if self._client.ICP: if self._get_expiration_datetime(): if self._get_expiration_datetime() - timedelta(minutes=50) < datetime.now(): self._client.wml_token = self._get_cpd_token_from_request() self._client.repository._refresh_repo_client() else: self._client.wml_token = self._get_cpd_token_from_request() self._client.repository._refresh_repo_client() elif self._client._is_IAM(): if self._get_expiration_datetime() - self._min_expiration_time < datetime.now(): self._client.wml_token = self._get_IAM_token() self._client.repository._refresh_repo_client() elif self._get_expiration_datetime() - timedelta(minutes=30) < datetime.now(): self._client.repository._refresh_repo_client() self._refresh_token() return self._client.wml_token def _create_token(self): if self._client.proceed is True: return self._wml_credentials["token"] if not self._client.ICP_PLATFORM_SPACES: if self._client._is_IAM(): return self._get_IAM_token() else: raise WMLClientError('apikey for IAM token is not provided in credentials for the client.') else: return self._get_cpd_token_from_request() def _refresh_token(self): if self._client.proceed is True: self._client.wml_token = self._wml_credentials["token"] self._client.wml_token = self._get_cpd_token_from_request() def _get_expiration_datetime(self): if self._expiration_datetime is not None: return self._expiration_datetime token_parts = self._client.wml_token.split('.') token_padded = token_parts[1] + '===' try: token_info = json.loads(base64.b64decode(token_padded).decode('utf-8', errors='ignore')) except ValueError: # If there is a problem with decoding (e.g. special char in token), add altchars token_info = json.loads(base64.b64decode(token_padded, altchars='_-').decode('utf-8', errors='ignore')) token_expire = token_info.get('exp') return datetime.fromtimestamp(token_expire) def _is_iam(self): try: token_parts = self._client.wml_token.split('.') token_padded = token_parts[1] + '===' try: token_info = json.loads(base64.b64decode(token_padded).decode('utf-8', errors='ignore')) except ValueError: # If there is a problem with decoding (e.g. special char in token), add altchars token_info = json.loads(base64.b64decode(token_padded, altchars='_-').decode('utf-8', errors='ignore')) instanceId = token_info.get('instanceId') return instanceId except: return False def _get_IAM_token(self): if self._client.proceed is True: return self._wml_credentials["token"] headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Authorization': 'Basic Yng6Yng=' } mystr = 'apikey=' + self._href_definitions.get_iam_token_api() response = requests.post( self._href_definitions.get_iam_token_url(), data=mystr, headers=headers ) if response.status_code == 200: token = response.json().get(u'access_token') self._expiration_datetime = None else: raise WMLClientError(u'Error getting IAM Token.', response) return token def _is_token_refresh_possible(self): """Checks if necessary credentials were passed for token refresh. For CP4D we need (username & password)/(username & api_key). For Cloud we need api_key. :return: `True` if token refresh can be performed `False` otherwise :rtype: bool """ if self._client._is_IAM(): return 'apikey' in self._wml_credentials else: return 'username' in self._wml_credentials and ( 'password' in self._wml_credentials or 'apikey' in self._wml_credentials) def _get_cpd_auth_pair(self): """Get a pair of credentials required for generation of token. :return: string representing a dictionary of authentication credentials (username & password) or (username & api_key). :rtype: str """ if "apikey" in self._wml_credentials: return f'{{\"username\": \"{self.get_username()}\", \"api_key\": \"{self.get_api_key()}\"}}' else: return f'{{\"username\": \"{self.get_username()}\", \"password\": \"{self.get_password()}\"}}' def _get_cpd_bedrock_auth_data(self): """Get data required for generation of token. :return: string representing a dictionary of authentication credentials :rtype: str """ return f'grant_type=password&username={self.get_username()}&password={self.get_password()}&scope=openid' def _get_cpd_token_from_request_old_auth_flow(self): token_url = self._href_definitions.get_cpd_token_endpoint_href() response = requests.post(token_url, headers={ 'Content-Type': 'application/json' }, data=self._get_cpd_auth_pair()) if response.status_code == 200: self._expiration_datetime = None return response.json().get(u'token') else: raise WMLClientError(u'Error refreshing the token.', response) def _get_cpd_token_from_request_new_auth_flow(self): bedrock_url = self._href_definitions.get_cpd_bedrock_token_endpoint_href() response = requests.post(bedrock_url, headers={ 'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8' }, data=self._get_cpd_bedrock_auth_data()) if response.status_code != 200: raise WMLClientError(u'Error refreshing the token.', response, logg_messages=False) iam_token = response.json()['access_token'] self._expiration_datetime = datetime.now() + timedelta(seconds=response.json()['expires_in']) # refresh_token = response.json()['refresh_token'] token_url = self._href_definitions.get_cpd_validation_token_endpoint_href() response = requests.get(token_url, headers={ 'username': self.get_username(), 'iam-token': iam_token }) if response.status_code == 200: return response.json()['accessToken'] else: raise WMLClientError(u'Error refreshing the token.', response) def _get_cpd_token_from_request(self): """Send a request for token on CPD. :return: newly created token is returned if no errors occurred :rtype: str """ if self._client.CPD_version and 'bedrock_url' in self._client.wml_credentials and 'password' in self._client.wml_credentials: try: return self._get_cpd_token_from_request_new_auth_flow() except Exception as e1: if not hasattr(self._client, '_is_bedrock_url_autogenerated'): raise e1 try: res = self._get_cpd_token_from_request_old_auth_flow() # if it worked then iamintegration=False, then removing bedrock_url will shorten the path del self._client.wml_credentials['bedrock_url'] return res except Exception as e2: if hasattr(self._client, '_is_bedrock_url_autogenerated') and self._client._is_bedrock_url_autogenerated: raise CannotAutogenerateBedrockUrl(e1, e2) else: raise e2 else: return self._get_cpd_token_from_request_old_auth_flow()