Source code for ibm_watsonx_ai.deployments

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2023-2024.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------

from __future__ import annotations
from typing import (
    Literal,
    Iterable,
    Callable,
    Any,
    cast,
    TYPE_CHECKING,
    NoReturn,
    Generator,
)
import numpy as np
import json
from warnings import warn

import ibm_watsonx_ai._wrappers.requests as requests
from ibm_watsonx_ai.utils import (
    print_text_header_h1,
    print_text_header_h2,
    StatusLogger,
)
from ibm_watsonx_ai.utils.utils import _get_id_from_deprecated_uid
from ibm_watsonx_ai.wml_client_error import (
    WMLClientError,
    MissingValue,
    InvalidValue,
)
from ibm_watsonx_ai.href_definitions import is_id
from ibm_watsonx_ai.wml_resource import WMLResource
from ibm_watsonx_ai.messages.messages import Messages
from ibm_watsonx_ai.metanames import (
    ScoringMetaNames,
    DecisionOptimizationMetaNames,
    DeploymentMetaNames,
)
from ibm_watsonx_ai.libs.repo.util.library_imports import LibraryChecker
from ibm_watsonx_ai.utils.autoai.utils import all_logging_disabled

if TYPE_CHECKING:
    from ibm_watsonx_ai import APIClient
    from ibm_watsonx_ai.lifecycle import SpecStates
    import pandas

lib_checker = LibraryChecker()


[docs] class Deployments(WMLResource): """Deploy and score published artifacts (models and functions).""" def __init__(self, client: APIClient): WMLResource.__init__(self, __name__, client) self.ConfigurationMetaNames = DeploymentMetaNames() self.ScoringMetaNames = ScoringMetaNames() self.DecisionOptimizationMetaNames = DecisionOptimizationMetaNames() def _deployment_status_errors_handling( self, deployment_details: dict, operation_name: str, deployment_id: str ) -> NoReturn: try: if "failure" in deployment_details["entity"]["status"]: errors = deployment_details["entity"]["status"]["failure"]["errors"] for error in errors: if type(error) == str: try: error_obj = json.loads(error) print(error_obj["message"]) except: print(error) elif type(error) == dict: print(error["message"]) else: print(error) raise WMLClientError( "Deployment " + operation_name + " failed for deployment id: " + deployment_id + ". Errors: " + str(errors) ) else: print(deployment_details["entity"]["status"]) raise WMLClientError( "Deployment " + operation_name + " failed for deployment id: " + deployment_id + ". Error: " + str(deployment_details["entity"]["status"]["state"]) ) except WMLClientError as e: raise e except Exception as e: self._logger.debug("Deployment " + operation_name + " failed: " + str(e)) print(deployment_details["entity"]["status"]["failure"]) raise WMLClientError( "Deployment " + operation_name + " failed for deployment id: " + deployment_id + "." ) # TODO model_id and artifact_id should be changed to artifact_id only
[docs] def create( self, artifact_id: str | None = None, meta_props: dict | None = None, rev_id: str | None = None, **kwargs: dict, ) -> dict: """Create a deployment from an artifact. An artifact is a model or function that can be deployed. :param artifact_id: ID of the published artifact (the model or function ID) :type artifact_id: str :param meta_props: meta props. To see the available list of meta names, use: .. code-block:: python client.deployments.ConfigurationMetaNames.get() :type meta_props: dict, optional :param rev_id: revision ID of the deployment :type rev_id: str, optional :return: metadata of the created deployment :rtype: dict **Example** .. code-block:: python meta_props = { client.deployments.ConfigurationMetaNames.NAME: "SAMPLE DEPLOYMENT NAME", client.deployments.ConfigurationMetaNames.ONLINE: {}, client.deployments.ConfigurationMetaNames.HARDWARE_SPEC : { "id": "e7ed1d6c-2e89-42d7-aed5-8sb972c1d2b"}, client.deployments.ConfigurationMetaNames.SERVING_NAME : 'sample_deployment' } deployment_details = client.deployments.create(artifact_id, meta_props) """ artifact_id = _get_id_from_deprecated_uid( kwargs=kwargs, resource_id=artifact_id, resource_name="artifact" ) # Backward compatibility in past `rev_id` was an int. if isinstance(rev_id, int): warn( f"`rev_id` parameter type as int is deprecated, please convert to str instead", category=DeprecationWarning, ) rev_id = str(rev_id) Deployments._validate_type(artifact_id, "artifact_id", str, True) if self._client.ICP_PLATFORM_SPACES: predictionUrl = self._credentials.url if meta_props is None: raise WMLClientError("Invalid input. meta_props can not be empty.") if self._client.CLOUD_PLATFORM_SPACES and "r_shiny" in meta_props: raise WMLClientError("Shiny is not supported in this release") if self._client.CPD_version >= 4.8 or self._client.CLOUD_PLATFORM_SPACES: from ibm_watsonx_ai.foundation_models.utils.enums import ModelTypes base_model_id = meta_props.get(self.ConfigurationMetaNames.BASE_MODEL_ID) if isinstance(base_model_id, ModelTypes): meta_props[self.ConfigurationMetaNames.BASE_MODEL_ID] = ( base_model_id.value ) metaProps = self.ConfigurationMetaNames._generate_resource_metadata(meta_props) if ( "serving_name" in str(metaProps) and meta_props.get("serving_name", False) and "r_shiny" in str(metaProps) ): if "parameters" in metaProps["r_shiny"]: metaProps["r_shiny"]["parameters"]["serving_name"] = meta_props[ "serving_name" ] else: metaProps["r_shiny"]["parameters"] = { "serving_name": meta_props["serving_name"] } if "online" in metaProps: del metaProps["online"] if "wml_instance_id" in meta_props: metaProps.update({"wml_instance_id": meta_props["wml_instance_id"]}) ##Check if default space is set metaProps["asset"] = ( metaProps.get("asset") if metaProps.get("asset") else {"id": artifact_id} ) if rev_id is not None: metaProps["asset"].update({"rev": rev_id}) if self._client.default_project_id: metaProps["project_id"] = self._client.default_project_id else: metaProps["space_id"] = self._client.default_space_id # note: checking if artifact_id points to prompt_template if self._client.CPD_version >= 4.8 or self._client.CLOUD_PLATFORM_SPACES: with all_logging_disabled(): try: from ibm_watsonx_ai.foundation_models.prompts import ( PromptTemplateManager, ) model_id = ( PromptTemplateManager(api_client=self._client) .load_prompt(artifact_id) .model_id ) except Exception: pass # Foundation models scenario should not impact other ML models' deployment scenario. else: metaProps.pop("asset") metaProps["prompt_template"] = {"id": artifact_id} if ( DeploymentMetaNames.BASE_MODEL_ID not in metaProps and DeploymentMetaNames.BASE_DEPLOYMENT_ID not in metaProps ): metaProps.update({DeploymentMetaNames.BASE_MODEL_ID: model_id}) # --- end note url = self._client.service_instance._href_definitions.get_deployments_href() response = requests.post( url, json=metaProps, params=self._client._params(), # version is mandatory headers=self._client._get_headers(), ) ## Post Deployment call executed if response.status_code == 202: deployment_details = response.json() if self._client.ICP_PLATFORM_SPACES: if "online_url" in deployment_details["entity"]["status"]: scoringUrl = ( deployment_details.get("entity") .get("status") .get("online_url") .get("url") .replace("https://ibm-nginx-svc:443", predictionUrl) ) deployment_details["entity"]["status"]["online_url"][ "url" ] = scoringUrl deployment_id = self.get_id(deployment_details) import time print_text_header_h1( "Synchronous deployment creation for id: '{}' started".format( artifact_id ) ) status = deployment_details["entity"]["status"]["state"] notifications = [] with StatusLogger(status) as status_logger: while True: time.sleep(5) deployment_details = self._client.deployments.get_details( deployment_id, _silent=True ) # this is wrong , needs to update for ICP if "system" in deployment_details: notification = deployment_details["system"]["warnings"][0][ "message" ] if notification not in notifications: print("\nNote: " + notification) notifications.append(notification) status = deployment_details["entity"]["status"]["state"] status_logger.log_state(status) if status != "DEPLOY_IN_PROGRESS" and status != "initializing": break if status == "DEPLOY_SUCCESS" or status == "ready": print("") print_text_header_h2( "Successfully finished deployment creation, deployment_id='{}'".format( deployment_id ) ) return deployment_details else: print_text_header_h2("Deployment creation failed") self._deployment_status_errors_handling( deployment_details, "creation", deployment_id ) else: error_msg = "Deployment creation failed" reason = response.text print(reason) print_text_header_h2(error_msg) raise WMLClientError( error_msg + ". Error: " + str(response.status_code) + ". " + reason )
[docs] @staticmethod def get_uid(deployment_details: dict) -> str: """Get deployment_uid from the deployment details. *Deprecated:* Use ``get_id(deployment_details)`` instead. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: deployment UID that is used to manage the deployment :rtype: str **Example** .. code-block:: python deployment_uid = client.deployments.get_uid(deployment) """ warn( ( "`get_uid()` is deprecated and will be removed in future. " "Instead, please use `get_id()`." ), category=DeprecationWarning, ) return Deployments.get_id(deployment_details)
[docs] @staticmethod def get_id(deployment_details: dict) -> str: """Get the deployment ID from the deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: deployment ID that is used to manage the deployment :rtype: str **Example** .. code-block:: python deployment_id = client.deployments.get_id(deployment) """ Deployments._validate_type(deployment_details, "deployment_details", dict, True) try: if "id" in deployment_details["metadata"]: id = deployment_details.get("metadata", {}).get("id") else: id = deployment_details.get("metadata", {}).get("guid") except Exception as e: raise WMLClientError( "Getting deployment ID from deployment details failed.", e ) if id is None: raise MissingValue("deployment_details.metadata.id") return id
[docs] @staticmethod def get_href(deployment_details: dict) -> str: """Get deployment_href from the deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: deployment href that is used to manage the deployment :rtype: str **Example** .. code-block:: python deployment_href = client.deployments.get_href(deployment) """ Deployments._validate_type(deployment_details, "deployment_details", dict, True) try: if "href" in deployment_details["metadata"]: url = deployment_details.get("metadata", {}).get("href") else: url = "/ml/v4/deployments/{}".format( deployment_details["metadata"]["id"] ) except Exception as e: raise WMLClientError( "Getting deployment url from deployment details failed.", e ) if url is None: raise MissingValue("deployment_details.metadata.href") return url
def _get_serving_name_info(self, serving_name: str) -> tuple: """Get info about the serving name :param serving_name: serving name that filters deployments :type serving_name: str :return: information about the serving name: (<status_code>, <response json if any>) :rtype: tuple **Example** .. code-block:: python is_available = client.deployments.is_serving_name_available('test') """ params = { "serving_name": serving_name, "conflict": "true", "version": self._client.version_param, } url = self._client.service_instance._href_definitions.get_deployments_href() res = requests.get(url, headers=self._client._get_headers(), params=params) if res.status_code == 409: response = res.json() else: response = None return (res.status_code, response)
[docs] def is_serving_name_available(self, serving_name: str) -> bool: """Check if the serving name is available for use. :param serving_name: serving name that filters deployments :type serving_name: str :return: information about whether the serving name is available :rtype: bool **Example** .. code-block:: python is_available = client.deployments.is_serving_name_available('test') """ status_code, _ = self._get_serving_name_info(serving_name) return status_code != 409
[docs] def get_details( self, deployment_id: str | None = None, serving_name: str | None = None, limit: int | None = None, asynchronous: bool = False, get_all: bool = False, spec_state: SpecStates | None = None, _silent: bool = False, **kwargs: Any, ) -> dict: """Get information about deployment(s). If deployment_id is not passed, all deployment details are returned. :param deployment_id: unique ID of the deployment :type deployment_id: str, optional :param serving_name: serving name that filters deployments :type serving_name: str, optional :param limit: limit number of fetched records :type limit: int, optional :param asynchronous: if True, it will work as a generator :type asynchronous: bool, optional :param get_all: if True, it will get all entries in 'limited' chunks :type get_all: bool, optional :param spec_state: software specification state, can be used only when `deployment_id` is None :type spec_state: SpecStates, optional :return: metadata of the deployment(s) :rtype: dict (if deployment_id is not None) or {"resources": [dict]} (if deployment_id is None) **Example** .. code-block:: python deployment_details = client.deployments.get_details(deployment_id) deployment_details = client.deployments.get_details(deployment_id=deployment_id) deployments_details = client.deployments.get_details() deployments_details = client.deployments.get_details(limit=100) deployments_details = client.deployments.get_details(limit=100, get_all=True) deployments_details = [] for entry in client.deployments.get_details(limit=100, asynchronous=True, get_all=True): deployments_details.extend(entry) """ deployment_id = _get_id_from_deprecated_uid( kwargs=kwargs, resource_id=deployment_id, resource_name="deployment", can_be_none=True, ) if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() Deployments._validate_type(deployment_id, "deployment_id", str, False) if deployment_id is not None and not is_id(deployment_id): raise WMLClientError( "'deployment_id' is not an id: '{}'".format(deployment_id) ) url = self._client.service_instance._href_definitions.get_deployments_href() query_params = self._client._params() if serving_name: query_params["serving_name"] = serving_name if deployment_id is None: filter_func = ( self._get_filter_func_by_spec_state(spec_state) if spec_state else None ) deployment_details = self._get_artifact_details( url, deployment_id, limit, "deployments", query_params=query_params, _async=asynchronous, _all=get_all, _filter_func=filter_func, ) else: deployment_details = self._get_artifact_details( url, deployment_id, limit, "deployments", query_params=query_params ) if ( not isinstance(deployment_details, Generator) and "system" in deployment_details and not _silent ): print("Note: " + deployment_details["system"]["warnings"][0]["message"]) return deployment_details
[docs] @staticmethod def get_scoring_href(deployment_details: dict) -> str: """Get scoring URL from deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: scoring endpoint URL that is used to make scoring requests :rtype: str **Example** .. code-block:: python scoring_href = client.deployments.get_scoring_href(deployment) """ Deployments._validate_type(deployment_details, "deployment", dict, True) scoring_url = None try: url = deployment_details["entity"]["status"].get("online_url") if url is not None: scoring_url = deployment_details["entity"]["status"]["online_url"][ "url" ] else: raise MissingValue( "Getting scoring url for deployment failed. This functionality is available only for sync deployments" ) except Exception as e: raise WMLClientError( "Getting scoring url for deployment failed. This functionality is available only for sync deployments", e, ) if scoring_url is None: raise MissingValue("scoring_url missing in online_predictions") return scoring_url
[docs] @staticmethod def get_serving_href(deployment_details: dict) -> str: """Get serving URL from the deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: serving endpoint URL that is used to make scoring requests :rtype: str **Example** .. code-block:: python scoring_href = client.deployments.get_serving_href(deployment) """ Deployments._validate_type(deployment_details, "deployment", dict, True) try: serving_name = ( deployment_details["entity"]["online"] .get("parameters") .get("serving_name") ) serving_url = [ url for url in deployment_details["entity"] .get("status") .get("serving_urls") if serving_name == url.split("/")[-2] ][0] if serving_url: return serving_url else: raise MissingValue( "Getting serving url for deployment failed. This functionality is available only for sync deployments with serving name." ) except Exception as e: raise WMLClientError( "Getting serving url for deployment failed. This functionality is available only for sync deployments with serving name.", e, )
[docs] def delete(self, deployment_id: str | None = None, **kwargs: Any) -> str: """Delete deployment. :param deployment_id: unique ID of the deployment :type deployment_id: str :return: status ("SUCCESS" or "FAILED") :rtype: str **Example** .. code-block:: python client.deployments.delete(deployment_id) """ deployment_id = _get_id_from_deprecated_uid( kwargs=kwargs, resource_id=deployment_id, resource_name="deployment" ) if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() Deployments._validate_type(deployment_id, "deployment_id", str, True) if deployment_id is not None and not is_id(deployment_id): raise WMLClientError( "'deployment_id' is not an id: '{}'".format(deployment_id) ) deployment_url = ( self._client.service_instance._href_definitions.get_deployment_href( deployment_id ) ) response_delete = requests.delete( deployment_url, params=self._client._params(), headers=self._client._get_headers(), ) return self._handle_response(204, "deployment deletion", response_delete, False)
[docs] def score( self, deployment_id: str, meta_props: dict, transaction_id: str | None = None ) -> dict: """Make scoring requests against the deployed artifact. :param deployment_id: unique ID of the deployment to be scored :type deployment_id: str :param meta_props: meta props for scoring, use ``client.deployments.ScoringMetaNames.show()`` to view the list of ScoringMetaNames :type meta_props: dict :param transaction_id: transaction ID to be passed with the records during payload logging :type transaction_id: str, optional :return: scoring result that contains prediction and probability :rtype: dict .. note:: * *client.deployments.ScoringMetaNames.INPUT_DATA* is the only metaname valid for sync scoring. * The valid payloads for scoring input are either list of values, pandas or numpy dataframes. **Example** .. code-block:: python scoring_payload = {client.deployments.ScoringMetaNames.INPUT_DATA: [{'fields': ['GENDER','AGE','MARITAL_STATUS','PROFESSION'], 'values': [ ['M',23,'Single','Student'], ['M',55,'Single','Executive'] ] }] } predictions = client.deployments.score(deployment_id, scoring_payload) """ if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() Deployments._validate_type(deployment_id, "deployment_id", str, True) Deployments._validate_type(meta_props, "meta_props", dict, True) if meta_props.get(self.ScoringMetaNames.INPUT_DATA) is None: raise WMLClientError( "Scoring data input 'ScoringMetaNames.INPUT_DATA' is mandatory for synchronous " "scoring" ) scoring_data = meta_props[self.ScoringMetaNames.INPUT_DATA] if scoring_data is not None: score_payload = [] for each_score_request in scoring_data: lib_checker.check_lib(lib_name="pandas") import pandas as pd scoring_values = each_score_request["values"] # Check feature types, currently supporting pandas df, numpy.ndarray, python lists and Dmatrix if isinstance(scoring_values, pd.DataFrame): scoring_values = scoring_values.where( pd.notnull(scoring_values), None ) fields_names = scoring_values.columns.values.tolist() values = scoring_values.values.tolist() try: values[pd.isnull(values)] = None # note: above code fails when there is no null values in a dataframe except TypeError: pass each_score_request["values"] = values if fields_names is not None: each_score_request["fields"] = fields_names ## If payload is a numpy dataframe elif isinstance(scoring_values, np.ndarray): values = scoring_values.tolist() each_score_request["values"] = values score_payload.append(each_score_request) ##See if it is scoring or DecisionOptimizationJob payload = {} payload["input_data"] = score_payload if meta_props.get(self.ScoringMetaNames.SCORING_PARAMETERS) is not None: payload["scoring_parameters"] = meta_props.get( self.ScoringMetaNames.SCORING_PARAMETERS ) headers = self._client._get_headers() if transaction_id is not None: headers.update({"x-global-transaction-id": transaction_id}) scoring_url = ( self._credentials.url + "/ml/v4/deployments/" + deployment_id + "/predictions" ) params = self._client._params() del params["space_id"] response_scoring = requests.post( scoring_url, json=payload, params=params, # version parameter is mandatory headers=headers, ) return self._handle_response(200, "scoring", response_scoring)
#########################################
[docs] def get_download_url(self, deployment_details: dict) -> str: """Get deployment_download_url from the deployment details. :param deployment_details: created deployment details :type deployment_details: dict :return: deployment download URL that is used to get file deployment (for example: Core ML) :rtype: str **Example** .. code-block:: python deployment_url = client.deployments.get_download_url(deployment) """ if self._client.ICP_PLATFORM_SPACES: raise WMLClientError( "Downloading virtual deployment is no longer supported in Cloud Pak for Data, versions 3.5 and later." ) if self._client.CLOUD_PLATFORM_SPACES: raise WMLClientError( "Downloading virtual deployment is no longer supported in Cloud Pak for Data as a Service." ) Deployments._validate_type(deployment_details, "deployment_details", dict, True) try: virtual_deployment_detaails = ( deployment_details.get("entity", {}) .get("status", {}) .get("virtual_deployment_downloads") ) if virtual_deployment_detaails is not None: url = virtual_deployment_detaails[0].get("url") else: url = None except Exception as e: raise WMLClientError( "Getting download url from deployment details failed.", e ) if url is None: raise MissingValue( "deployment_details.entity.virtual_deployment_downloads.url" ) return url
[docs] def list( self, limit: int | None = None, artifact_type: str | None = None ) -> pandas.DataFrame: """Returns deployments in a table format. If limit is set to None, only the first 50 records are shown. :param limit: limit number of fetched records :type limit: int, optional :param artifact_type: return only deployments with the specified artifact_type :type artifact_type: str, optional :return: pandas.DataFrame with the listed deployments :rtype: pandas.DataFrame **Example** .. code-block:: python client.deployments.list() """ if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() details = self.get_details(limit=limit) resources = details["resources"] values = [] index = 0 sw_spec_info = { s["id"]: s for s in self._client.software_specifications.get_details(state_info=True)[ "resources" ] } def enrich_asset_with_type(asset_details: dict, asset_type: str) -> dict: if asset_type: asset_details["metadata"]["asset_type"] = asset_type return asset_details asset_info = { el["metadata"]["id"]: enrich_asset_with_type(el, asset_type) for asset_type, resources in { "model": self._client._models.get_details(get_all=True), "function": self._client._functions.get_details(get_all=True), }.items() for el in resources["resources"] } def get_spec_info(spec_id: str, prop: str) -> str: if spec_id and spec_id in sw_spec_info: return sw_spec_info[spec_id].get(prop, "") else: return "" for m in resources: # Deployment service currently doesn't support limit querying # As a workaround, its filtered in python client # Ideally this needs to be on the server side if limit is not None and index == limit: break spec_id = ( asset_info.get( m["entity"].get("asset", m["entity"].get("prompt_template"))["id"], {}, ) .get("entity", {}) .get("software_spec", {}) .get("id") ) if ( artifact_type and m["entity"].get("deployed_asset_type", "unknown") != artifact_type ): pass # filter by artifact_type else: values.append( ( ( m["metadata"]["guid"] if "guid" in m["metadata"] else m["metadata"]["id"] ), m["entity"]["name"], m["entity"]["status"]["state"], m["metadata"]["created_at"], m["entity"].get("deployed_asset_type", "unknown"), get_spec_info(spec_id, "state"), get_spec_info(spec_id, "replacement"), ) ) index = index + 1 table = self._list( values, [ "ID", "NAME", "STATE", "CREATED", "ARTIFACT_TYPE", "SPEC_STATE", "SPEC_REPLACEMENT", ], limit, 50, ) return table
[docs] def list_jobs(self, limit: int | None = None) -> pandas.DataFrame: """Return the async deployment jobs in a table format. If the limit is set to None, only the first 50 records are shown. :param limit: limit number of fetched records :type limit: int, optional :return: pandas.DataFrame with listed deployment jobs :rtype: pandas.DataFrame .. note:: This method list only async deployment jobs created for WML deployment. **Example** .. code-block:: python client.deployments.list_jobs() """ details = self.get_job_details(limit=limit) resources = details["resources"] values = [] index = 0 for m in resources: # Deployment service currently doesn't support limit querying # As a workaround, its filtered in python client if limit is not None and index == limit: break if "scoring" in m["entity"]: state = m["entity"]["scoring"]["status"]["state"] else: state = m["entity"]["decision_optimization"]["status"]["state"] deploy_id = m["entity"]["deployment"]["id"] values.append( (m["metadata"]["id"], state, m["metadata"]["created_at"], deploy_id) ) index = index + 1 table = self._list( values, ["JOB-ID", "STATE", "CREATED", "DEPLOYMENT-ID"], limit, 50 ) return table
def _get_deployable_asset_type(self, details: dict) -> str: url = details["entity"]["asset"]["id"] if "model" in url: return "model" elif "function" in url: return "function" else: return "unknown"
[docs] def update( self, deployment_id: str | None = None, changes: dict | None = None, **kwargs: Any, ) -> dict | None: """Updates existing deployment metadata. If ASSET is patched, then 'id' field is mandatory and it starts a deployment with the provided asset id/rev. Deployment ID remains the same. :param deployment_id: unique ID of deployment to be updated :type deployment_id: str :param changes: elements to be changed, where keys are ConfigurationMetaNames :type changes: dict :return: metadata of the updated deployment :rtype: dict or None **Examples** .. code-block:: python metadata = {client.deployments.ConfigurationMetaNames.NAME:"updated_Deployment"} updated_deployment_details = client.deployments.update(deployment_id, changes=metadata) metadata = {client.deployments.ConfigurationMetaNames.ASSET: { "id": "ca0cd864-4582-4732-b365-3165598dc945", "rev":"2" }} deployment_details = client.deployments.update(deployment_id, changes=metadata) """ deployment_id = _get_id_from_deprecated_uid( kwargs=kwargs, resource_id=deployment_id, resource_name="deployment" ) if changes is None: raise TypeError( "update() missing 1 required positional argument: 'changes'" ) Deployments._validate_type(changes, "changes", dict, True) if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() ret202 = False ## In case of passing 'AUTO_ONLINE_DEPLOYMENT' as true, we need to poll for deployment to be either 'deploy_success' or 'update_success'. Deployments._validate_type(deployment_id, "deployment_id", str, True) if ("asset" in changes and not changes["asset"]) and ( "prompt_template" in changes and not changes["prompt_template"] ): msg = "ASSET/PROMPT_TEMPLATE cannot be empty. 'id' and 'rev' (only ASSET) fields are supported. 'id' is mandatory" print(msg) raise WMLClientError(msg) patch_job = ( changes.get("asset") is not None or self.ConfigurationMetaNames.PROMPT_TEMPLATE in changes or self.ConfigurationMetaNames.SERVING_NAME in changes or self.ConfigurationMetaNames.OWNER in changes ) patch_job_field = None if patch_job: if changes.get("asset") is not None: patch_job_field = "ASSET" elif self.ConfigurationMetaNames.PROMPT_TEMPLATE in changes: patch_job_field = "PROMPT_TEMPLATE" elif self.ConfigurationMetaNames.SERVING_NAME in changes: patch_job_field = "SERVING_NAME" elif self.ConfigurationMetaNames.OWNER in changes: patch_job_field = "OWNER" if patch_job_field is None: raise WMLClientError("Unexpected patch job element.") if patch_job and (len(changes) > 1): msg = ( f"When {patch_job_field} is being updated/patched, other fields cannot be updated. If other fields are to be " f"updated, try without {patch_job_field} update. {patch_job_field} update triggers deployment with the new asset retaining " "the same deployment_id" ) print(msg) raise WMLClientError(msg) deployment_details = self.get_details(deployment_id) serving_name_change = False new_serving_name = None if self.ConfigurationMetaNames.SERVING_NAME in changes: new_serving_name = changes.pop(self.ConfigurationMetaNames.SERVING_NAME) serving_name_change = True patch_payload = self.ConfigurationMetaNames._generate_patch_payload( deployment_details, changes, with_validation=True ) if serving_name_change: replace = "serving_name" in deployment_details["entity"].get("online").get( "parameters", [] ) patch_payload.append( { "op": "replace" if replace else "add", "path": "/online/parameters", "value": {"serving_name": new_serving_name}, } ) ## As auto_online_deployment and auto_redeploy values are passed as 'bool' but service needs them in 'str' format to patch. for ele in patch_payload: if ( "auto_online_deployment" in ele["path"] or "auto_redeploy" in ele["path"] ): ele["value"] = str(ele["value"]).lower() url = self._client.service_instance._href_definitions.get_deployment_href( deployment_id ) response = requests.patch( url, json=patch_payload, params=self._client._params(), headers=self._client._get_headers(), ) if patch_job and response.status_code == 202: updated_details = self._handle_response( 202, "deployment asset patch", response ) ret202 = True print( f"Since {patch_job_field} is patched, deployment with new asset id/rev is being started. " "Monitor the status using deployments.get_details(deployment_id) api" ) elif response.status_code == 202: updated_details = self._handle_response(202, "deployment scaling", response) ret202 = True else: updated_details = self._handle_response(200, "deployment patch", response) if "auto_online_deployment" in changes: if response is not None: if response.status_code == 200: deployment_details = self.get_details(deployment_id) import time print_text_header_h1( " deployment update for id: '{}' started".format(deployment_id) ) status = deployment_details["entity"]["status"]["state"] with StatusLogger(status) as status_logger: while True: time.sleep(5) deployment_details = self.get_details(deployment_id) status = deployment_details["entity"]["status"]["state"] status_logger.log_state(status) if ( status != "DEPLOY_IN_PROGRESS" and status != "UPDATE_IN_PROGRESS" ): break if status == "DEPLOY_SUCCESS" or status == "UPDATE_SUCCESS": print("") print_text_header_h2( "Successfully finished deployment update, deployment_id='{}'".format( deployment_id ) ) return deployment_details else: print_text_header_h2("Deployment update failed") if deployment_id is not None: self._deployment_status_errors_handling( deployment_details, "update", deployment_id ) else: error_msg = "Deployment update failed" reason = response.text print(reason) print_text_header_h2(error_msg) raise WMLClientError( error_msg + ". Error: " + str(response.status_code) + ". " + reason ) if not ret202: return updated_details return None
## Below functions are for async scoring. They are just dummy functions. def _score_async( self, deployment_id: str, scoring_payload: dict, transaction_id: str | None = None, retention: int | None = None, ) -> str | dict: Deployments._validate_type(deployment_id, "deployment_id", str, True) Deployments._validate_type(scoring_payload, "scoring_payload", dict, True) headers = self._client._get_headers() if transaction_id is not None: headers.update({"x-global-transaction-id": transaction_id}) # making change - connection keep alive scoring_url = ( self._client.service_instance._href_definitions.get_async_deployment_job_href() ) params = self._client._params() if not self._client.ICP_PLATFORM_SPACES and retention is not None: if not isinstance(retention, int) or retention < -1: raise TypeError( "`retention` takes integer values greater or equal than -1." ) params.update({"retention": retention}) response_scoring = requests.post( scoring_url, params=params, json=scoring_payload, headers=headers ) return self._handle_response(202, "scoring asynchronously", response_scoring)
[docs] def create_job( self, deployment_id: str, meta_props: dict, retention: int | None = None, transaction_id: str | None = None, _asset_id: str | None = None, ) -> str | dict: """Create an asynchronous deployment job. :param deployment_id: unique ID of the deployment :type deployment_id: str :param meta_props: metaprops. To see the available list of metanames, use ``client.deployments.ScoringMetaNames.get()`` or ``client.deployments.DecisionOptimizationmetaNames.get()`` :type meta_props: dict :param retention: how many job days job meta should be retained, takes integer values >= -1, supported only on Cloud :type retention: int, optional :param transaction_id: transaction ID to be passed with the payload :type transaction_id: str, optional :return: metadata of the created async deployment job :rtype: dict or str .. note:: * The valid payloads for scoring input are either list of values, pandas or numpy dataframes. **Example** .. code-block:: python scoring_payload = {client.deployments.ScoringMetaNames.INPUT_DATA: [{'fields': ['GENDER','AGE','MARITAL_STATUS','PROFESSION'], 'values': [['M',23,'Single','Student'], ['M',55,'Single','Executive']]}]} async_job = client.deployments.create_job(deployment_id, scoring_payload) """ Deployments._validate_type(deployment_id, "deployment_id", str, True) Deployments._validate_type(meta_props, "meta_props", dict, True) if _asset_id: Deployments._validate_type(_asset_id, "_asset_id", str, True) # We assume that _asset_id is the id of the asset that was deployed # in the deployment with id deployment_id, and we save one REST call asset = _asset_id else: deployment_details = self.get_details(deployment_id) asset = deployment_details["entity"]["asset"]["id"] do_model = False asset_details = self._client.data_assets.get_details(asset) if ( "wml_model" in asset_details["entity"] and "type" in asset_details["entity"]["wml_model"] ): if "do" in asset_details["entity"]["wml_model"]["type"]: do_model = True flag = 0 ## To see if it is async scoring or DecisionOptimization Job if do_model: payload = self.DecisionOptimizationMetaNames._generate_resource_metadata( meta_props, with_validation=True, client=self._client ) flag = 1 else: payload = self.ScoringMetaNames._generate_resource_metadata( meta_props, with_validation=True, client=self._client ) scoring_data = None if "scoring" in payload and "input_data" in payload["scoring"]: scoring_data = payload["scoring"]["input_data"] if ( "decision_optimization" in payload and "input_data" in payload["decision_optimization"] ): scoring_data = payload["decision_optimization"]["input_data"] if scoring_data is not None: score_payload = [] for each_score_request in scoring_data: lib_checker.check_lib(lib_name="pandas") import pandas as pd if "values" in each_score_request: scoring_values = each_score_request["values"] # Check feature types, currently supporting pandas df, numpy.ndarray, python lists and Dmatrix if isinstance(scoring_values, pd.DataFrame): fields_names = scoring_values.columns.values.tolist() values = scoring_values.where( pd.notnull(scoring_values), None ).values.tolist() # replace nan with None each_score_request["values"] = values if fields_names is not None: each_score_request["fields"] = fields_names ## If payload is a numpy dataframe elif isinstance(scoring_values, np.ndarray): # replace nan with None values = np.where(pd.notnull(scoring_values), scoring_values, None).tolist() # type: ignore[call-overload] each_score_request["values"] = values score_payload.append(each_score_request) ##See if it is scoring or DecisionOptimizationJob if flag == 0: payload["scoring"]["input_data"] = score_payload if flag == 1: payload["decision_optimization"]["input_data"] = score_payload import copy if "input_data_references" in meta_props: Deployments._validate_type( meta_props.get("input_data_references"), "input_data_references", list, True, ) modified_input_data_references = False input_data = copy.deepcopy(meta_props.get("input_data_references")) input_data = cast(Iterable[Any], input_data) for i, input_data_fields in enumerate(input_data): if "connection" not in input_data_fields: modified_input_data_references = True input_data_fields.update({"connection": {}}) if modified_input_data_references: if "scoring" in payload: payload["scoring"].update({"input_data_references": input_data}) else: payload["decision_optimization"].update( {"input_data_references": input_data} ) if "output_data_reference" in meta_props: Deployments._validate_type( meta_props.get("output_data_reference"), "output_data_reference", dict, True, ) output_data = copy.deepcopy(meta_props.get("output_data_reference")) output_data = cast(dict, output_data) if ( "connection" not in output_data ): # and output_data.get('connection', None) is not None: output_data.update({"connection": {}}) payload["scoring"].update({"output_data_reference": output_data}) if "output_data_references" in meta_props: Deployments._validate_type( meta_props.get("output_data_references"), "output_data_references", list, True, ) output_data = copy.deepcopy(meta_props.get("output_data_references")) modified_output_data_references = False output_data = cast(Iterable[Any], output_data) for i, output_data_fields in enumerate(output_data): if "connection" not in output_data_fields: modified_output_data_references = True output_data_fields.update({"connection": {}}) if modified_output_data_references and "decision_optimization" in payload: payload["decision_optimization"].update( {"output_data_references": output_data} ) payload.update({"deployment": {"id": deployment_id}}) if "hardware_spec" in meta_props: payload.update( {"hardware_spec": meta_props[self.ConfigurationMetaNames.HARDWARE_SPEC]} ) if "hybrid_pipeline_hardware_specs" in meta_props: payload.update( { "hybrid_pipeline_hardware_specs": meta_props[ self.ConfigurationMetaNames.HYBRID_PIPELINE_HARDWARE_SPECS ] } ) payload.update({"space_id": self._client.default_space_id}) if "name" not in payload: import uuid payload.update({"name": "name_" + str(uuid.uuid4())}) return self._score_async( deployment_id, payload, transaction_id=transaction_id, retention=retention )
[docs] def get_job_details( self, job_id: str | None = None, include: str | None = None, limit: int | None = None, **kwargs: Any, ) -> dict: """Get information about deployment job(s). If deployment job_id is not passed, all deployment jobs details are returned. :param job_id: unique ID of the job :type job_id: str, optional :param include: fields to be retrieved from 'decision_optimization' and 'scoring' section mentioned as value(s) (comma separated) as output response fields :type include: str, optional :param limit: limit number of fetched records :type limit: int, optional :return: metadata of deployment job(s) :rtype: dict (if job_id is not None) or {"resources": [dict]} (if job_id is None) **Example** .. code-block:: python deployment_details = client.deployments.get_job_details() deployments_details = client.deployments.get_job_details(job_id=job_id) """ job_id = _get_id_from_deprecated_uid( kwargs=kwargs, resource_id=job_id, resource_name="job", can_be_none=True ) if job_id is not None: Deployments._validate_type(job_id, "job_id", str, True) url = ( self._client.service_instance._href_definitions.get_async_deployment_job_href() ) params = self._client._params() if include: params["include"] = include return self._get_artifact_details( url, job_id, limit, "async deployment job" if job_id else "async deployment jobs", query_params=params, )
[docs] def get_job_status(self, job_id: str) -> dict: """Get the status of a deployment job. :param job_id: unique ID of the deployment job :type job_id: str :return: status of the deployment job :rtype: dict **Example** .. code-block:: python job_status = client.deployments.get_job_status(job_id) """ job_details = self.get_job_details(job_id) if "scoring" not in job_details["entity"]: return job_details["entity"]["decision_optimization"]["status"] return job_details["entity"]["scoring"]["status"]
[docs] def get_job_id(self, job_details: dict) -> str: """Get the unique ID of a deployment job. :param job_details: metadata of the deployment job :type job_details: dict :return: unique ID of the deployment job :rtype: str **Example** .. code-block:: python job_details = client.deployments.get_job_details(job_id=job_id) job_status = client.deployments.get_job_id(job_details) """ return job_details["metadata"]["id"]
[docs] def get_job_uid(self, job_details: dict) -> str: """Get the unique ID of a deployment job. *Deprecated:* Use ``get_job_id(job_details)`` instead. :param job_details: metadata of the deployment job :type job_details: dict :return: unique ID of the deployment job :rtype: str **Example** .. code-block:: python job_details = client.deployments.get_job_details(job_uid=job_uid) job_status = client.deployments.get_job_uid(job_details) """ warn( ( "`get_job_uid()` is deprecated and will be removed in future. " "Instead, please use `get_job_id()`." ), category=DeprecationWarning, ) return self.get_job_id(job_details)
[docs] def get_job_href(self, job_details: dict) -> str: """Get the href of a deployment job. :param job_details: metadata of the deployment job :type job_details: dict :return: href of the deployment job :rtype: str **Example** .. code-block:: python job_details = client.deployments.get_job_details(job_id=job_id) job_status = client.deployments.get_job_href(job_details) """ return "/ml/v4/deployment_jobs/{}".format(job_details["metadata"]["id"])
[docs] def delete_job( self, job_id: str | None = None, hard_delete: bool = False, **kwargs: Any ) -> str: """Delete a deployment job that is running. This method can also delete metadata details of completed or canceled jobs when hard_delete parameter is set to True. :param job_id: unique ID of the deployment job to be deleted :type job_id: str :param hard_delete: specify `True` or `False`: `True` - To delete the completed or canceled job. `False` - To cancel the currently running deployment job. :type hard_delete: bool, optional :return: status ("SUCCESS" or "FAILED") :rtype: str **Example** .. code-block:: python client.deployments.delete_job(job_id) """ job_id = _get_id_from_deprecated_uid( kwargs=kwargs, resource_id=job_id, resource_name="job" ) Deployments._validate_type(job_id, "job_id", str, True) if job_id is not None and not is_id(job_id): raise WMLClientError("'job_id' is not an id: '{}'".format(job_id)) url = self._client.service_instance._href_definitions.get_async_deployment_jobs_href( job_id ) params = self._client._params() if hard_delete is True: params.update({"hard_delete": "true"}) response_delete = requests.delete( url, headers=self._client._get_headers(), params=params ) return self._handle_response( 204, "deployment async job deletion", response_delete, False )
def _get_filter_func_by_spec_state(self, spec_state: SpecStates) -> Callable: def filter_func(resources: list) -> list[str]: asset_ids = [ i["metadata"]["id"] for key, value in { "model": self._client._models.get_details( get_all=True, spec_state=spec_state ), "function": self._client._functions.get_details( get_all=True, spec_state=spec_state ), }.items() for i in value["resources"] ] return [ r for r in resources if r["entity"].get("asset", {}).get("id") in asset_ids ] return filter_func def _get_model_inference_text( self, deployment_id: str, inference_type: Literal["text", "text_stream"], params: dict | None = None, ) -> Any: """Based on provided deployment_id and params get ModelInference object. Verify that the deployment with the given deployment_id has generating methods. """ # Import ModelInference here to avoid circular import error from ibm_watsonx_ai.foundation_models.inference import ModelInference match inference_type: case "text": generated_url = self._client.service_instance._href_definitions.get_fm_deployment_generation_href( deployment_id=deployment_id, item="text" ) case "text_stream": if self._client._use_fm_ga_api: generated_url = self._client.service_instance._href_definitions.get_fm_deployment_generation_stream_href( deployment_id=deployment_id ) else: # Remove on CPD 5.0 release generated_url = self._client.service_instance._href_definitions.get_fm_deployment_generation_href( deployment_id=deployment_id, item="text_stream" ) case _: raise InvalidValue( value_name="inference_type", reason=f"Available types: 'text', 'text_stream', got: {inference_type}.", ) inference_url_list = [ url.get("url") for url in self.get_details(deployment_id, _silent=True)["entity"] .get("status", {}) .get("inference", {}) ] if not inference_url_list: inference_url_list = ( self.get_details(deployment_id, _silent=True)["entity"] .get("status", {}) .get("serving_urls", []) ) if generated_url not in inference_url_list: for inference_url in inference_url_list: # Remove on CPD 5.0 release if ( "v1-beta/deployments" not in inference_url ): # Remove on CPD 5.0 release raise WMLClientError( Messages.get_message( deployment_id, message_id="fm_deployment_has_not_inference_for_generation", ) ) return ModelInference( deployment_id=deployment_id, params=params, api_client=self._client )
[docs] def generate( self, deployment_id: str, prompt: str | None = None, params: dict | None = None, guardrails: bool = False, guardrails_hap_params: bool | None = None, guardrails_pii_params: bool | None = None, concurrency_limit: int = 10, async_mode: bool = False, validate_prompt_variables: bool = True, ) -> dict: """Generate a raw response with `prompt` for given `deployment_id`. :param deployment_id: unique ID of the deployment :type deployment_id: str :param prompt: prompt needed for text generation. If deployment_id points to the Prompt Template asset, then the prompt argument must be None, defaults to None :type prompt: str, optional :param params: meta props for text generation, use ``ibm_watsonx_ai.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dict, optional :param guardrails: If True, then potentially hateful, abusive, and/or profane language (HAP) was detected filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool, optional :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watsonx_ai.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict, optional :param concurrency_limit: number of requests to be sent in parallel, maximum is 10 :type concurrency_limit: int, optional :param async_mode: If True, then yield results asynchronously (using generator). In this case both the prompt and the generated text will be concatenated in the final response - under `generated_text`, defaults to False :type async_mode: bool, optional :param validate_prompt_variables: If True, prompt variables provided in `params` are validated with the ones in Prompt Template Asset. This parameter is only applicable in a Prompt Template Asset deployment scenario and should not be changed for different cases, defaults to True :type validate_prompt_variables: bool :return: scoring result containing generated content :rtype: dict """ d_inference = self._get_model_inference_text(deployment_id, "text", params) return d_inference.generate( prompt=prompt, guardrails=guardrails, guardrails_hap_params=guardrails_hap_params, guardrails_pii_params=guardrails_pii_params, concurrency_limit=concurrency_limit, params=params, async_mode=async_mode, validate_prompt_variables=validate_prompt_variables, )
[docs] def generate_text( self, deployment_id: str, prompt: str | None = None, params: dict | None = None, raw_response: bool = False, guardrails: bool = False, guardrails_hap_params: bool | None = None, guardrails_pii_params: bool | None = None, concurrency_limit: int = 10, validate_prompt_variables: bool = True, ) -> str: """Given the selected deployment (deployment_id), a text prompt as input, and the parameters and concurrency_limit, the selected inference will generate a completion text as generated_text response. :param deployment_id: unique ID of the deployment :type deployment_id: str :param prompt: the prompt string or list of strings. If the list of strings is passed, requests will be managed in parallel with the rate of concurency_limit, defaults to None :type prompt: str, optional :param params: meta props for text generation, use ``ibm_watsonx_ai.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dict, optional :param raw_response: returns the whole response object :type raw_response: bool, optional :param guardrails: If True, then potentially hateful, abusive, and/or profane language (HAP) was detected filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool, optional :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watsonx_ai.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict, optional :param concurrency_limit: number of requests to be sent in parallel, maximum is 10 :type concurrency_limit: int, optional :param validate_prompt_variables: If True, prompt variables provided in `params` are validated with the ones in Prompt Template Asset. This parameter is only applicable in a Prompt Template Asset deployment scenario and should not be changed for different cases, defaults to True :type validate_prompt_variables: bool :return: generated content :rtype: str .. note:: By default only the first occurance of `HAPDetectionWarning` is displayed. To enable printing all warnings of this category, use: .. code-block:: python import warnings from ibm_watsonx_ai.foundation_models.utils import HAPDetectionWarning warnings.filterwarnings("always", category=HAPDetectionWarning) """ d_inference = self._get_model_inference_text(deployment_id, "text", params) return d_inference.generate_text( prompt=prompt, raw_response=raw_response, guardrails=guardrails, guardrails_hap_params=guardrails_hap_params, guardrails_pii_params=guardrails_pii_params, concurrency_limit=concurrency_limit, params=params, validate_prompt_variables=validate_prompt_variables, )
[docs] def generate_text_stream( self, deployment_id: str, prompt: str | None = None, params: dict | None = None, raw_response: bool = False, guardrails: bool = False, guardrails_hap_params: bool | None = None, guardrails_pii_params: bool | None = None, validate_prompt_variables: bool = True, ) -> str: """Given the selected deployment (deployment_id), a text prompt as input and parameters, the selected inference will generate a streamed text as generate_text_stream. :param deployment_id: unique ID of the deployment :type deployment_id: str :param prompt: the prompt string, defaults to None :type prompt: str, optional :param params: meta props for text generation, use ``ibm_watsonx_ai.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dict, optional :param raw_response: yields the whole response object :type raw_response: bool, optional :param guardrails: If True, then potentially hateful, abusive, and/or profane language (HAP) was detected filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool, optional :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watsonx_ai.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict, optional :param validate_prompt_variables: If True, prompt variables provided in `params` are validated with the ones in Prompt Template Asset. This parameter is only applicable in a Prompt Template Asset deployment scenario and should not be changed for different cases, defaults to True :type validate_prompt_variables: bool :return: generated content :rtype: str .. note:: By default only the first occurance of `HAPDetectionWarning` is displayed. To enable printing all warnings of this category, use: .. code-block:: python import warnings from ibm_watsonx_ai.foundation_models.utils import HAPDetectionWarning warnings.filterwarnings("always", category=HAPDetectionWarning) """ d_inference = self._get_model_inference_text( deployment_id, "text_stream", params ) return d_inference.generate_text_stream( prompt=prompt, params=params, raw_response=raw_response, guardrails=guardrails, guardrails_hap_params=guardrails_hap_params, guardrails_pii_params=guardrails_pii_params, validate_prompt_variables=validate_prompt_variables, )