Source code for client

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2023-2026.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------
"""
.. module:: APIClient
   :platform: Unix, Windows
   :synopsis: IBM watsonx.ai API Client.

.. moduleauthor:: IBM
"""

from __future__ import annotations

import asyncio
import base64
import copy
import json
import logging
import os
from functools import cached_property
from pathlib import Path
from types import MappingProxyType
from typing import Any, cast
from urllib.parse import urlparse
from warnings import warn

import httpx

import ibm_watsonx_ai.utils
from ibm_watsonx_ai._wrappers import httpx_wrapper
from ibm_watsonx_ai._wrappers.httpx_wrapper import (
    _get_async_httpx_client,
    _get_httpx_client,
    _httpx_async_transport_params,
    _httpx_transport_params,
)
from ibm_watsonx_ai.ai_services import AIServices
from ibm_watsonx_ai.assets import Assets
from ibm_watsonx_ai.connections import Connections
from ibm_watsonx_ai.credentials import Credentials
from ibm_watsonx_ai.deployments import Deployments
from ibm_watsonx_ai.experiments import Experiments
from ibm_watsonx_ai.export_assets import Export
from ibm_watsonx_ai.factsheets import Factsheets
from ibm_watsonx_ai.folder_assets import FolderAssets
from ibm_watsonx_ai.foundation_models_manager import FoundationModelsManager
from ibm_watsonx_ai.functions import Functions
from ibm_watsonx_ai.hw_spec import HwSpec
from ibm_watsonx_ai.import_assets import Import
from ibm_watsonx_ai.messages.messages import Messages
from ibm_watsonx_ai.model_definition import ModelDefinition
from ibm_watsonx_ai.models import Models
from ibm_watsonx_ai.parameter_sets import ParameterSets
from ibm_watsonx_ai.pipelines import Pipelines
from ibm_watsonx_ai.pkg_extn import PkgExtn
from ibm_watsonx_ai.projects import Projects
from ibm_watsonx_ai.repository import Repository
from ibm_watsonx_ai.runtime_definitions import RuntimeDefinitions
from ibm_watsonx_ai.script import Script
from ibm_watsonx_ai.service_instance import ServiceInstance
from ibm_watsonx_ai.Set import Set
from ibm_watsonx_ai.shiny import Shiny
from ibm_watsonx_ai.spaces import Spaces
from ibm_watsonx_ai.sw_spec import SwSpec
from ibm_watsonx_ai.task_credentials import TaskCredentials
from ibm_watsonx_ai.training import Training
from ibm_watsonx_ai.trashed_assets import TrashedAssets
from ibm_watsonx_ai.utils import CPDVersion, get_user_agent_header
from ibm_watsonx_ai.utils.auth import TokenAuth, TrustedProfileAuth, get_auth_method
from ibm_watsonx_ai.utils.auth.base_auth import TokenRemovedDuringClientCopyPlaceholder
from ibm_watsonx_ai.utils.utils import (
    DEFAULT_HTTP_CLIENT_CONFIG,
    HttpClientConfig,
    _create_href_definitions,
    _validate_gov_cloud_env,
)
from ibm_watsonx_ai.volumes import Volume
from ibm_watsonx_ai.wml_client_error import NoWMLCredentialsProvided, WMLClientError


[docs] class APIClient: """The main class of ibm_watsonx_ai. The very heart of the module. APIClient contains objects that manage the service resources. To explore how to use APIClient, refer to: - :ref:`Setup<setup>` - to check correct initialization of APIClient for a specific environment. - :ref:`Core<core>` - to explore core properties of an APIClient object. :param url: URL of the service :type url: str :param credentials: credentials used to connect with the service :type credentials: Credentials :param project_id: ID of the project that is used :type project_id: str, optional :param space_id: ID of deployment space that is used :type space_id: str, optional :param verify: certificate verification flag, deprecated, use Credentials(verify=...) to set `verify` :type verify: bool | str | Path, optional :param httpx_client: A customizable `httpx.Client` for ModelInference, Embeddings and methods related to the deployments management and scoring. The `httpx.Client` is used to improve performance across deployments, foundation models, and embeddings. This parameter accepts two types of input: - A direct instance of `httpx.Client()` - A set of parameters provided via the `HttpClientConfig` class **Example:** .. code-block:: python from ibm_watsonx_ai.utils.utils import HttpClientConfig limits = httpx.Limits(max_connections=5) timeout = httpx.Timeout(7) http_config = HttpClientConfig(timeout=timeout, limits=limits) If not provided, a default instance of `httpx.Client` is created. .. note:: If you need to adjust timeouts or limits, using ``HttpClientConfig`` is the recommended approach. When the ``proxies`` parameter is provided in credentials, ``httpx.Client`` will use these proxies. However, if you want to create a separate ``httpx.Client``, all parameters must be provided by the user. :type httpx_client: httpx.Client | HttpClientConfig, optional :param async_httpx_client: A customizable `httpx.AsyncClient` for ModelInference. The `httpx.AsyncClient` is used to improve performance of foundation models inference. This parameter accepts two types of input: - A direct instance of `httpx.AsyncClient` - A set of parameters provided via the `HttpClientConfig` class **Example:** .. code-block:: python from ibm_watsonx_ai.utils.utils import HttpClientConfig limits = httpx.Limits(max_connections=5) timeout = httpx.Timeout(7) http_config = HttpClientConfig(timeout=timeout, limits=limits) If not provided, a default instance of `httpx.AsyncClient` is created. .. note:: If you need to adjust timeouts or limits, using ``HttpClientConfig`` is the recommended approach. When the ``proxies`` parameter is provided in credentials, ``httpx.Client`` will use these proxies. However, if you want to create a separate ``httpx.Client``, all parameters must be provided by the user. :type async_httpx_client: httpx.AsyncClient | HttpClientConfig, optional **Example:** .. code-block:: python from ibm_watsonx_ai import APIClient, Credentials credentials = Credentials(url="<url>", api_key=IAM_API_KEY) client = APIClient(credentials, space_id="<space_id>") client.models.list() client.deployments.get_details() client.set.default_project("<project_id>") ... """ version: str | None = None _internal: bool = False PLATFORM_URLS_MAP = MappingProxyType( { # Dallas "https://us-south.ml.cloud.ibm.com": "https://api.dataplatform.cloud.ibm.com", "https://private.us-south.ml.cloud.ibm.com": "https://private.api.dataplatform.cloud.ibm.com", # Frankfurt "https://eu-de.ml.cloud.ibm.com": "https://api.eu-de.dataplatform.cloud.ibm.com", "https://private.eu-de.ml.cloud.ibm.com": "https://private.api.eu-de.dataplatform.cloud.ibm.com", # London "https://eu-gb.ml.cloud.ibm.com": "https://api.eu-gb.dataplatform.cloud.ibm.com", "https://private.eu-gb.ml.cloud.ibm.com": "https://private.api.eu-gb.dataplatform.cloud.ibm.com", # Tokio "https://jp-tok.ml.cloud.ibm.com": "https://api.jp-tok.dataplatform.cloud.ibm.com", "https://private.jp-tok.ml.cloud.ibm.com": "https://private.api.jp-tok.dataplatform.cloud.ibm.com", # Sydney "https://au-syd.ml.cloud.ibm.com": "https://api.au-syd.dai.cloud.ibm.com", "https://private.au-syd.ml.cloud.ibm.com": "https://private.api.au-syd.dai.cloud.ibm.com", # Toronto "https://ca-tor.ml.cloud.ibm.com": "https://api.ca-tor.dai.cloud.ibm.com", "https://private.ca-tor.ml.cloud.ibm.com": "https://private.api.ca-tor.dai.cloud.ibm.com", # Mumbai (AWS) "https://ap-south-1.aws.wxai.ibm.com": "https://api.ap-south-1.aws.data.ibm.com", "https://private.ap-south-1.aws.wxai.ibm.com": "https://api.ap-south-1.aws.data.ibm.com", # TODO ensure private platform url is correct - changed mapping to private -> public # AWS GovCloud "https://wxai.ibmforusgov.com": "https://api.dai.ibmforusgov.com", "https://private.wxai.ibmforusgov.com": "https://api.dai.ibmforusgov.com", # TODO ensure private platform url is correct - changed mapping to private -> public # PreProd AWS GovCloud "https://wxai.prep.ibmforusgov.com": "https://api.dai.prep.ibmforusgov.com", "https://private.wxai.prep.ibmforusgov.com": "https://api.dai.prep.ibmforusgov.com", # TODO ensure private platform url is correct - changed mapping to private -> public # YPCR "https://yp-cr.ml.cloud.ibm.com": "https://api.dataplatform.test.cloud.ibm.com", "https://private.yp-cr.ml.cloud.ibm.com": "https://private.api.dataplatform.test.cloud.ibm.com", # MCSP QA "https://wxai-qa.ml.cloud.ibm.com": "https://api.dai.test.cloud.ibm.com", "https://private.wxai-qa.ml.cloud.ibm.com": "https://private.api.dai.test.cloud.ibm.com", # YPQA "https://yp-qa.ml.cloud.ibm.com": "https://api.dataplatform.test.cloud.ibm.com", "https://private.yp-qa.ml.cloud.ibm.com": "https://private.api.dataplatform.test.cloud.ibm.com", # MCSP DEV "https://wml-mcsp-dev.ml.test.cloud.ibm.com": "https://api.dai.dev.cloud.ibm.com", "https://private.wml-mcsp-dev.ml.test.cloud.ibm.com": "https://private.api.dai.dev.cloud.ibm.com", # FVT "https://wml-fvt.ml.test.cloud.ibm.com": "https://api.dataplatform.dev.cloud.ibm.com", "https://private.wml-fvt.ml.test.cloud.ibm.com": "https://private.api.dataplatform.dev.cloud.ibm.com", # YS1Prod "https://us-south.ml.test.cloud.ibm.com": "https://api.dataplatform.dev.cloud.ibm.com", "https://private.us-south.ml.test.cloud.ibm.com": "https://private.api.dataplatform.dev.cloud.ibm.com", # AWS DEV "https://dev.aws.wxai.ibm.com": "https://api.dev.aws.data.ibm.com", "https://private.dev.aws.wxai.ibm.com": "https://api.dev.aws.data.ibm.com", # TODO ensure private platform url is correct - changed mapping to private -> public # AWS TEST "https://test.aws.wxai.ibm.com": "https://api.test.aws.data.ibm.com", "https://private.test.aws.wxai.ibm.com": "https://api.test.aws.data.ibm.com", # TODO ensure private platform url is correct - changed mapping to private -> public } ) def __init__( self, credentials: Credentials | dict[str, str] | None = None, project_id: str | None = None, space_id: str | None = None, verify: str | Path | bool | None = None, httpx_client: httpx.Client | HttpClientConfig = DEFAULT_HTTP_CLIENT_CONFIG, async_httpx_client: ( httpx.AsyncClient | HttpClientConfig ) = DEFAULT_HTTP_CLIENT_CONFIG, **kwargs: Any, ) -> None: if (wml_credentials := kwargs.get("wml_credentials")) is not None: wml_credentials_parameter_deprecated_warning = ( "`wml_credentials` parameter is deprecated, please use `credentials`" ) warn(wml_credentials_parameter_deprecated_warning, category=DeprecationWarning) # fmt: skip if not credentials: credentials = wml_credentials if wml_credentials is None and credentials is None: raise TypeError("APIClient() missing 1 required argument: 'credentials'") self._logger = logging.getLogger(__name__) wml_full_version = "" if verify is not None: verify_parameter_deprecated_warning = ( "`verify` parameter is deprecated. " "Use `ibm_watsonx_ai.Credentials` for passing `verify` parameter." ) warn(verify_parameter_deprecated_warning, category=DeprecationWarning) if isinstance(verify, Path): verify = str(verify) if isinstance(credentials, dict): credentials_parameter_as_dict_deprecated_warning = ( "`credentials` parameter as dict is deprecated. " "Use `ibm_watsonx_ai.Credentials` for passing parameters." ) warn(credentials_parameter_as_dict_deprecated_warning, category=DeprecationWarning) # fmt: skip credentials = Credentials.from_dict(credentials, _verify=verify) if project_id is not None and space_id is not None: raise WMLClientError( "`project_id` parameter and `space_id` parameter cannot be set at the same time." ) # At this stage `credentials` has type ibm_watsonx_ai.credentials.Credentials credentials = cast(Credentials, credentials) credentials._set_env_vars_from_credentials() self._scope_validation = kwargs.get("scope_validation", True) if not self._scope_validation: warn_msg = """ When setting `scope_validation` to False, the following parameters need to be set manually or when initializing APIClient instance: - `is_git_based_project` (default: False) - `WCA` (default: False) Furthermore, when trying to get details of associated service, one need to pass instance_id explicitly to `APIClient.service_instance.get_details` method. """ warn(warn_msg) if credentials.proxies is not None: # Validate that proxies is a dictionary if not isinstance(credentials.proxies, dict): # Trigger the same error that would occur when trying to use .get() on a non-dict _ = credentials.proxies.get( "http" ) # This will raise AttributeError for non-dict types httpx_wrapper.additional_settings["proxies"] = credentials.proxies elif httpx_wrapper.additional_settings.get("proxies") is not None: del httpx_wrapper.additional_settings["proxies"] self.credentials = copy.deepcopy(credentials) self._default_space_id: str | None = None self._default_project_id: str | None = None self._project_type: str | None = None self.CLOUD_PLATFORM_SPACES = False self.PLATFORM_URL: str | None = None self.version_param = self._get_api_version_param() self.ICP_PLATFORM_SPACES = False # This will be applicable for 3.5 and later and specific to convergence functionalities self.CPD_version = CPDVersion() self._iam_id = None self._spec_ids_per_state: dict = {} self.generate_ux_tag = True self.is_git_based_project = kwargs.get("is_git_based_project", False) self.WCA: bool = kwargs.get("WCA", False) self._user_headers: dict | None = None # Used in set_headers() method # Create instance-specific copy of PLATFORM_URLS_MAP self._platform_urls_map: dict[str, str] = dict(APIClient.PLATFORM_URLS_MAP) if credentials is None: raise NoWMLCredentialsProvided() if self.credentials.url is None: raise WMLClientError(Messages.get_message(message_id="url_not_provided")) if not self.credentials.url.startswith("https://"): raise WMLClientError(Messages.get_message(message_id="invalid_url")) if self.credentials.url[-1] == "/": self.credentials.url = self.credentials.url.rstrip("/") # check whether it is Gov Cloud _validate_gov_cloud_env(cast(str, credentials.url), self._logger) if isinstance(httpx_client, HttpClientConfig): self._httpx_client: httpx.Client = _get_httpx_client( transport=_httpx_transport_params(self, limits=httpx_client.limits), timeout=httpx_client.timeout, ) else: self._httpx_client = httpx_client if isinstance(async_httpx_client, HttpClientConfig): self._async_httpx_client: httpx.AsyncClient = _get_async_httpx_client( transport=_httpx_async_transport_params( self, limits=async_httpx_client.limits ), timeout=async_httpx_client.timeout, ) else: self._async_httpx_client = async_httpx_client if self.credentials.instance_id is not None: warn( "The `instance_id` parameter is deprecated and will no longer be utilized. " "It is not considered in environment detection. " "The environment type, whether Cloud or CPD, is now automatically determined from " "the `credentials.url` parameter. Please update your configuration accordingly.", category=DeprecationWarning, ) parsed_url = urlparse(self.credentials.url) url_base = f"{parsed_url.scheme}://{parsed_url.hostname}" is_cloud_url = ( url_base in self._platform_urls_map or url_base in self._platform_urls_map.values() ) if is_cloud_url or self.credentials.platform_url: self.CLOUD_PLATFORM_SPACES = True self.ICP_PLATFORM_SPACES = False if self._internal: self.PLATFORM_URL = self.credentials.url elif self.credentials.platform_url: if not self.credentials.platform_url.startswith("https://"): raise WMLClientError( Messages.get_message(message_id="invalid_platform_url") ) self.PLATFORM_URL = self.credentials.platform_url elif self.credentials.url in self._platform_urls_map: self.PLATFORM_URL = self._platform_urls_map[self.credentials.url] else: raise WMLClientError( Messages.get_message(message_id="invalid_url_provided") ) if not self._is_IAM(): raise WMLClientError( Messages.get_message(message_id="apikey_not_provided") ) else: self.CLOUD_PLATFORM_SPACES = False self.ICP_PLATFORM_SPACES = True os.environ["DEPLOYMENT_PLATFORM"] = "private" try: response_get_wml_services = self.httpx_client.get( f"{self.credentials.url}/ml/wml_services/v2/version", headers={"User-Agent": get_user_agent_header()}, ) except Exception as e: if isinstance(e, httpx.ConnectError): raise WMLClientError( Messages.get_message(message_id="invalid_url_provided") ) from e raise if response_get_wml_services.status_code == 200: wml_full_version = response_get_wml_services.json().get("version", "") if wml_full_version: wml_version = ".".join(wml_full_version.split(".")[:2]) if self.credentials.version is None: self.credentials.version = wml_version elif self.credentials.version != wml_version: cpd_version_mismatch_warning = ( f"The provided version: {self.credentials.version} " f"is different from the current CP4D version: {wml_version}. " f"Correct the credentials with proper CP4D version number." ) warn(cpd_version_mismatch_warning) if ( self.credentials.version not in CPDVersion.supported_version_list ): raise WMLClientError( Messages.get_message( self.credentials.version, self.version, message_id="invalid_version_from_automated_check", ) ) else: self._logger.debug( "GET /ml/wml_services/v2/version failed with status code: %s.", response_get_wml_services.status_code, ) if ( response_get_wml_services.status_code >= 500 ): # raise the error only if hostname is not reachable raise WMLClientError( Messages.get_message(message_id="invalid_url_provided") ) # Condition for CAMS related changes to take effect (Might change) if self.credentials.version is None: raise WMLClientError( Messages.get_message( CPDVersion.supported_version_list, message_id="version_not_provided", ) ) if self.credentials.version.lower() in CPDVersion.supported_version_list: self.CPD_version.cpd_version = self.credentials.version.lower() os.environ["DEPLOYMENT_PRIVATE"] = "icp4d" if self.credentials.bedrock_url is None and self.CPD_version: namespace_from_url = "-".join( self.credentials.url.split(".")[0].split("-")[1:] ) route = "cpd" if self.CPD_version >= 5.1 else "cp-console" bedrock_prefix = f"https://{route}-{namespace_from_url}" self.credentials.bedrock_url = ".".join( [bedrock_prefix] + self.credentials.url.split(".")[1:] ) self._is_bedrock_url_autogenerated = True else: self.ICP_PLATFORM_SPACES = False raise WMLClientError( Messages.get_message( ", ".join(CPDVersion.supported_version_list), message_id="invalid_version", ) ) self._href_definitions = _create_href_definitions(self) self._auth_method = get_auth_method(self) self._auth_method.get_token() # For cloud, service_instance.details will be set during space creation( if instance is associated ) or # while patching a space with an instance self._service_instance: ServiceInstance | None = None self._set: Set | None = None self.__ai_services: AIServices | None = None self._wml_full_version = wml_full_version if project_id: if self._scope_validation: self.set.default_project(project_id) # recognizes project type else: self.default_project_id = project_id elif space_id: if self._scope_validation: self.set.default_space(space_id) else: self.default_space_id = space_id self._logger.info( Messages.get_message(message_id="client_successfully_initialized") ) @property def service_instance(self) -> ServiceInstance: if self._service_instance is None: self._service_instance = ServiceInstance(self) if self.ICP_PLATFORM_SPACES: self._service_instance._refresh_details = True return self._service_instance @service_instance.setter def service_instance(self, value: ServiceInstance) -> None: self._service_instance = value @cached_property def volumes(self) -> Volume: return Volume(self) @cached_property def foundation_models(self) -> FoundationModelsManager: return FoundationModelsManager(self) @cached_property def set(self) -> Set: return Set(self) @cached_property def spaces(self) -> Spaces: return Spaces(self) @cached_property def projects(self) -> Projects: return Projects(self) @cached_property def export_assets(self) -> Export: return Export(self) @cached_property def import_assets(self) -> Import: return Import(self) @cached_property def shiny(self) -> Shiny: if not self.ICP_PLATFORM_SPACES: raise WMLClientError("Shiny is only available for ICP platform spaces") return Shiny(self) @cached_property def trashed_assets(self) -> TrashedAssets: if not self.ICP_PLATFORM_SPACES: raise WMLClientError( "Trashed assets is only available for ICP platform spaces" ) return TrashedAssets(self) @cached_property def runtime_definitions(self) -> RuntimeDefinitions: if not self.ICP_PLATFORM_SPACES: raise WMLClientError( "Runtime definitions is only available for ICP platform spaces" ) return RuntimeDefinitions(self) @cached_property def script(self) -> Script: return Script(self) @cached_property def model_definitions(self) -> ModelDefinition: return ModelDefinition(self) @cached_property def package_extensions(self) -> PkgExtn: return PkgExtn(self) @cached_property def software_specifications(self) -> SwSpec: return SwSpec(self) @cached_property def hardware_specifications(self) -> HwSpec: return HwSpec(self) @cached_property def connections(self) -> Connections: return Connections(self) @cached_property def training(self) -> Training: return Training(self) @cached_property def data_assets(self) -> Assets: return Assets(self) @cached_property def folder_assets(self) -> FolderAssets: return FolderAssets(self) @cached_property def deployments(self) -> Deployments: return Deployments(self) @cached_property def factsheets(self) -> Factsheets: if not self.CLOUD_PLATFORM_SPACES: raise WMLClientError( "Factsheets is only available for Cloud platform spaces" ) return Factsheets(self) @cached_property def task_credentials(self) -> TaskCredentials: if not self.CLOUD_PLATFORM_SPACES: raise WMLClientError( "Task credentials is only available for Cloud platform spaces" ) return TaskCredentials(self) @cached_property def repository(self) -> Repository: return Repository(self) @cached_property def _models(self) -> Models: return Models(self) @cached_property def pipelines(self) -> Pipelines: return Pipelines(self) @cached_property def experiments(self) -> Experiments: return Experiments(self) @cached_property def _functions(self) -> Functions: return Functions(self) @cached_property def parameter_sets(self) -> ParameterSets: return ParameterSets(self) @property def default_space_id(self) -> str | None: return self._default_space_id @default_space_id.setter def default_space_id(self, value: str | None) -> None: self._default_space_id = value @property def default_project_id(self) -> str | None: return self._default_project_id @default_project_id.setter def default_project_id(self, value: str | None) -> None: self._default_project_id = value
[docs] def get_copy(self) -> APIClient: """Prepares clean copy of APIClient. The clean copy contains no token, password, api key data. It is used in AI services scenarios, when the client is used in deployed code, and can be reused between users. The copy needs to be set with current user token in the inner function of AI service. :returns: APIClient which is 2-level copy of the current one, without user secrets :rtype: APIClient **Example:** .. code-block:: python def deployable_ai_service(context, params={"k1": "v1"}, **kwargs): # imports from ibm_watsonx_ai import Credentials, APIClient from ibm_watsonx_ai.foundation_models import ModelInference task_token = context.generate_token() outer_context = context client = APIClient( Credentials( url="https://us-south.ml.cloud.ibm.com", token=task_token ) ) # operations with client def generate(context): user_client = client.get_copy() user_client.set_token(context.generate_token()) # operations with user_client return {"body": response_body} return generate stored_ai_service_details = client._ai_services.store( deployable_ai_service, meta_props ) """ excluded = [ "_href_definitions", "_httpx_client", "_async_httpx_client", ] client_copy = copy.copy(self) for key, value in client_copy.__dict__.items(): if key in excluded: continue client_copy.__dict__[key] = copy.copy(value) if ( hasattr(client_copy.__dict__[key], "__dict__") and "_client" in client_copy.__dict__[key].__dict__ ): client_copy.__dict__[key].__dict__["_client"] = client_copy client_copy._auth_method = TokenRemovedDuringClientCopyPlaceholder() from ibm_watsonx_ai.libs.repo.mlrepositoryclient import MLRepositoryClient client_copy.repository._ml_repository_client = MLRepositoryClient( client_copy.credentials.url ) client_copy.credentials.api_key = None client_copy.credentials.password = None return client_copy
@property def wml_credentials(self) -> dict[str, str]: wml_credentials_attribute_deprecated = ( "`wml_credentials` attribute is deprecated, " "please use `client.credentials` instead" ) warn(wml_credentials_attribute_deprecated, DeprecationWarning) return self.credentials.to_dict() @wml_credentials.setter def wml_credentials(self, value: dict[str, str]) -> None: wml_credentials_attribute_deprecated = ( "`wml_credentials` attribute is deprecated, " "please use `client.credentials` instead" ) warn(wml_credentials_attribute_deprecated, DeprecationWarning) self.credentials = Credentials.from_dict(value) @property def wml_token(self) -> str | None: wml_token_attribute_deprecated = ( "`wml_token` attribute is deprecated, please use `client.token` instead" ) warn(wml_token_attribute_deprecated, DeprecationWarning) return self.token @wml_token.setter def wml_token(self, value: str) -> None: wml_token_attribute_deprecated = ( "`wml_token` attribute is deprecated, please use `client.token` instead" ) warn(wml_token_attribute_deprecated, DeprecationWarning) self.token = value @property def token(self) -> str: return self._auth_method.get_token() @token.setter def token(self, value: str) -> None: self._auth_method._token = value @property def _ai_services(self) -> AIServices: if self.CLOUD_PLATFORM_SPACES or ( self.CPD_version >= 5.1 and self._is_ai_services_endpoint_available() ): if self.__ai_services is None: if self.CPD_version < 5.1 or self._wml_full_version == "5.1.0": raise WMLClientError( error_msg="AI service is unsupported for this release." ) self.__ai_services = AIServices(self) return self.__ai_services else: raise WMLClientError( error_msg="AI service is unsupported for this release." ) @property def proceed(self) -> bool: warn( ( "`APIClient.proceed` is deprecated and will be removed in future. To use `proceed` scenario, " "pass `token` into credentials without `apikey` or `password`, or use `APIClient.set_token` function." ), category=DeprecationWarning, ) return isinstance(self._auth_method, TokenAuth) @property def is_git_based_project(self) -> bool: return self.project_type == "local_git_storage" @is_git_based_project.setter def is_git_based_project(self, value: bool) -> None: if value is True: self.project_type = "local_git_storage" elif value is False and self.is_git_based_project: self.project_type = None @property def project_type(self) -> str | None: return self._project_type @project_type.setter def project_type(self, value: str | None) -> None: self._project_type = value if hasattr(self, "_href_definitions") and self._project_type is not None: self._href_definitions.project_type = ( self._project_type ) # update information about project type in HrefDefinition @property def httpx_client(self) -> httpx.Client: return self._httpx_client @httpx_client.setter def httpx_client(self, value: httpx.Client) -> None: if hasattr(self, "_httpx_client") and self._httpx_client is not None: self._httpx_client.close() self._httpx_client = value @property def async_httpx_client(self) -> httpx.AsyncClient: return self._async_httpx_client @async_httpx_client.setter def async_httpx_client(self, value: httpx.AsyncClient) -> None: old_async_httpx_client = self._async_httpx_client self._async_httpx_client = value if old_async_httpx_client: asyncio.create_task(old_async_httpx_client.aclose()) @staticmethod def _get_api_version_param() -> str: try: file_name = "API_VERSION_PARAM" path = Path(ibm_watsonx_ai.utils.__file__).parent return (path / file_name).read_text().strip() except Exception: return "2021-06-21" def _check_if_either_is_set(self) -> None: if self.default_space_id is None and self.default_project_id is None: raise WMLClientError( Messages.get_message( message_id="it_is_mandatory_to_set_the_space_project_id" ) ) def _check_if_space_is_set(self) -> None: if self.default_space_id is None: raise WMLClientError( Messages.get_message(message_id="it_is_mandatory_to_set_the_space_id") ) def _params( self, skip_space_project_chk: bool = False, skip_for_create: bool = False, skip_userfs: bool = False, ) -> dict: params = {} params.update({"version": self.version_param}) if not skip_for_create: if self.default_space_id is not None: params.update({"space_id": self.default_space_id}) elif self.default_project_id is not None: params.update({"project_id": self.default_project_id}) else: # For system software/hardware specs if skip_space_project_chk is False: raise WMLClientError( Messages.get_message( message_id="it_is_mandatory_to_set_the_space_project_id" ) ) if self.default_project_id and self.is_git_based_project and not skip_userfs: params.update({"userfs": "true"}) if self._iam_id: params.update({"iam_id": str(self._iam_id)}) if ( not self.default_project_id or not self.is_git_based_project or skip_userfs ) and "userfs" in params: del params["userfs"] return params def _get_headers( self, content_type: str = "application/json", no_content_type: bool = False, zen: bool = False, projects_token: bool = False, _token: str | None = None, ) -> dict: headers = {} if not no_content_type: headers["Content-Type"] = content_type if projects_token and self.credentials.projects_token is not None: token_to_use = self.credentials.projects_token elif _token is not None: token_to_use = _token else: token_to_use = self.token if len(token_to_use.split(".")) == 1: headers["Authorization"] = f"Basic {token_to_use}" else: headers["Authorization"] = f"Bearer {token_to_use}" if not zen: headers["User-Agent"] = get_user_agent_header() if not self.generate_ux_tag: headers["X-WX-UX"] = "true" self.generate_ux_tag = True if self.WCA: headers["IBM-WATSONXAI-CONSUMER"] = "wca" if client_headers_env := os.environ.get("IBM_SDK_API_CLIENT_HEADERS"): headers.update( json.loads(base64.b64decode(client_headers_env).decode("utf-8")) ) if self._user_headers: headers.update(self._user_headers) return headers async def _aget_headers( self, content_type: str = "application/json", no_content_type: bool = False, zen: bool = False, projects_token: bool = False, ) -> dict: return self._get_headers( content_type, no_content_type, zen, projects_token, _token=await self._auth_method.aget_token(), )
[docs] def get_headers( self, content_type: str | None = "application/json", include_user_agent: bool = False, ) -> dict: """Get HTTP headers used during requests. :param content_type: value for `Content-Type` header, defaults to `application/json` :type name: str, optional :param include_user_agent: whether the result should include `User-Agent` header, defaults to `False` :type name: bool, optional :return: headers used during requests :rtype: dict """ return self._get_headers( content_type=content_type or "", no_content_type=content_type is None, zen=not include_user_agent, )
[docs] def set_token(self, token: str) -> None: """ Method which allows refresh/set new User Authorization Token. .. note:: * Using this function will cause that token will not be automatically refreshed anymore, if `password` or `apikey` were passed. The user needs to take care of token refresh using `set_token` function from that point in time until they finish using the client instance. * If ``trusted_profile_id`` and ``token`` were passed in credentials, the ``trusted_profile_id`` will be used for generating a new trusted profile token based on token passed to this method until the client lifecycle. The generating process takes place when retrieving a token. :param token: User Authorization Token :type token: str **Examples** .. code-block:: python client.set_token("<USER AUTHORIZATION TOKEN>") """ self.credentials.token = token if isinstance(self._auth_method, TokenAuth) or ( isinstance(self._auth_method, TrustedProfileAuth) and isinstance(self._auth_method._internal_auth_method, TokenAuth) ): self._auth_method.set_token(token) else: # the auth method type was changed to TokenAuth authentication_method_changed_warning = ( "Authentication method changed to TokenAuth. " "The token will not be automatically refreshed from this point of time. " "Use `APIClient.set_token` function to manually update token." ) warn(authentication_method_changed_warning) self._auth_method = TokenAuth(token=token) self._auth_method._on_token_set = self.repository._refresh_repo_client self._auth_method._on_token_set()
[docs] def set_headers(self, headers: dict) -> None: """ Method which allows refresh/set new User Request Headers. :param headers: User Request Headers :type headers: dict **Examples** .. code-block:: python headers = { "Authorization": "Bearer <USER AUTHORIZATION TOKEN>", "User-Agent": "ibm-watsonx-ai/1.0.1 (lang=python; arch=x86_64; os=darwin; python.version=3.10.13)", "Content-Type": "application/json", } client.set_headers(headers) """ self._user_headers = headers
def _get_icptoken(self) -> str: return self.token def _is_default_space_set(self) -> bool: if self.default_space_id is not None: return True return False def _is_IAM(self) -> bool: if self.credentials.api_key is not None: if self.credentials.api_key != "": return True else: raise WMLClientError( Messages.get_message(message_id="apikey_value_cannot_be_empty") ) elif self.credentials.token is not None: if self.credentials.token != "": return True else: raise WMLClientError( Messages.get_message(message_id="token_value_cannot_be_empty") ) else: return False def _is_ai_services_endpoint_available(self) -> bool: try: url = self._href_definitions.get_ai_services_href() response_ai_services_api = self.httpx_client.get( url=f"{url}?limit=1", params=self._params(), headers=self._get_headers(), ) return response_ai_services_api.status_code != 404 except Exception: return False