# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2023-2024.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
"""
.. module:: APIClient
:platform: Unix, Windows
:synopsis: IBM watsonx.ai API Client.
.. moduleauthor:: IBM
"""
import copy
import logging
import os
from warnings import warn
from typing import Any, cast, TypeAlias
import json
import base64
import ibm_watsonx_ai.utils
from ibm_watsonx_ai.utils import get_user_agent_header
from ibm_watsonx_ai.utils.utils import _APIClientSession
from ibm_watsonx_ai.Set import Set
from ibm_watsonx_ai.assets import Assets
from ibm_watsonx_ai.connections import Connections
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.deployments import Deployments
from ibm_watsonx_ai.experiments import Experiments
from ibm_watsonx_ai.export_assets import Export
from ibm_watsonx_ai.factsheets import Factsheets
from ibm_watsonx_ai.foundation_models_manager import FoundationModelsManager
from ibm_watsonx_ai.functions import Functions
from ibm_watsonx_ai.ai_services import AIServices
from ibm_watsonx_ai.hw_spec import HwSpec
from ibm_watsonx_ai.import_assets import Import
from ibm_watsonx_ai.service_instance import ServiceInstance
from ibm_watsonx_ai.messages.messages import Messages
from ibm_watsonx_ai.model_definition import ModelDefinition
from ibm_watsonx_ai.models import Models
from ibm_watsonx_ai.parameter_sets import ParameterSets
from ibm_watsonx_ai.pipelines import Pipelines
from ibm_watsonx_ai.pkg_extn import PkgExtn
from ibm_watsonx_ai.spaces import Spaces
from ibm_watsonx_ai.remote_training_system import RemoteTrainingSystem
from ibm_watsonx_ai.repository import Repository
from ibm_watsonx_ai.script import Script
from ibm_watsonx_ai.shiny import Shiny
from ibm_watsonx_ai.sw_spec import SwSpec
from ibm_watsonx_ai.task_credentials import TaskCredentials
from ibm_watsonx_ai.training import Training
from ibm_watsonx_ai.utils import CPDVersion
from ibm_watsonx_ai.volumes import Volume
from ibm_watsonx_ai.wml_client_error import NoWMLCredentialsProvided
from ibm_watsonx_ai.wml_client_error import WMLClientError
# requests module or requests.Session
RequestsLikeType: TypeAlias = Any
[docs]
class APIClient:
"""The main class of ibm_watsonx_ai. The very heart of the module. APIClient contains objects that manage the service reasources.
To explore how to use APIClient, refer to:
- :ref:`Setup<setup>` - to check correct initialization of APIClient for a specific environment.
- :ref:`Core<core>` - to explore core properties of an APIClient object.
:param url: URL of the service
:type url: str
:param credentials: credentials used to connect with the service
:type credentials: Credentials
:param project_id: ID of the project that is used
:type project_id: str, optional
:param space_id: ID of deployment space that is used
:type space_id: str, optional
:param verify: certificate verification flag, deprecated, use Credentials(verify=...) to set `verify`
:type verify: bool, optional
**Example:**
.. code-block:: python
from ibm_watsonx_ai import APIClient, Credentials
credentials = Credentials(
url = "<url>",
api_key = "<api_key>"
)
client = APIClient(credentials, space_id="<space_id>")
client.models.list()
client.deployments.get_details()
client.set.default_project("<project_id>")
...
"""
version: str | None = None
_internal: bool = False
def __init__(
self,
credentials: Credentials | dict[str, str] | None = None,
project_id: str | None = None,
space_id: str | None = None,
verify: str | bool | None = None,
**kwargs: Any,
) -> None:
if (wml_credentials := kwargs.get("wml_credentials")) is not None:
warn("`wml_credentials` parameter is deprecated, please use `credentials`")
if not credentials:
credentials = wml_credentials
if wml_credentials is None and credentials is None:
raise TypeError("APIClient() missing 1 required argument: 'credentials'")
self._logger = logging.getLogger(__name__)
if verify is not None:
warn(
"`verify` parameter is deprecated. Use `ibm_watsonx_ai.Credentials` for passing `verify` parameter.",
category=DeprecationWarning,
)
if isinstance(credentials, dict):
warn(
"`credentials` parameter as dict is deprecated. Use `ibm_watsonx_ai.Credentials` for passing parameters.",
category=DeprecationWarning,
)
credentials = Credentials.from_dict(credentials, _verify=verify)
if project_id is not None and space_id is not None:
raise WMLClientError(
"`project_id` parameter and `space_id` parameter cannot be set at the same time."
)
credentials._set_env_vars_from_credentials()
if isinstance(credentials.verify, str):
credentials.verify = True
from ibm_watsonx_ai._wrappers import requests
if credentials.proxies is not None:
requests.additional_settings["proxies"] = credentials.proxies
elif requests.additional_settings.get("proxies") is not None:
del requests.additional_settings["proxies"]
# At this stage `credentials` has type Dict[str, str]
credentials = cast(Credentials, credentials)
self.credentials = copy.deepcopy(credentials)
self.default_space_id = None
self.default_project_id = None
self.project_type = None
self.CLOUD_PLATFORM_SPACES = False
self.PLATFORM_URL = None
self.version_param = self._get_api_version_param()
self.ICP_PLATFORM_SPACES = False # This will be applicable for 3.5 and later and specific to convergence functionalities
self.CPD_version = CPDVersion()
self._iam_id = None
self._spec_ids_per_state: dict = {}
self.generate_ux_tag = True
self.WCA: bool = False
self._user_headers: dict | None = None # Used in set_headers() method
self.__session = None
self.PLATFORM_URLS_MAP = {
"https://ca-tor.ml.cloud.ibm.com": "https://ca-tor.dai.cloud.ibm.com",
"https://private.ca-tor.ml.cloud.ibm.com": "https://private.ca-tor.dai.cloud.ibm.com",
"https://wxai-qa.ml.cloud.ibm.com": "https://api.dai.test.cloud.ibm.com",
"https://private.wxai-qa.ml.cloud.ibm.com": "https://private.api.dai.test.cloud.ibm.com",
"https://wml-mcsp-dev.ml.test.cloud.ibm.com": "https://api.dai.dev.cloud.ibm.com",
"https://wml-dev.ml.test.cloud.ibm.com": "https://api.dataplatform.dev.cloud.ibm.com",
"https://wml-fvt.ml.test.cloud.ibm.com": "https://api.dataplatform.dev.cloud.ibm.com",
"https://private.wml-fvt.ml.test.cloud.ibm.com": "https://api.dataplatform.dev.cloud.ibm.com",
"https://us-south.ml.test.cloud.ibm.com": "https://api.dataplatform.dev.cloud.ibm.com",
"https://yp-qa.ml.cloud.ibm.com": "https://api.dataplatform.test.cloud.ibm.com",
"https://private.yp-qa.ml.cloud.ibm.com": "https://api.dataplatform.test.cloud.ibm.com",
"https://yp-cr.ml.cloud.ibm.com": "https://api.dataplatform.test.cloud.ibm.com",
"https://private.yp-cr.ml.cloud.ibm.com": "https://api.dataplatform.test.cloud.ibm.com",
"https://jp-tok.ml.cloud.ibm.com": "https://api.jp-tok.dataplatform.cloud.ibm.com",
"https://eu-gb.ml.cloud.ibm.com": "https://api.eu-gb.dataplatform.cloud.ibm.com",
"https://eu-de.ml.cloud.ibm.com": "https://api.eu-de.dataplatform.cloud.ibm.com",
"https://us-south.ml.cloud.ibm.com": "https://api.dataplatform.cloud.ibm.com",
"https://au-syd.ml.cloud.ibm.com": "https://api.au-syd.dai.cloud.ibm.com",
"https://private.jp-tok.ml.cloud.ibm.com": "https://api.jp-tok.dataplatform.cloud.ibm.com",
"https://private.eu-gb.ml.cloud.ibm.com": "https://api.eu-gb.dataplatform.cloud.ibm.com",
"https://private.eu-de.ml.cloud.ibm.com": "https://api.eu-de.dataplatform.cloud.ibm.com",
"https://private.us-south.ml.cloud.ibm.com": "https://api.dataplatform.cloud.ibm.com",
"https://private.us-south.ml.test.cloud.ibm.com": "https://api.dataplatform.dev.cloud.ibm.com",
"https://private.au-syd.ml.cloud.ibm.com": "https://api.au-syd.dai.cloud.ibm.com",
}
requests.packages.urllib3.disable_warnings() # type: ignore[attr-defined]
if self.credentials.token is not None:
if not (self.credentials.api_key or self.credentials.password):
self.proceed = True
else:
# _is_env_token is used for initialising client on cluster with USER_ACCESS_TOKEN environment variable.
self.proceed = not self.credentials._is_env_token
else:
self.proceed = False
self.token: str | None = None
self.__predefined_instance_type_list = ["icp", "openshift"]
if credentials is None:
raise NoWMLCredentialsProvided()
if self.credentials.url is None:
raise WMLClientError(Messages.get_message(message_id="url_not_provided"))
if not self.credentials.url.startswith("https://"):
raise WMLClientError(Messages.get_message(message_id="invalid_url"))
if self.credentials.url[-1] == "/":
self.credentials.url = self.credentials.url.rstrip("/")
with self._session:
if self.credentials.instance_id is None:
self.CLOUD_PLATFORM_SPACES = True
self.ICP_PLATFORM_SPACES = False
if self._internal:
self.PLATFORM_URL = self.credentials.url
else:
if self.credentials.platform_url:
self.PLATFORM_URL = self.credentials.platform_url
elif self.credentials.url in self.PLATFORM_URLS_MAP.keys():
self.PLATFORM_URL = self.PLATFORM_URLS_MAP[self.credentials.url]
else:
raise WMLClientError(
Messages.get_message(
message_id="invalid_cloud_scenario_url"
)
)
if not self._is_IAM():
raise WMLClientError(
Messages.get_message(message_id="apikey_not_provided")
)
else:
if (
"icp" == self.credentials.instance_id.lower()
or "openshift" == self.credentials.instance_id.lower()
):
self.ICP_PLATFORM_SPACES = True
os.environ["DEPLOYMENT_PLATFORM"] = "private"
# Validate the cpd version:
response_get_wml_services = self._session.get(
f"{self.credentials.url}/ml/wml_services/version",
headers={"User-Agent": get_user_agent_header()},
)
if (
response_get_wml_services.status_code != 200
): # retry with endpoint for cpd 4.8 and higher
response_get_wml_services = self._session.get(
f"{self.credentials.url}/ml/wml_services/v2/version",
headers={"User-Agent": get_user_agent_header()},
)
if response_get_wml_services.status_code == 200:
wml_version = response_get_wml_services.json().get(
"version", ""
)
if wml_version:
wml_version = ".".join(wml_version.split(".")[:2])
if self.credentials.version is None:
self.credentials.version = wml_version
elif self.credentials.version != wml_version:
warn(
f"The provided version: {self.credentials.version} "
f"is different from the current CP4D version: {wml_version}. "
f"Correct the credentials with proper CP4D version number."
)
if (
self.credentials.version
not in CPDVersion.supported_version_list
):
raise WMLClientError(
Messages.get_message(
self.credentials.version,
self.version,
message_id="invalid_version_from_automated_check",
)
)
else:
self._logger.debug(
f"GET /ml/wml_services/version failed with status code: {response_get_wml_services.status_code}."
)
# Condition for CAMS related changes to take effect (Might change)
if self.credentials.version is None:
raise WMLClientError(
Messages.get_message(
CPDVersion.supported_version_list,
message_id="version_not_provided",
)
)
if (
self.credentials.version.lower()
in CPDVersion.supported_version_list
):
self.CPD_version.cpd_version = self.credentials.version.lower()
os.environ["DEPLOYMENT_PRIVATE"] = "icp4d"
if self.credentials.bedrock_url is None and self.CPD_version:
if self.CPD_version < 4.7:
bedrock_prefix = "https://cp-console"
else:
namespace_from_url = "-".join(
self.credentials.url.split(".")[0].split("-")[1:]
)
route = (
"cpd" if self.CPD_version >= 5.1 else "cp-console"
)
bedrock_prefix = f"https://{route}-{namespace_from_url}"
self.credentials.bedrock_url = ".".join(
[bedrock_prefix] + self.credentials.url.split(".")[1:]
)
self._is_bedrock_url_autogenerated = True
else:
self.ICP_PLATFORM_SPACES = False
raise WMLClientError(
Messages.get_message(
", ".join(CPDVersion.supported_version_list),
message_id="invalid_version",
)
)
else:
if self.credentials.url in self.PLATFORM_URLS_MAP:
raise WMLClientError(
Messages.get_message(
message_id="instance_id_in_cloud_scenario"
)
)
else:
raise WMLClientError(
Messages.get_message(message_id="invalid_instance_id")
)
if (
self.credentials.instance_id is not None
and (
self.credentials.instance_id.lower()
not in self.__predefined_instance_type_list
)
and self.credentials.version is not None
):
raise WMLClientError(
Messages.get_message(message_id="provided_credentials_are_invalid")
)
self._use_fm_ga_api = self.CLOUD_PLATFORM_SPACES or (
self._check_if_fm_ga_api_available()
if self.CPD_version <= 4.8
else True
)
self._use_pta_ga_api = self.CLOUD_PLATFORM_SPACES or (
self.CPD_version >= 5.0
)
self.token = None
# For cloud, service_instance.details will be set during space creation( if instance is associated ) or
# while patching a space with an instance
self.service_instance: ServiceInstance = ServiceInstance(self)
self.volumes = Volume(self)
if self._use_fm_ga_api:
self.foundation_models = FoundationModelsManager(self)
if self.ICP_PLATFORM_SPACES:
self.service_instance._refresh_details = True
self.set = Set(self)
if project_id:
self.set.default_project(project_id) # recognizes project type
elif space_id:
self.set.default_space(space_id)
self.spaces = Spaces(self)
self.export_assets = Export(self)
self.import_assets = Import(self)
if self.ICP_PLATFORM_SPACES:
self.shiny = Shiny(self)
self.script = Script(self)
self.model_definitions = ModelDefinition(self)
self.package_extensions = PkgExtn(self)
self.software_specifications = SwSpec(self)
self.hardware_specifications = HwSpec(self)
self.connections = Connections(self)
self.training: Training = Training(self)
self.data_assets = Assets(self)
self.deployments = Deployments(self)
if self.CLOUD_PLATFORM_SPACES:
self.factsheets = Factsheets(self)
self.task_credentials = TaskCredentials(self)
# AI service available only on CLOUD
self.__ai_services = AIServices(self)
self.remote_training_systems = RemoteTrainingSystem(self)
self.repository = Repository(self)
self._models = Models(self)
self.pipelines = Pipelines(self)
self.experiments = Experiments(self)
self._functions = Functions(self)
self.parameter_sets = ParameterSets(self)
self._logger.info(
Messages.get_message(message_id="client_successfully_initialized")
)
@property
def wml_credentials(self) -> dict[str, str]:
warn(
"`wml_credentials` attribute is deprecated, please use `client.credentials` instead"
)
return self.credentials.to_dict()
@wml_credentials.setter
def wml_credentials(self, value: dict[str, str]) -> None:
warn(
"`wml_credentials` attribute is deprecated, please use `client.credentials` instead"
)
self.credentials = Credentials.from_dict(value)
@property
def wml_token(self) -> str | None:
warn("`wml_token` attribute is deprecated, please use `client.token` instead")
return self.token
@wml_token.setter
def wml_token(self, value: str) -> None:
warn("`wml_token` attribute is deprecated, please use `client.token` instead")
self.token = value
@property
def _session(self) -> RequestsLikeType:
if self.__session is None:
self.__session = _APIClientSession(self)
return self.__session
@_session.setter
def _session(self, value: RequestsLikeType):
self.__session = value
@property
def _ai_services(self) -> AIServices:
if self.CLOUD_PLATFORM_SPACES:
return self.__ai_services
else:
raise WMLClientError(
error_msg="AI service is unsupported for this release."
)
@staticmethod
def _get_api_version_param() -> str:
try:
file_name = "API_VERSION_PARAM"
path = os.path.dirname(ibm_watsonx_ai.utils.__file__)
with open(os.path.join(path, file_name)) as file:
return file.read().strip()
except Exception:
return "2021-06-21"
def _check_if_either_is_set(self) -> None:
if self.default_space_id is None and self.default_project_id is None:
raise WMLClientError(
Messages.get_message(
message_id="it_is_mandatory_to_set_the_space_project_id"
)
)
def _check_if_space_is_set(self) -> None:
if self.default_space_id is None:
raise WMLClientError(
Messages.get_message(message_id="it_is_mandatory_to_set_the_space_id")
)
def _params(
self,
skip_space_project_chk: bool = False,
skip_for_create: bool = False,
skip_userfs: bool = False,
) -> dict:
params = {}
params.update({"version": self.version_param})
if not skip_for_create:
if self.default_space_id is not None:
params.update({"space_id": self.default_space_id})
elif self.default_project_id is not None:
params.update({"project_id": self.default_project_id})
else:
# For system software/hardware specs
if skip_space_project_chk is False:
raise WMLClientError(
Messages.get_message(
message_id="it_is_mandatory_to_set_the_space_project_id"
)
)
if (
self.default_project_id
and self.project_type == "local_git_storage"
and not skip_userfs
):
params.update({"userfs": "true"})
if self._iam_id:
params.update({"iam_id": str(self._iam_id)})
if (
not self.default_project_id
or self.project_type != "local_git_storage"
or skip_userfs
) and "userfs" in params:
del params["userfs"]
return params
def _get_headers(
self,
content_type: str = "application/json",
no_content_type: bool = False,
zen: bool = False,
projects_token: bool = False,
) -> dict:
if zen:
headers = {"Content-Type": content_type}
token = self.service_instance._create_token()
if len(token.split(".")) == 1:
headers.update({"Authorization": "Basic " + token})
else:
headers.update({"Authorization": "Bearer " + token})
else:
if self.proceed is True:
token_to_use = (
self.credentials.projects_token
if projects_token and self.credentials.projects_token is not None
else self.credentials.token
)
if len(token_to_use.split(".")) == 1:
token = "Basic " + token_to_use
else:
token = "Bearer " + token_to_use
else:
token = "Bearer " + self.service_instance._get_token()
headers = {
"Authorization": token,
"User-Agent": get_user_agent_header(),
}
if not self.ICP_PLATFORM_SPACES:
if self.default_project_id is not None:
headers.update({"X-Watson-Project-ID": self.default_project_id})
if not no_content_type:
headers.update({"Content-Type": content_type})
if not self.generate_ux_tag:
headers.update({"X-WX-UX": "true"})
self.generate_ux_tag = True
if self.WCA:
headers.update({"IBM-WATSONXAI-CONSUMER": "wca"})
if (env_variable := os.environ.get("IBM_SDK_API_CLIENT_HEADERS")) is not None:
headers = headers | json.loads(
base64.b64decode(env_variable).decode("utf-8")
)
if self._user_headers:
headers = headers | self._user_headers
return headers
[docs]
def set_token(self, token: str) -> None:
"""
Method which allows refresh/set new User Authorization Token.
:param token: User Authorization Token
:type token: str
**Examples**
.. code-block:: python
client.set_token("<USER AUTHORIZATION TOKEN>")
"""
self.proceed = True
self.token = token
self.credentials.token = token
def _get_icptoken(self) -> str:
return self.service_instance._create_token()
def _is_default_space_set(self) -> bool:
if self.default_space_id is not None:
return True
return False
def _is_IAM(self) -> bool:
if self.credentials.api_key is not None:
if self.credentials.api_key != "":
return True
else:
raise WMLClientError(
Messages.get_message(message_id="apikey_value_cannot_be_empty")
)
elif self.credentials.token is not None:
if self.credentials.token != "":
return True
else:
raise WMLClientError(
Messages.get_message(message_id="token_value_cannot_be_empty")
)
else:
return False
def _check_if_fm_ga_api_available(self) -> bool:
response_ga_api = self._session.get(
url="{}/ml/v1/foundation_model_specs?limit={}".format(
self.credentials.url, "1"
),
params={"version": self.version_param},
)
return response_ga_api.status_code == 200