Source code for ibm_watson_machine_learning.deployments

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2017-2024.
#  https://opensource.org/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------

from __future__ import print_function
from typing import Optional, Literal
import numpy as np
import json
import inspect

import ibm_watson_machine_learning._wrappers.requests as requests
from ibm_watson_machine_learning.utils import DEPLOYMENT_DETAILS_TYPE, INSTANCE_DETAILS_TYPE, print_text_header_h1, print_text_header_h2, \
    StatusLogger
from ibm_watson_machine_learning.utils.utils import _check_if_import_from_watsonx_ai
from ibm_watson_machine_learning.wml_client_error import (WMLClientError, MissingValue, 
                                                          NoVirtualDeploymentSupportedForICP,  
                                                          InvalidValue, MissingArgument, PromptVariablesError)
from ibm_watson_machine_learning.href_definitions import is_uid
from ibm_watson_machine_learning.wml_resource import WMLResource
from ibm_watson_machine_learning.messages.messages import Messages
from ibm_watson_machine_learning.metanames import DeploymentMetaNames, ScoringMetaNames, DecisionOptimizationMetaNames, DeploymentNewMetaNames
from ibm_watson_machine_learning.libs.repo.util.library_imports import LibraryChecker
from ibm_watson_machine_learning.utils.autoai.utils import all_logging_disabled
#from ibm_watson_machine_learning.metanames import DeploymentMetaNames, ScoreMetaNames, EnvironmentMetaNames

lib_checker = LibraryChecker()


[docs] class Deployments(WMLResource): """Deploy and score published artifacts (models and functions). """ cloud_platform_spaces = False icp_platform_spaces = False def __init__(self, client): WMLResource.__init__(self, __name__, client) if not client.ICP and not client.CLOUD_PLATFORM_SPACES and not client.ICP_PLATFORM_SPACES: Deployments._validate_type(client.service_instance.details, u'instance_details', dict, True) Deployments._validate_type_of_details(client.service_instance.details, INSTANCE_DETAILS_TYPE) self._ICP = client.ICP if client.CLOUD_PLATFORM_SPACES or client.ICP_PLATFORM_SPACES: self.ConfigurationMetaNames = DeploymentNewMetaNames() else: self.ConfigurationMetaNames = DeploymentMetaNames() if client.CLOUD_PLATFORM_SPACES: Deployments.cloud_platform_spaces = True if client.ICP_PLATFORM_SPACES: Deployments.icp_platform_spaces = True self.ScoringMetaNames = ScoringMetaNames() self.DecisionOptimizationMetaNames = DecisionOptimizationMetaNames() def _deployment_status_errors_handling(self, deployment_details, operation_name, deployment_id): try: if 'failure' in deployment_details['entity']['status']: errors = deployment_details[u'entity'][u'status'][u'failure'][u'errors'] for error in errors: if type(error) == str: try: error_obj = json.loads(error) print(error_obj[u'message']) except: print(error) elif type(error) == dict: print(error['message']) else: print(error) raise WMLClientError('Deployment ' + operation_name + ' failed for deployment id: ' + deployment_id +'. Errors: ' + str(errors)) else: print(deployment_details['entity']['status']) raise WMLClientError('Deployment ' + operation_name + ' failed for deployment id: ' + deployment_id +'. Error: ' + str(deployment_details['entity']['status']['state'])) except WMLClientError as e: raise e except Exception as e: self._logger.debug('Deployment ' + operation_name + ' failed: ' + str(e)) print(deployment_details['entity']['status']['failure']) raise WMLClientError('Deployment ' + operation_name + ' failed for deployment id: ' + deployment_id + '.') # TODO model_uid and artifact_uid should be changed to artifact_uid only
[docs] def create(self, artifact_uid=None, meta_props=None, rev_id=None, **kwargs): """Create a deployment from an artifact. As artifact, we understand model or function which may be deployed. :param artifact_uid: published artifact UID (model or function uid) :type artifact_uid: str :param meta_props: metaprops, to see the available list of metanames use: .. code-block:: python client.deployments.ConfigurationMetaNames.get() :type meta_props: dict :return: metadata of the created deployment :rtype: dict **Example** .. code-block:: python meta_props = { wml_client.deployments.ConfigurationMetaNames.NAME: "SAMPLE DEPLOYMENT NAME", wml_client.deployments.ConfigurationMetaNames.ONLINE: {}, wml_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC : { "id": "e7ed1d6c-2e89-42d7-aed5-8sb972c1d2b"}, wml_client.deployments.ConfigurationMetaNames.SERVING_NAME : 'sample_deployment' } deployment_details = client.deployments.create(artifact_uid, meta_props) """ ##To be removed once deployments adds support for projects WMLResource._chk_and_block_create_update_for_python36(self) Deployments._validate_type(artifact_uid, u'artifact_uid', str, True) if self._ICP: predictionUrl = self._wml_credentials[u'url'] if meta_props is None: raise WMLClientError("Invalid input. meta_props can not be empty.") if self._client.CLOUD_PLATFORM_SPACES and 'r_shiny' in meta_props: raise WMLClientError('Shiny is not supported in this release') if self._client.CPD_version >= 4.8 or self._client.CLOUD_PLATFORM_SPACES: base_model_id = meta_props.get(self.ConfigurationMetaNames.BASE_MODEL_ID) ModelTypes = _check_if_import_from_watsonx_ai(self.__module__, 'ibm_watson_machine_learning.foundation_models.utils.enums', 'ModelTypes') if isinstance(base_model_id, ModelTypes): meta_props[self.ConfigurationMetaNames.BASE_MODEL_ID] = base_model_id.value metaProps = self.ConfigurationMetaNames._generate_resource_metadata(meta_props) if 'serving_name' in str(metaProps) and meta_props.get('serving_name', False) and 'r_shiny' in str(metaProps): if 'parameters' in metaProps['r_shiny']: metaProps['r_shiny']['parameters']['serving_name'] = meta_props['serving_name'] else: metaProps['r_shiny']['parameters'] = {'serving_name': meta_props['serving_name']} if 'online' in metaProps: del metaProps['online'] if (self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES) and 'wml_instance_id' in meta_props: metaProps.update({'wml_instance_id': meta_props['wml_instance_id']}) if not self._client.CLOUD_PLATFORM_SPACES and not self._client.ICP_PLATFORM_SPACES: ## Remove new meataprop from older one, just in case if user provides. if self.ConfigurationMetaNames.HARDWARE_SPEC in metaProps: metaProps.pop(self.ConfigurationMetaNames.HARDWARE_SPEC) if self.ConfigurationMetaNames.HYBRID_PIPELINE_HARDWARE_SPECS in metaProps: metaProps.pop(self.ConfigurationMetaNames.HYBRID_PIPELINE_HARDWARE_SPECS) if self.ConfigurationMetaNames.R_SHINY in metaProps: metaProps.pop(self.ConfigurationMetaNames.R_SHINY) artifact_details = self._client.repository.get_details(artifact_uid) artifact_href = artifact_details['metadata']['href'] artifact_revision_id = artifact_href.split('=')[1] if 'model' in artifact_href: if self._client.CAMS: details = "/v4/models/"+artifact_uid + "?space_id=" + self._client.default_space_id else: details = "/v4/models/" + artifact_uid + "?rev=" + artifact_revision_id elif 'function' in artifact_href: if self._client.CAMS: details = "/v4/functions/" + artifact_uid + "?space_id=" + self._client.default_space_id else: details = "/v4/functions/" + artifact_uid + "?rev=" + artifact_revision_id else: raise WMLClientError('Unexpected artifact type: \'{}\'.'.format(artifact_uid)) if 'model' in artifact_href: if "tensorflow_1.11" in artifact_details["entity"]["type"] or "tensorflow_1.5" in \ artifact_details["entity"]["type"]: print("Note: Model of framework tensorflow and versions 1.5/1.11 has been deprecated. These versions will not be supported after 26th Nov 2019.") metaProps['asset'] = {'href': details} if self._client.CAMS: if self._client.default_space_id is not None: metaProps['space'] = {'href': "/v4/spaces/" + self._client.default_space_id} else: raise WMLClientError( "It is mandatory is set the space. Use client.set.default_space(<SPACE_GUID>) to set the space.") ##Check if default space is set else: metaProps['asset'] = metaProps.get('asset') if metaProps.get('asset') else {'id': artifact_uid} if rev_id is not None: metaProps['asset'].update({'rev': rev_id}) if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: if self._client.default_project_id: metaProps['project_id'] = self._client.default_project_id else: metaProps['space_id'] = self._client.default_space_id else: if 'space' not in metaProps: if self._client.default_space_id is not None: metaProps['space_id'] = self._client.default_space_id else: raise WMLClientError( "It is mandatory is set the space. Use client.set.default_space(<SPACE_GUID>) to set the space.") # note: checking if artifact_uid points to prompt_template if self._client.CPD_version >= 4.8 or self._client.CLOUD_PLATFORM_SPACES: with all_logging_disabled(): try: PromptTemplateManager = _check_if_import_from_watsonx_ai(self.__module__, 'ibm_watson_machine_learning.foundation_models.prompts', 'PromptTemplateManager') model_id = PromptTemplateManager(api_client=self._client).load_prompt(artifact_uid).model_id except Exception: pass # Foundation models scenario should not impact other ML models' deployment scenario. else: metaProps.pop("asset") metaProps['prompt_template'] = {"id": artifact_uid} if DeploymentNewMetaNames.BASE_MODEL_ID not in metaProps and \ DeploymentNewMetaNames.BASE_DEPLOYMENT_ID not in metaProps: metaProps.update({DeploymentNewMetaNames.BASE_MODEL_ID: model_id}) # --- end note url = self._client.service_instance._href_definitions.get_deployments_href() if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: response = requests.post( url, json=metaProps, params=self._client._params(), # version is mandatory headers=self._client._get_headers()) else: response = requests.post( url, json=metaProps, headers=self._client._get_headers()) ## Post Deployment call executed if response.status_code == 202: deployment_details = response.json() #print(deployment_details) if self._ICP: if 'online_url' in deployment_details["entity"]["status"]: scoringUrl = deployment_details.get(u'entity').get(u'status').get('online_url').get('url').replace('https://ibm-nginx-svc:443', predictionUrl) deployment_details[u'entity'][u'status']['online_url']['url'] = scoringUrl deployment_uid = self.get_uid(deployment_details) import time print_text_header_h1(u'Synchronous deployment creation for uid: \'{}\' started'.format(artifact_uid)) status = deployment_details[u'entity'][u'status']['state'] notifications = [] with StatusLogger(status) as status_logger: while True: time.sleep(5) deployment_details = self._client.deployments.get_details(deployment_uid, _silent=True) #this is wrong , needs to update for ICP if "system" in deployment_details: notification = deployment_details['system']['warnings'][0]['message'] if notification not in notifications: print("\nNote: " + notification) notifications.append(notification) if self._ICP and not self._client.ICP_PLATFORM_SPACES: scoringUrl = deployment_details.get(u'entity').get(u'asset').get('href').replace('https://wml-os-envoy:16600', predictionUrl) deployment_details[u'entity'][u'asset']['href'] = scoringUrl status = deployment_details[u'entity'][u'status'][u'state'] status_logger.log_state(status) if status != u'DEPLOY_IN_PROGRESS' and status != "initializing": break if status == u'DEPLOY_SUCCESS' or status == u'ready': print(u'') print_text_header_h2( u'Successfully finished deployment creation, deployment_uid=\'{}\''.format(deployment_uid)) return deployment_details else: print_text_header_h2(u'Deployment creation failed') self._deployment_status_errors_handling(deployment_details, 'creation', deployment_uid) else: error_msg = u'Deployment creation failed' reason = response.text print(reason) print_text_header_h2(error_msg) raise WMLClientError(error_msg + '. Error: ' + str(response.status_code) + '. ' + reason)
#return self._handle_response(202, u'created deployment', response)
[docs] @staticmethod def get_uid(deployment_details): """Get deployment_uid from deployment details. *Deprecated:* Use ``get_id(deployment_details)`` instead. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: deployment UID that is used to manage the deployment :rtype: str **Example** .. code-block:: python deployment_uid = client.deployments.get_uid(deployment) """ Deployments._validate_type(deployment_details, u'deployment_details', dict, True) if not Deployments.cloud_platform_spaces and not Deployments.icp_platform_spaces: Deployments._validate_type_of_details(deployment_details, DEPLOYMENT_DETAILS_TYPE) try: if 'id' in deployment_details[u'metadata']: uid = deployment_details.get(u'metadata').get(u'id') else: uid = deployment_details.get(u'metadata').get(u'guid') except Exception as e: raise WMLClientError(u'Getting deployment UID from deployment details failed.', e) if uid is None: raise MissingValue(u'deployment_details.metadata.guid') return uid
[docs] @staticmethod def get_id(deployment_details): """Get deployment id from deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: deployment ID that is used to manage the deployment :rtype: str **Example** .. code-block:: python deployment_id = client.deployments.get_id(deployment) """ return Deployments.get_uid(deployment_details)
[docs] @staticmethod def get_href(deployment_details): """Get deployment_href from deployment details. :param deployment_details: metadata of the deployment. :type deployment_details: dict :return: deployment href that is used to manage the deployment :rtype: str **Example** .. code-block:: python deployment_href = client.deployments.get_href(deployment) """ Deployments._validate_type(deployment_details, u'deployment_details', dict, True) if not Deployments.cloud_platform_spaces and not Deployments.icp_platform_spaces: Deployments._validate_type_of_details(deployment_details, DEPLOYMENT_DETAILS_TYPE) try: if 'href' in deployment_details[u'metadata']: url = deployment_details.get(u'metadata').get(u'href') else: url = "/ml/v4/deployments/{}".format(deployment_details[u'metadata'][u'id']) except Exception as e: raise WMLClientError(u'Getting deployment url from deployment details failed.', e) if url is None: raise MissingValue(u'deployment_details.metadata.href') return url
def _get_serving_name_info(self, serving_name: str) -> tuple: """Get info about serving name. :param serving_name: serving name to filter deployments :type serving_name: str :return: information about serving name: (<status_code>, <response json if any>) :rtype: tuple **Example** .. code-block:: python is_available = client.deployments.is_serving_name_available('test') """ params = { 'serving_name': serving_name, 'conflict': 'true', 'version': self._client.version_param } url = self._client.service_instance._href_definitions.get_deployments_href() res = requests.get(url, headers=self._client._get_headers(), params=params) if res.status_code == 409: response = res.json() else: response = None return (res.status_code, response)
[docs] def is_serving_name_available(self, serving_name: str) -> bool: """Check if serving name is available for usage. :param serving_name: serving name to filter deployments :type serving_name: str :return: information if serving name is available :rtype: bool **Example** .. code-block:: python is_available = client.deployments.is_serving_name_available('test') """ status_code, _ = self._get_serving_name_info(serving_name) return status_code != 409
[docs] def get_details(self, deployment_uid=None, serving_name=None, limit=None, asynchronous=False, get_all=False, spec_state=None, _silent=False): """Get information about deployment(s). If deployment_uid is not passed, all deployment details are fetched. :param deployment_uid: Unique Id of Deployment :type deployment_uid: str, optional :param serving_name: serving name to filter deployments :type serving_name: str, optional :param limit: limit number of fetched records :type limit: int, optional :param asynchronous: if True, it will work as a generator :type asynchronous: bool, optional :param get_all: if True, it will get all entries in 'limited' chunks :type get_all: bool, optional :param spec_state: software specification state, can be used only when `deployment_uid` is None :type spec_state: SpecStates, optional :return: metadata of deployment(s) :rtype: dict (if deployment_uid is not None) or {"resources": [dict]} (if deployment_uid is None) **Example** .. code-block:: python deployment_details = client.deployments.get_details(deployment_uid) deployment_details = client.deployments.get_details(deployment_uid=deployment_uid) deployments_details = client.deployments.get_details() deployments_details = client.deployments.get_details(limit=100) deployments_details = client.deployments.get_details(limit=100, get_all=True) deployments_details = [] for entry in client.deployments.get_details(limit=100, asynchronous=True, get_all=True): deployments_details.extend(entry) """ if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() Deployments._validate_type(deployment_uid, u'deployment_uid', str, False) if deployment_uid is not None and not is_uid(deployment_uid): raise WMLClientError(u'\'deployment_uid\' is not an uid: \'{}\''.format(deployment_uid)) url = self._client.service_instance._href_definitions.get_deployments_href() query_params = self._client._params() if serving_name: query_params['serving_name'] = serving_name if deployment_uid is None: filter_func = self._get_filter_func_by_spec_state(spec_state) if spec_state else None deployment_details = self._get_artifact_details(url, deployment_uid, limit, 'deployments', query_params=query_params, _async=asynchronous, _all=get_all, _filter_func=filter_func) else: deployment_details = self._get_artifact_details(url, deployment_uid, limit, 'deployments', query_params=query_params) if type(deployment_details).__name__ != 'NextResourceGenerator' and "system" in deployment_details \ and not _silent: print("Note: " + deployment_details['system']['warnings'][0]['message']) return deployment_details
[docs] @staticmethod def get_scoring_href(deployment_details): """Get scoring url from deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: scoring endpoint url that is used for making scoring requests :rtype: str **Example** .. code-block:: python scoring_href = client.deployments.get_scoring_href(deployment) """ Deployments._validate_type(deployment_details, u'deployment', dict, True) if not Deployments.cloud_platform_spaces and not Deployments.icp_platform_spaces: Deployments._validate_type_of_details(deployment_details, DEPLOYMENT_DETAILS_TYPE) scoring_url = None try: url = deployment_details['entity']['status'].get('online_url') if url is not None: scoring_url = deployment_details['entity']['status']['online_url']['url'] else: raise MissingValue(u'Getting scoring url for deployment failed. This functionality is available only for sync deployments') except Exception as e: raise WMLClientError(u'Getting scoring url for deployment failed. This functionality is available only for sync deployments', e) if scoring_url is None: raise MissingValue(u'scoring_url missing in online_predictions') return scoring_url
[docs] @staticmethod def get_serving_href(deployment_details): """Get serving url from deployment details. :param deployment_details: metadata of the deployment :type deployment_details: dict :return: serving endpoint url that is used for making scoring requests :rtype: str **Example** .. code-block:: python scoring_href = client.deployments.get_serving_href(deployment) """ Deployments._validate_type(deployment_details, u'deployment', dict, True) if not Deployments.cloud_platform_spaces and not Deployments.icp_platform_spaces: Deployments._validate_type_of_details(deployment_details, DEPLOYMENT_DETAILS_TYPE) try: serving_name = deployment_details['entity']['online'].get('parameters').get('serving_name') serving_url = [url for url in deployment_details['entity'].get('status').get('serving_urls') if serving_name == url.split('/')[-2]][0] if serving_url: return serving_url else: raise MissingValue( u'Getting serving url for deployment failed. This functionality is available only for sync deployments with serving name.') except Exception as e: raise WMLClientError( u'Getting serving url for deployment failed. This functionality is available only for sync deployments with serving name.', e)
[docs] def delete(self, deployment_uid): """Delete deployment. :param deployment_uid: Unique Id of Deployment :type deployment_uid: str :return: status ("SUCCESS" or "FAILED") :rtype: str **Example** .. code-block:: python client.deployments.delete(deployment_uid) """ if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() Deployments._validate_type(deployment_uid, u'deployment_uid', str, True) if deployment_uid is not None and not is_uid(deployment_uid): raise WMLClientError(u'\'deployment_uid\' is not an uid: \'{}\''.format(deployment_uid)) deployment_url = self._client.service_instance._href_definitions.get_deployment_href(deployment_uid) if not self._ICP or self._client.ICP_PLATFORM_SPACES: response_delete = requests.delete( deployment_url, params=self._client._params(), headers=self._client._get_headers()) else: response_delete = requests.delete( deployment_url, #TODO: The below line needs to uncommeted after /v4/deployment accespts query param # params=self._client._params(), headers=self._client._get_headers()) return self._handle_response(204, u'deployment deletion', response_delete, False)
[docs] def score(self, deployment_id, meta_props, transaction_id=None): """Make scoring requests against deployed artifact. :param deployment_id: Unique Id of the deployment to be scored :type deployment_id: str :param meta_props: meta props for scoring, use ``client.deployments.ScoringMetaNames.show()`` to view the list of ScoringMetaNames :type meta_props: dict :param transaction_id: transaction id to be passed with records during payload logging :type transaction_id: str, optional :return: scoring result containing prediction and probability :rtype: dict .. note:: * *client.deployments.ScoringMetaNames.INPUT_DATA* is the only metaname valid for sync scoring. * The valid payloads for scoring input are either list of values, pandas or numpy dataframes. **Example** .. code-block:: python scoring_payload = {wml_client.deployments.ScoringMetaNames.INPUT_DATA: [{'fields': ['GENDER','AGE','MARITAL_STATUS','PROFESSION'], 'values': [ ['M',23,'Single','Student'], ['M',55,'Single','Executive'] ] }] } predictions = client.deployments.score(deployment_id, scoring_payload) """ if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() Deployments._validate_type(deployment_id, u'deployment_id', str, True) Deployments._validate_type(meta_props, u'meta_props', dict, True) if meta_props.get(self.ScoringMetaNames.INPUT_DATA) is None: raise WMLClientError("Scoring data input 'ScoringMetaNames.INPUT_DATA' is mandatory for synchronous " "scoring") scoring_data = meta_props[self.ScoringMetaNames.INPUT_DATA] if scoring_data is not None: score_payload = [] for each_score_request in scoring_data: lib_checker.check_lib(lib_name='pandas') import pandas as pd scoring_values = each_score_request["values"] # Check feature types, currently supporting pandas df, numpy.ndarray, python lists and Dmatrix if (isinstance(scoring_values, pd.DataFrame)): scoring_values = scoring_values.where(pd.notnull(scoring_values), None) fields_names = scoring_values.columns.values.tolist() values = scoring_values.values.tolist() try: values[pd.isnull(values)] = None # note: above code fails when there is no null values in a dataframe except TypeError: pass each_score_request["values"] = values if fields_names is not None: each_score_request["fields"] = fields_names ## If payload is a numpy dataframe elif (isinstance(scoring_values, np.ndarray)): values = scoring_values.tolist() each_score_request["values"] = values # else: # payload_score["values"] = each_score_request["values"] # if "fields" in each_score_request: # payload_score["fields"] = each_score_request["fields"] score_payload.append(each_score_request) ##See if it is scoring or DecisionOptimizationJob payload = {} payload["input_data"] = score_payload headers = self._client._get_headers() if transaction_id is not None: headers.update({'x-global-transaction-id': transaction_id}) if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: scoring_url = self._wml_credentials["url"] + "/ml/v4/deployments/" + deployment_id + "/predictions" else: scoring_url = self._wml_credentials["url"] + "/v4/deployments/" + deployment_id + "/predictions" if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: params = self._client._params() del params[u'space_id'] response_scoring = requests.post( scoring_url, json=payload, params=params, # version parameter is mandatory headers=headers) else: response_scoring = requests.post( scoring_url, json=payload, headers=headers) return self._handle_response(200, u'scoring', response_scoring)
#########################################
[docs] def get_download_url(self, deployment_details): """Get deployment_download_url from deployment details. :param deployment_details: created deployment details :type deployment_details: dict :return: deployment download URL that is used to get file deployment (for example: Core ML) :rtype: str **Example** .. code-block:: python deployment_url = client.deployments.get_download_url(deployment) """ if self._client.ICP_PLATFORM_SPACES: raise WMLClientError(u'Downloading virtual deployment is no longer supported in Cloud Pak for Data, versions 3.5 and later.') if self._client.CLOUD_PLATFORM_SPACES: raise WMLClientError(u'Downloading virtual deployment is no longer supported in Cloud Pak for Data as a Service.') Deployments._validate_type(deployment_details, u'deployment_details', dict, True) try: virtual_deployment_detaails = deployment_details.get(u'entity').get(u'status').get(u'virtual_deployment_downloads') if virtual_deployment_detaails is not None: url = virtual_deployment_detaails[0].get(u'url') else: url = None except Exception as e: raise WMLClientError(u'Getting download url from deployment details failed.', e) if url is None: raise MissingValue(u'deployment_details.entity.virtual_deployment_downloads.url') return url
[docs] def list(self, limit=None, return_as_df=True, artifact_type=None): """Print deployments in a table format. If limit is set to None there will be only first 50 records shown. :param limit: limit number of fetched records :type limit: int, optional :param return_as_df: determinate if table should be returned as pandas.DataFrame object, default: True :type return_as_df: bool, optional :param artifact_type: return only deployments with the specified artifact_type :type artifact_type: str, optional :return: pandas.DataFrame with listed deployments or None if return_as_df is False :rtype: pandas.DataFrame or None **Example** .. code-block:: python client.deployments.list() """ if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() details = self.get_details(limit=limit) resources = details[u'resources'] values = [] index = 0 if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: sw_spec_info = {s['id']: s for s in self._client.software_specifications.get_details(state_info=True)['resources']} def enrich_asset_with_type(asset_details, asset_type): if asset_type: asset_details['metadata']['asset_type'] = asset_type return asset_details asset_info = {el['metadata']['id']: enrich_asset_with_type(el, asset_type) for asset_type, resources in {'model': self._client._models.get_details(get_all=True), 'function': self._client._functions.get_details(get_all=True) }.items() for el in resources['resources']} def get_spec_info(spec_id, prop): if spec_id and spec_id in sw_spec_info: return sw_spec_info[spec_id].get(prop, '') else: return '' for m in resources: # Deployment service currently doesn't support limit querying # As a workaround, its filtered in python client # Ideally this needs to be on the server side if limit is not None and index == limit: break if not self._client.CLOUD_PLATFORM_SPACES and not self._client.ICP_PLATFORM_SPACES: if 'guid' in m[u'metadata']: values.append((m[u'metadata'][u'guid'], m[u'entity'][u'name'], m[u'entity'][u'status']['state'], m[u'metadata'][u'created_at'], self._get_deployable_asset_type(m))) else: values.append((m[u'metadata'][u'id'], m[u'entity'][u'name'], m[u'entity'][u'status']['state'], m[u'metadata'][u'created_at'], self._get_deployable_asset_type(m))) else: spec_id = asset_info.get(m['entity'].get('asset', m['entity'].get('prompt_template'))['id'], {})\ .get('entity', {}).get('software_spec', {}).get('id') if artifact_type and m['entity'].get('deployed_asset_type', 'unknown') != artifact_type: pass # filter by artifact_type else: values.append( (m['metadata']['guid'] if 'guid' in m['metadata'] else m['metadata']['id'], m['entity']['name'], m['entity']['status']['state'], m['metadata']['created_at'], m['entity'].get('deployed_asset_type', 'unknown'), get_spec_info(spec_id, 'state'), get_spec_info(spec_id, 'replacement'))) index = index + 1 if not self._client.CLOUD_PLATFORM_SPACES and not self._client.ICP_PLATFORM_SPACES: table = self._list(values, [u'GUID', u'NAME', u'STATE', u'CREATED', u'ARTIFACT_TYPE'], limit, 50) else: table = self._list(values, ['GUID', 'NAME', 'STATE', 'CREATED', 'ARTIFACT_TYPE', 'SPEC_STATE', 'SPEC_REPLACEMENT'], limit, 50) if return_as_df: return table
[docs] def list_jobs(self, limit=None, return_as_df=True): """Print the async deployment jobs in a table format. If limit is set to None there will be only first 50 records shown. :param limit: limit number of fetched records :type limit: int, optional :param return_as_df: determinate if table should be returned as pandas.DataFrame object, default: True :type return_as_df: bool, optional :return: pandas.DataFrame with listed deployment jobs or None :rtype: pandas.DataFrame or None if return_as_df is False .. note:: This method list only async deployment jobs created for WML deployment. **Example** .. code-block:: python client.deployments.list_jobs() """ details = self.get_job_details(limit=limit) resources = details[u'resources'] values = [] index = 0 for m in resources: # Deployment service currently doesn't support limit querying # As a workaround, its filtered in python client if limit is not None and index == limit: break if 'scoring' in m['entity']: state = m['entity']['scoring']['status']['state'] else: state = m['entity']['decision_optimization']['status']['state'] if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: deploy_id = m['entity']['deployment']['id'] values.append((m[u'metadata'][u'id'], state, m[u'metadata'][u'created_at'], deploy_id)) else: deploy_id = m['entity']['deployment']['href'].split("/")[3] values.append((m[u'metadata'][u'guid'], state, m[u'metadata'][u'created_at'], deploy_id)) index = index + 1 table = self._list(values, ['JOB-UID', 'STATE', 'CREATED', 'DEPLOYMENT-ID'], limit, 50) if return_as_df: return table
def _get_deployable_asset_type(self, details): url = details[u'entity'][u'asset']['id'] if 'model' in url: return 'model' elif 'function' in url: return 'function' else: return 'unknown'
[docs] def update(self, deployment_uid, changes): """Updates existing deployment metadata. If ASSET is patched, then 'id' field is mandatory and it starts a deployment with the provided asset id/rev. Deployment id remains the same. :param deployment_uid: Unique Id of deployment which should be updated :type deployment_uid: str :param changes: elements which should be changed, where keys are ConfigurationMetaNames :type changes: dict :return: metadata of updated deployment :rtype: dict **Examples** .. code-block:: python metadata = {client.deployments.ConfigurationMetaNames.NAME:"updated_Deployment"} updated_deployment_details = client.deployments.update(deployment_uid, changes=metadata) metadata = {client.deployments.ConfigurationMetaNames.ASSET: { "id": "ca0cd864-4582-4732-b365-3165598dc945", "rev":"2" }} deployment_details = client.deployments.update(deployment_uid, changes=metadata) """ WMLResource._chk_and_block_create_update_for_python36(self) if not self._client.CLOUD_PLATFORM_SPACES and self._client.CPD_version < 4.8: self._client._check_if_space_is_set() ret202 = False ## In case of passing 'AUTO_ONLINE_DEPLOYMENT' as true, we need to poll for deployment to be either 'deploy_success' or 'update_success'. Deployments._validate_type(deployment_uid, 'deployment_uid', str, True) if ('asset' in changes and not changes[u'asset']) and \ ('prompt_template' in changes and not changes[u'prompt_template']): msg = "ASSET/PROMPT_TEMPLATE cannot be empty. 'id' and 'rev' (only ASSET) fields are supported. 'id' is mandatory" print(msg) raise WMLClientError(msg) # if changes.get('asset') is not None and (changes.get('name') is not None or changes.get('description') is not None): patch_job = changes.get('asset') is not None or self.ConfigurationMetaNames.PROMPT_TEMPLATE in changes or \ self.ConfigurationMetaNames.SERVING_NAME in changes or self.ConfigurationMetaNames.OWNER in changes patch_job_field = None if patch_job: if changes.get('asset') is not None: patch_job_field = "ASSET" elif self.ConfigurationMetaNames.PROMPT_TEMPLATE in changes: patch_job_field = "PROMPT_TEMPLATE" elif self.ConfigurationMetaNames.SERVING_NAME in changes: patch_job_field = "SERVING_NAME" elif self.ConfigurationMetaNames.OWNER in changes: patch_job_field = "OWNER" if patch_job_field is None: raise WMLClientError("Unexpected patch job element.") if patch_job and (len(changes) > 1): msg = f"When {patch_job_field} is being updated/patched, other fields cannot be updated. If other fields are to be " \ f"updated, try without {patch_job_field} update. {patch_job_field} update triggers deployment with the new asset retaining " \ "the same deployment_id" print(msg) raise WMLClientError(msg) deployment_details = self.get_details(deployment_uid) serving_name_change = False new_serving_name = None if self.ConfigurationMetaNames.SERVING_NAME in changes: new_serving_name = changes.pop(self.ConfigurationMetaNames.SERVING_NAME) serving_name_change = True patch_payload = self.ConfigurationMetaNames._generate_patch_payload(deployment_details, changes, with_validation=True) if serving_name_change: replace = 'serving_name' in deployment_details['entity'].get('online').get('parameters', []) patch_payload.append({ "op": "replace" if replace else "add", "path": "/online/parameters", "value": {"serving_name": new_serving_name}}) ## As auto_online_deployment and auto_redeploy values are passed as 'bool' but service needs them in 'str' format to patch. for ele in patch_payload: if 'auto_online_deployment' in ele['path'] or 'auto_redeploy' in ele['path']: ele['value'] = str(ele['value']).lower() url = self._client.service_instance._href_definitions.get_deployment_href(deployment_uid) response = requests.patch(url, json=patch_payload, params=self._client._params(), headers=self._client._get_headers()) if patch_job and response.status_code == 202: updated_details = self._handle_response(202, u'deployment asset patch', response) ret202 = True print(f"Since {patch_job_field} is patched, deployment with new asset id/rev is being started. " \ "Monitor the status using deployments.get_details(deployment_uid) api") elif response.status_code == 202: updated_details = self._handle_response(202, u'deployment scaling', response) ret202 = True else: updated_details = self._handle_response(200, u'deployment patch', response) if('auto_online_deployment' in changes): if response is not None: if response.status_code == 200: deployment_details = self.get_details(deployment_uid) import time print_text_header_h1(u' deployment update for uid: \'{}\' started'.format(deployment_uid)) status = deployment_details[u'entity'][u'status'][u'state'] with StatusLogger(status) as status_logger: while True: time.sleep(5) deployment_details = self.get_details(deployment_uid) status = deployment_details[u'entity'][u'status'][u'state'] status_logger.log_state(status) if status != u'DEPLOY_IN_PROGRESS' and status != u'UPDATE_IN_PROGRESS': break if status == u'DEPLOY_SUCCESS' or status == u'UPDATE_SUCCESS': print(u'') print_text_header_h2( u'Successfully finished deployment update, deployment_uid=\'{}\''.format(deployment_uid)) return deployment_details else: print_text_header_h2(u'Deployment update failed') self._deployment_status_errors_handling(deployment_details, 'update', deployment_uid) else: error_msg = u'Deployment update failed' reason = response.text print(reason) print_text_header_h2(error_msg) raise WMLClientError(error_msg + u'. Error: ' + str(response.status_code) + '. ' + reason) if not ret202: return updated_details
## Below functions are for async scoring. They are just dummy functions. def _score_async(self,deployment_uid, scoring_payload, transaction_id=None, retention=None): Deployments._validate_type(deployment_uid, u'deployment_id', str, True) Deployments._validate_type(scoring_payload, u'scoring_payload', dict, True) headers = self._client._get_headers() if transaction_id is not None: headers.update({'x-global-transaction-id': transaction_id}) # making change - connection keep alive scoring_url = self._client.service_instance._href_definitions.get_async_deployment_job_href() if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: params = self._client._params() else: params = None if not self._client.ICP and retention is not None: if not isinstance(retention, int) or retention < -1: raise TypeError("`retention` takes integer values greater or equal than -1.") params.update({'retention': retention}) response_scoring = requests.post( scoring_url, params=params, json=scoring_payload, headers=headers) return self._handle_response(202, u'scoring asynchronously', response_scoring)
[docs] def create_job(self, deployment_id, meta_props, retention=None, transaction_id=None, _asset_id=None): """Create an asynchronous deployment job. :param deployment_id: Unique Id of Deployment :type deployment_id: str :param meta_props: metaprops. To see the available list of metanames use ``client.deployments.ScoringMetaNames.get()`` or ``client.deployments.DecisionOptimizationmetaNames.get()`` :type meta_props: dict :param retention: how many job days job meta should be retained, takes integer values >= -1, supported only on Cloud :type retention: int, optional :return: metadata of the created async deployment job :rtype: dict .. note:: * The valid payloads for scoring input are either list of values, pandas or numpy dataframes. **Example** .. code-block:: python scoring_payload = {wml_client.deployments.ScoringMetaNames.INPUT_DATA: [{'fields': ['GENDER','AGE','MARITAL_STATUS','PROFESSION'], 'values': [['M',23,'Single','Student'], ['M',55,'Single','Executive']]}]} async_job = client.deployments.create_job(deployment_id, scoring_payload) """ WMLResource._chk_and_block_create_update_for_python36(self) Deployments._validate_type(deployment_id, u'deployment_uid', str, True) Deployments._validate_type(meta_props, u'meta_props', dict, True) if _asset_id: Deployments._validate_type(_asset_id, u'_asset_id', str, True) # We assume that _asset_id is the id of the asset that was deployed # in the deployment with id deployment_id, and we save one REST call asset = _asset_id else: deployment_details = self.get_details(deployment_id) if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: asset = deployment_details["entity"]["asset"]['id'] else: asset = deployment_details["entity"]["asset"]["href"].split("/")[-1] do_model = False if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: asset_details = self._client.data_assets.get_details(asset) if 'wml_model' in asset_details["entity"] and 'type' in asset_details['entity']['wml_model']: if 'do' in asset_details['entity']['wml_model']['type']: do_model = True else: asset_details = self._client.repository.get_details(asset) if "type" in asset_details["entity"]: if "do" in asset_details["entity"]["type"]: do_model = True flag = 0 ## To see if it is async scoring or DecisionOptimization Job if do_model: payload = self.DecisionOptimizationMetaNames._generate_resource_metadata(meta_props, with_validation=True, client=self._client) flag = 1 else: payload = self.ScoringMetaNames._generate_resource_metadata(meta_props, with_validation=True, client=self._client) scoring_data = None if "scoring" in payload and "input_data" in payload["scoring"]: scoring_data = payload["scoring"]["input_data"] if "decision_optimization" in payload and "input_data" in payload["decision_optimization"]: scoring_data = payload["decision_optimization"]["input_data"] if scoring_data is not None: score_payload = [] for each_score_request in scoring_data: lib_checker.check_lib(lib_name='pandas') import pandas as pd if "values" in each_score_request: scoring_values = each_score_request["values"] # Check feature types, currently supporting pandas df, numpy.ndarray, python lists and Dmatrix if (isinstance(scoring_values, pd.DataFrame)): fields_names = scoring_values.columns.values.tolist() values = scoring_values.where(pd.notnull(scoring_values), None).values.tolist() #replace nan with None each_score_request["values"] = values if fields_names is not None: each_score_request["fields"] = fields_names ## If payload is a numpy dataframe elif (isinstance(scoring_values, np.ndarray)): values = np.where(pd.notnull(scoring_values), scoring_values, None).tolist() #replace nan with None each_score_request["values"] = values # else: # payload_score["values"] = each_score_request["values"] # if "fields" in each_score_request: # payload_score["fields"] = each_score_request["fields"] score_payload.append(each_score_request) ##See if it is scoring or DecisionOptimizationJob if flag == 0: payload["scoring"]["input_data"] = score_payload if flag == 1: payload["decision_optimization"]["input_data"] = score_payload if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: import copy if 'input_data_references' in meta_props: Deployments._validate_type(meta_props.get('input_data_references'), u'input_data_references', list, True) modified_input_data_references=False input_data = copy.deepcopy(meta_props.get('input_data_references')) for i, input_data_fields in enumerate(input_data): if 'connection' not in input_data_fields: modified_input_data_references=True input_data_fields.update({'connection': {}}) if modified_input_data_references: if 'scoring' in payload: payload['scoring'].update({'input_data_references': input_data}) else: payload['decision_optimization'].update({'input_data_references': input_data}) if 'output_data_reference' in meta_props: Deployments._validate_type(meta_props.get('output_data_reference'), u'output_data_reference', dict, True) output_data = copy.deepcopy(meta_props.get('output_data_reference')) if 'connection' not in output_data: # and output_data.get('connection', None) is not None: output_data.update({'connection': {}}) payload['scoring'].update({'output_data_reference': output_data}) if 'output_data_references' in meta_props: Deployments._validate_type(meta_props.get('output_data_references'), u'output_data_references', list, True) output_data = copy.deepcopy(meta_props.get('output_data_references')) modified_output_data_references = False for i, output_data_fields in enumerate(output_data): if 'connection' not in output_data_fields: modified_output_data_references = True output_data_fields.update({'connection': {}}) if modified_output_data_references and 'decision_optimization' in payload: payload['decision_optimization'].update({'output_data_references': output_data}) payload.update({"deployment": {"id": deployment_id}}) if 'hardware_spec' in meta_props: payload.update({"hardware_spec": meta_props[self.ConfigurationMetaNames.HARDWARE_SPEC]}) if 'hybrid_pipeline_hardware_specs' in meta_props: payload.update({"hybrid_pipeline_hardware_specs": meta_props[self.ConfigurationMetaNames.HYBRID_PIPELINE_HARDWARE_SPECS]}) if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: payload.update({'space_id': self._client.default_space_id}) if 'name' not in payload: import uuid payload.update({'name': 'name_' + str(uuid.uuid4())}) else: payload.update({"deployment": {"href": "/v4/deployments/" + deployment_id}}) return self._score_async(deployment_id, payload, transaction_id=transaction_id, retention=retention)
[docs] def get_job_details(self, job_uid=None, include=None, limit=None): """Get information about deployment job(s). If deployment job_uid is not passed, all deployment jobs details are fetched. :param job_uid: Unique Job ID :type job_uid: str, optional :param include: fields to be retrieved from 'decision_optimization' and 'scoring' section mentioned as value(s) (comma separated) as output response fields :type include: str, optional :param limit: limit number of fetched records :type limit: int, optional :return: metadata of deployment job(s) :rtype: dict (if job_uid is not None) or {"resources": [dict]} (if job_uid is None) **Example** .. code-block:: python deployment_details = client.deployments.get_job_details() deployments_details = client.deployments.get_job_details(job_uid=job_uid) """ if job_uid is not None: Deployments._validate_type(job_uid, u'job_uid', str, True) url = self._client.service_instance._href_definitions.get_async_deployment_job_href() params = self._client._params() if include: params['include'] = include return self._get_artifact_details(url, job_uid, limit, 'async deployment job' if job_uid else 'async deployment jobs', query_params=params)
[docs] def get_job_status(self, job_id): """Get the status of the deployment job. :param job_id: Unique Id of the deployment job :type job_id: str :return: status of the deployment job :rtype: dict **Example** .. code-block:: python job_status = client.deployments.get_job_status(job_uid) """ job_details = self.get_job_details(job_id) if 'scoring' not in job_details['entity']: return job_details['entity']['decision_optimization']['status'] return job_details['entity']['scoring']['status']
[docs] def get_job_uid(self, job_details): """Get the Unique Id of the deployment job. :param job_details: metadata of the deployment job :type job_details: dict :return: Unique Id of the deployment job :rtype: str **Example** .. code-block:: python job_details = client.deployments.get_job_details(job_uid=job_uid) job_status = client.deployments.get_job_uid(job_details) """ if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: return job_details['metadata']['id'] else: return job_details['metadata']['guid']
[docs] def get_job_href(self, job_details): """Get the href of the deployment job. :param job_details: metadata of the deployment job :type job_details: dict :return: href of the deployment job :rtype: str **Example** .. code-block:: python job_details = client.deployments.get_job_details(job_uid=job_uid) job_status = client.deployments.get_job_href(job_details) """ if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: return "/ml/v4/deployment_jobs/{}".format(job_details[u'metadata'][u'id']) else: return job_details['metadata']['href']
[docs] def delete_job(self, job_uid, hard_delete=False): """Cancels a deployment job that is currenlty running. This method is also be used to delete metadata details of the completed or canceled jobs when hard_delete parameter is set to True. :param job_uid: Unique Id of deployment job which should be canceled :type job_uid: str :param hard_delete: specify `True` or `False`: `True` - To delete the completed or canceled job. `False` - To cancel the currently running deployment job. :type hard_delete: bool, optional :return: status ("SUCCESS" or "FAILED") :rtype: str **Example** .. code-block:: python client.deployments.delete_job(job_uid) """ Deployments._validate_type(job_uid, u'job_uid', str, True) if job_uid is not None and not is_uid(job_uid): raise WMLClientError(u'\'job_uid\' is not an uid: \'{}\''.format(job_uid)) url = self._client.service_instance._href_definitions.get_async_deployment_jobs_href(job_uid) if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: params = self._client._params() else: params = {} if hard_delete is True: params.update({'hard_delete': u'true'}) response_delete = requests.delete( url, headers=self._client._get_headers(), params=params) return self._handle_response(204, u'deployment async job deletion', response_delete, False)
def _get_filter_func_by_spec_state(self, spec_state): def filter_func(resources): asset_ids = [i['metadata']['id'] for key, value in {'model': self._client._models.get_details(get_all=True, spec_state=spec_state), 'function': self._client._functions.get_details(get_all=True, spec_state=spec_state) }.items() for i in value['resources']] return [r for r in resources if r['entity'].get('asset', {}).get('id') in asset_ids] return filter_func def _get_model_inference_text(self, deployment_id, inference_type: Literal["text", "text_stream"], params=None): """Based on provided deployment_id and params get ModelInference object. Verify that the deployment with the given deployment_id has generating methods. """ # Import ModelInference here to avoid circular import error # Check from which package, wx or wml, is call ModelInference = _check_if_import_from_watsonx_ai(self.__module__, 'ibm_watson_machine_learning.foundation_models.inference.model_inference', 'ModelInference') match inference_type: case "text": generated_url = self._client.service_instance._href_definitions.get_fm_deployment_generation_href( deployment_id=deployment_id, item='text') case "text_stream": if self._client._use_fm_ga_api: generated_url = self._client.service_instance._href_definitions.get_fm_deployment_generation_stream_href( deployment_id=deployment_id) else: # Remove on CPD 5.0 release generated_url = self._client.service_instance._href_definitions.get_fm_deployment_generation_href( deployment_id=deployment_id, item="text_stream") case _: raise InvalidValue(value_name="inference_type", reason=f"Available types: 'text', 'text_stream', got: {inference_type}.") inference_url_list = [url.get("url") for url in self.get_details(deployment_id, _silent=True)['entity'].get('status', {}).get("inference", {})] if not inference_url_list: inference_url_list = self.get_details(deployment_id, _silent=True)['entity'].get('status', {}).get('serving_urls', []) if generated_url not in inference_url_list: for inference_url in inference_url_list: # Remove on CPD 5.0 release if "v1-beta/deployments" not in inference_url: # Remove on CPD 5.0 release raise WMLClientError(Messages.get_message(deployment_id, message_id="fm_deployment_has_not_inference_for_generation")) return ModelInference(deployment_id=deployment_id, params=params, api_client=self._client)
[docs] def generate(self, deployment_id, prompt=None, params=None, guardrails=False, guardrails_hap_params=None, guardrails_pii_params=None, concurrency_limit=10, async_mode=False) -> dict: """Generate a raw response with `prompt` for given `deployment_id`. :param deployment_id: Id of deployment :type deployment_id: str :param prompt: prompt needed for text generation. If deployment_id points to Prompt Template asset then prompt argument must be None, defaults to None :type prompt: (str | None), optional :param params: meta props for text generation, use ``ibm_watson_machine_learning.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dict :param guardrails: If True then potentially hateful, abusive, and/or profane language (HAP) detection filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watson_machine_learning.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict :param concurrency_limit: number of requests that will be sent in parallel, max is 10 :type concurrency_limit: int, optional :param async_mode: If True then yield results asynchronously (using generator). In this case both prompt and generated text will be concatenated in the final response - under `generated_text`, defaults to False :type async_mode: bool :return: scoring result containing generated content. :rtype: dict """ d_inference = self._get_model_inference_text(deployment_id, "text", params) return d_inference.generate(prompt=prompt, guardrails=guardrails, guardrails_hap_params=guardrails_hap_params, guardrails_pii_params=guardrails_pii_params, concurrency_limit=concurrency_limit, params=params, async_mode=async_mode)
[docs] def generate_text(self, deployment_id, prompt=None, params=None, raw_response=False, guardrails=False, guardrails_hap_params=None, guardrails_pii_params=None, concurrency_limit=10) -> str: """Given the selected deployment (deployment_id), a text prompt as input, parameters and concurrency_limit, the selected inference will generate a completion text as generated_text response. :param deployment_id: Id of deployment :type deployment_id: str :param prompt: the prompt string or list of strings. If list of strings is passed requests will be managed in parallel with the rate of concurency_limit, defaults to None :type prompt: (str | None), optional :param params: meta props for text generation, use ``ibm_watson_machine_learning.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dict :param raw_response: return the whole response object :type raw_response: bool, optional :param guardrails: If True then potentially hateful, abusive, and/or profane language (HAP) detection filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watson_machine_learning.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict :param concurrency_limit: number of requests that will be sent in parallel, max is 10 :type concurrency_limit: int :return: generated content :rtype: str .. note:: By default only the first occurance of `HAPDetectionWarning` is displayed. To enable printing all warnings of this category, use: .. code-block:: python import warnings from ibm_watson_machine_learning.foundation_models.utils import HAPDetectionWarning warnings.filterwarnings("always", category=HAPDetectionWarning) """ d_inference = self._get_model_inference_text(deployment_id, "text", params) return d_inference.generate_text(prompt=prompt, raw_response=raw_response, guardrails=guardrails, guardrails_hap_params=guardrails_hap_params, guardrails_pii_params=guardrails_pii_params, concurrency_limit=concurrency_limit, params=params)
[docs] def generate_text_stream(self, deployment_id, prompt=None, params=None, raw_response=False, guardrails=False, guardrails_hap_params=None, guardrails_pii_params=None): """Given the selected deployment (deployment_id), a text prompt as input and parameters, the selected inference will generate a streamed text as generate_text_stream. :param deployment_id: Id of deployment :type deployment_id: str :param prompt: the prompt string, defaults to None :type prompt: (str | None), optional :param params: meta props for text generation, use ``ibm_watson_machine_learning.metanames.GenTextParamsMetaNames().show()`` to view the list of MetaNames :type params: dictl :param raw_response: yields the whole response object :type raw_response: bool, optional :param guardrails: If True then potentially hateful, abusive, and/or profane language (HAP) detection filter is toggle on for both prompt and generated text, defaults to False :type guardrails: bool :param guardrails_hap_params: meta props for HAP moderations, use ``ibm_watson_machine_learning.metanames.GenTextModerationsMetaNames().show()`` to view the list of MetaNames :type guardrails_hap_params: dict :return: generated content :rtype: str .. note:: By default only the first occurance of `HAPDetectionWarning` is displayed. To enable printing all warnings of this category, use: .. code-block:: python import warnings from ibm_watson_machine_learning.foundation_models.utils import HAPDetectionWarning warnings.filterwarnings("always", category=HAPDetectionWarning) """ d_inference = self._get_model_inference_text(deployment_id, "text_stream", params) return d_inference.generate_text_stream(prompt=prompt, params=params, raw_response=raw_response, guardrails=guardrails, guardrails_hap_params=guardrails_hap_params, guardrails_pii_params=guardrails_pii_params)