# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2020-2024.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
import io
import os
import time
from typing import TYPE_CHECKING, Any, Dict, Union, List, Optional
import pandas as pd
from pandas import DataFrame, concat
from .base_deployment import BaseDeployment
from ..helpers import DataConnection, AssetLocation
from ..utils import StatusLogger, print_text_header_h1
from ..utils.autoai.connection import validate_source_data_connections, validate_deployment_output_connection
from ..utils.autoai.utils import init_cos_client, try_load_dataset, convert_dataframe_to_fields_values_payload
from ..utils.autoai.errors import NoneDataConnection
from ..utils.deployment.errors import BatchJobFailed, MissingScoringResults
from ..wml_client_error import WMLClientError
from ibm_watson_machine_learning.utils.autoai.enums import DataConnectionTypes
if TYPE_CHECKING:
from sklearn.pipeline import Pipeline
from pandas import DataFrame
from numpy import ndarray
from ..workspace import WorkSpace
# Public API of this module: only the batch deployment class is exported.
__all__ = ["Batch"]
[docs]
class Batch(BaseDeployment):
    """The Batch Deployment class.

    With this class object you can manage any batch deployment.

    :param source_wml_credentials: credentials to Watson Machine Learning instance where training was performed
    :type source_wml_credentials: dict or WorkSpace, optional

    :param source_project_id: ID of the Watson Studio project where training was performed
    :type source_project_id: str, optional

    :param source_space_id: ID of the Watson Studio Space where training was performed
    :type source_space_id: str, optional

    :param target_wml_credentials: credentials to Watson Machine Learning instance where you want to deploy
    :type target_wml_credentials: dict or WorkSpace, optional

    :param target_project_id: ID of the Watson Studio project where you want to deploy
    :type target_project_id: str, optional

    :param target_space_id: ID of the Watson Studio Space where you want to deploy
    :type target_space_id: str, optional

    :param wml_credentials: credentials forwarded unchanged to ``BaseDeployment``; presumably a shorthand for
        the case where source and target are the same instance (TODO confirm against ``BaseDeployment``)
    :type wml_credentials: dict or WorkSpace, optional

    :param project_id: project ID forwarded unchanged to ``BaseDeployment`` (see note on ``wml_credentials``)
    :type project_id: str, optional

    :param space_id: space ID forwarded unchanged to ``BaseDeployment`` (see note on ``wml_credentials``)
    :type space_id: str, optional
    """

    def __init__(self,
                 source_wml_credentials: Optional[Union[dict, 'WorkSpace']] = None,
                 source_project_id: Optional[str] = None,
                 source_space_id: Optional[str] = None,
                 target_wml_credentials: Optional[Union[dict, 'WorkSpace']] = None,
                 target_project_id: Optional[str] = None,
                 target_space_id: Optional[str] = None,
                 wml_credentials: Optional[Union[dict, 'WorkSpace']] = None,
                 project_id: Optional[str] = None,
                 space_id: Optional[str] = None) -> None:
        # All workspace handling lives in the base class; this subclass only pins the deployment type.
        super().__init__(
            deployment_type='batch',
            source_wml_credentials=source_wml_credentials,
            source_project_id=source_project_id,
            source_space_id=source_space_id,
            target_wml_credentials=target_wml_credentials,
            target_project_id=target_project_id,
            target_space_id=target_space_id,
            wml_credentials=wml_credentials,
            project_id=project_id,
            space_id=space_id
        )

        # Identity of the deployment this object points at; nothing is selected yet.
        # `asset_id` is filled in by `_deploy`; `name` and `id` are not assigned elsewhere in this
        # class - presumably the base class sets them (TODO confirm).
        self.name = None
        self.id = None
        self.asset_id = None
def __repr__(self):
    """Return a one-line summary made of the deployment name, id and asset id."""
    parts = (
        f"name: {self.name}",
        f"id: {self.id}",
        f"asset_id: {self.asset_id}",
    )
    return ", ".join(parts)
def __str__(self):
    """Return the human-readable summary: deployment name, id and asset id on one line."""
    template = "name: {}, id: {}, asset_id: {}"
    return template.format(self.name, self.id, self.asset_id)
[docs]
def score(self, **kwargs):
    """Online scoring entry point; not available for batch deployments.

    :raises NotImplementedError: always, whatever keyword arguments are passed
    """
    reason = "Batch deployment supports only job runs."
    raise NotImplementedError(reason)
[docs]
def create(self,
           model: str,
           deployment_name: str,
           metadata: Optional[Dict] = None,
           training_data: Optional[Union['DataFrame', 'ndarray']] = None,
           training_target: Optional[Union['DataFrame', 'ndarray']] = None,
           experiment_run_id: Optional[str] = None) -> None:
    """Create a batch deployment from a model.

    Every argument is handed to ``BaseDeployment.create`` together with ``deployment_type='batch'``;
    whatever that call returns is returned unchanged.

    :param model: AutoAI model name
    :type model: str

    :param deployment_name: name of the deployment
    :type deployment_name: str

    :param metadata: model meta properties
    :type metadata: dict, optional

    :param training_data: training data for the model
    :type training_data: pandas.DataFrame or numpy.ndarray, optional

    :param training_target: target/label data for the model
    :type training_target: pandas.DataFrame or numpy.ndarray, optional

    :param experiment_run_id: ID of a training/experiment (only applicable for AutoAI deployments)
    :type experiment_run_id: str, optional

    **Example**

    .. code-block:: python

        from ibm_watson_machine_learning.deployment import Batch

        deployment = Batch(
            wml_credentials={
                "apikey": "...",
                "iam_apikey_description": "...",
                "iam_apikey_name": "...",
                "iam_role_crn": "...",
                "iam_serviceid_crn": "...",
                "instance_id": "...",
                "url": "https://us-south.ml.cloud.ibm.com"
            },
            project_id="...",
            space_id="...")

        deployment.create(
            experiment_run_id="...",
            model=model,
            deployment_name='My new deployment'
        )
    """
    creation_arguments = dict(
        model=model,
        deployment_name=deployment_name,
        metadata=metadata,
        training_data=training_data,
        training_target=training_target,
        experiment_run_id=experiment_run_id,
        deployment_type='batch',
    )
    return super().create(**creation_arguments)
[docs]
@BaseDeployment._project_to_space_to_project
def get_params(self) -> Dict:
    """Return the deployment parameters exactly as produced by ``BaseDeployment.get_params``."""
    deployment_params = super().get_params()
    return deployment_params
[docs]
@BaseDeployment._project_to_space_to_project
def run_job(self,
            payload: Union[DataFrame, List[DataConnection], Dict[str, DataFrame], Dict[str, DataConnection]] = pd.DataFrame(),
            output_data_reference: 'DataConnection' = None,
            transaction_id: str = None,
            background_mode: 'bool' = True) -> Union[Dict, Dict[str, List], DataConnection]:
    """Batch scoring job on WML. Payload or Payload data reference is required.
    It is passed to the WML where model have been deployed.

    The payload type decides how the job request is built:

    - ``pandas.DataFrame`` - sent inline as the job input data,
    - ``dict`` with an ``'observations'`` entry and an optional ``'supporting_features'`` entry; both must be
      DataFrames (sent inline) or both DataConnections (sent as data references),
    - ``list`` of DataConnection objects or of already serialized data reference dictionaries.

    Whenever data references are sent, ``output_data_reference`` is mandatory.

    :param payload: DataFrame that contains data to test the model or data storage connection details
        that inform the model where payload data is stored
    :type payload: pandas.DataFrame or List[DataConnection] or Dict

    :param output_data_reference: DataConnection to the output COS for storing predictions,
        required only when DataConnections are used as a payload
    :type output_data_reference: DataConnection, optional

    :param transaction_id: can be used to indicate under which id the records will be saved into payload table
        in IBM OpenScale; NOTE: this method's body does not reference it when building the job request
    :type transaction_id: str, optional

    :param background_mode: if True (default) return right after the job is submitted, otherwise poll the
        job every 10 seconds until it reaches a final state
    :type background_mode: bool, optional

    :return: scoring job details
    :rtype: dict

    :raises ValueError: unsupported payload type, empty payload list, unsupported list item type
        or missing ``output_data_reference`` for a data reference payload
    :raises TypeError: dictionary payload whose entries are neither all DataFrames nor all DataConnections
    :raises NoneDataConnection: list payload of DataConnections containing ``None``
    :raises BatchJobFailed: synchronous run that ended in a state other than ``completed``

    **Examples**

    .. code-block:: python

        score_details = batch_service.run_job(payload=test_data)
        print(score_details['entity']['scoring'])

        # Result:
        # {'input_data': [{'fields': ['sepal_length',
        #               'sepal_width',
        #               'petal_length',
        #               'petal_width'],
        #              'values': [[4.9, 3.0, 1.4, 0.2]]}],
        # 'predictions': [{'fields': ['prediction', 'probability'],
        #               'values': [['setosa',
        #                 [0.9999320742502246,
        #                  5.1519823540224506e-05,
        #                  1.6405926235405522e-05]]]}]

        payload_reference = DataConnection(location=DSLocation(asset_id=asset_id))
        results_reference = DataConnection(...)  # where the predictions should be stored

        score_details = batch_service.run_job(payload=[payload_reference],
                                              output_data_reference=results_reference)

        score_details = batch_service.run_job(payload={'observations': payload_reference},
                                              output_data_reference=results_reference)

        # supporting features time series forecasting scenario
        score_details = batch_service.run_job(payload={'observations': payload_reference,
                                                       'supporting_features': supporting_features_reference},
                                              output_data_reference=results_reference)
    """
    # Reject unsupported payload types before touching the WML client.
    if not isinstance(payload, (dict, DataFrame, list)):
        raise ValueError(
            f"Incorrect payload type. Required: DataFrame or List[DataConnection], Passed: {type(payload)}")

    deployments = self._target_workspace.wml_client.deployments
    meta_names = deployments.ScoringMetaNames

    if isinstance(payload, dict):
        observations = payload.get('observations', pd.DataFrame())
        supporting_features = payload.get('supporting_features')

        if isinstance(observations, DataFrame) and \
                (supporting_features is None or isinstance(supporting_features, DataFrame)):
            # Inline data: every frame becomes one tagged entry of the input data list.
            observations_payload = convert_dataframe_to_fields_values_payload(observations,
                                                                              return_values_only=True)
            observations_payload['id'] = 'observations'
            input_data = [observations_payload]

            if supporting_features is not None:
                supporting_features_payload = convert_dataframe_to_fields_values_payload(
                    supporting_features, return_values_only=True)
                supporting_features_payload['id'] = 'supporting_features'
                input_data.append(supporting_features_payload)

            scoring_payload = {meta_names.INPUT_DATA: input_data}

        elif isinstance(observations, DataConnection) and \
                (supporting_features is None or isinstance(supporting_features, DataConnection)):
            # Referenced data: tag the connections so the two inputs can be told apart.
            observations.id = 'observations'
            input_data = [observations]

            if supporting_features is not None:
                supporting_features.id = 'supporting_features'
                input_data.append(supporting_features)

            for data_conn in input_data:
                self._attach_target_wml_client(data_conn)

            input_data = validate_source_data_connections(source_data_connections=input_data,
                                                          workspace=self._target_workspace,
                                                          deployment=True)
            input_data = [data_connection._to_dict() for data_connection in input_data]

            output_data_reference = self._prepare_job_output_reference(output_data_reference, input_data)

            scoring_payload = {
                meta_names.INPUT_DATA_REFERENCES: input_data,
                meta_names.OUTPUT_DATA_REFERENCE: output_data_reference
            }

        else:
            raise TypeError('Missing data observations in payload or observations '
                            'or supporting_features are not pandas.DataFrames.')

    # note: support for DataFrame payload
    elif isinstance(payload, DataFrame):
        scoring_payload = {meta_names.INPUT_DATA: [{'values': payload}]}

    # note: support for DataConnections and dictionaries payload
    else:
        if not payload:
            # Without this guard `payload[0]` below fails with a bare IndexError.
            raise ValueError("Payload list is empty. Provide at least one DataConnection "
                             "or data reference dictionary.")

        if isinstance(payload[0], DataConnection):
            if None in payload:
                raise NoneDataConnection('payload')

            for data_conn in payload:
                self._attach_target_wml_client(data_conn)

            payload = [new_conn for conn in payload for new_conn in conn._subdivide_connection()]
            payload = validate_source_data_connections(source_data_connections=payload,
                                                       workspace=self._target_workspace,
                                                       deployment=True)
            payload = [data_connection._to_dict() for data_connection in payload]

        elif not isinstance(payload[0], dict):
            raise ValueError(f"Current payload type: list of {type(payload[0])} is not supported.")
        # A list of dictionaries is forwarded as is (this is what `rerun_job` passes in).

        output_data_reference = self._prepare_job_output_reference(output_data_reference, payload)

        scoring_payload = {
            meta_names.INPUT_DATA_REFERENCES: payload,
            meta_names.OUTPUT_DATA_REFERENCE: output_data_reference
        }

    hardware_specs = [
        {
            'node_runtime_id': 'auto_ai.kb',
            'hardware_spec': {
                'name': 'M'
            }
        }
    ]

    if self._obm:
        # The OBM node spec has to be the first entry of the list.
        hardware_specs.insert(
            0,
            {
                "node_runtime_id": "auto_ai.obm",
                "hardware_spec": {
                    "name": "M-Spark",
                    "num_nodes": 2
                }
            }
        )

    scoring_payload['hybrid_pipeline_hardware_specs'] = hardware_specs

    job_details = deployments.create_job(self.id, scoring_payload, _asset_id=self.asset_id)

    if background_mode:
        return job_details

    # note: monitor scoring job
    job_id = deployments.get_job_uid(job_details)
    print_text_header_h1(u'Synchronous scoring for id: \'{}\' started'.format(job_id))

    final_states = ('failed', 'error', 'completed', 'canceled')
    status = self.get_job_status(job_id)['state']

    with StatusLogger(status) as status_logger:
        while status not in final_states:
            time.sleep(10)
            status = self.get_job_status(job_id)['state']
            status_logger.log_state(status)
    # --- end note

    # The loop only exits on one of `final_states`, so an equality test is sufficient here.
    if status != 'completed':
        raise BatchJobFailed(job_id, f"Scoring job failed with status: {self.get_job_status(job_id)}")

    print(u'\nScoring job \'{}\' finished successfully.'.format(job_id))
    return self.get_job_params(job_id)


def _attach_target_wml_client(self, data_connection) -> None:
    """Give an asset-located connection the target workspace client (wml_client sets correct href for Data Assets)."""
    if hasattr(data_connection, 'location') and isinstance(data_connection.location, AssetLocation):
        data_connection.location.wml_client = self._target_workspace.wml_client


def _prepare_job_output_reference(self, output_data_reference, source_data_references):
    """Return the output reference in the form sent with a job: validated dict for a DataConnection, anything else as is."""
    if output_data_reference is None:
        raise ValueError("\"output_data_reference\" should be provided.")

    if not isinstance(output_data_reference, DataConnection):
        # Already serialized references (e.g. dictionaries) are passed through untouched.
        return output_data_reference

    self._attach_target_wml_client(output_data_reference)

    validated_reference = validate_deployment_output_connection(
        results_data_connection=output_data_reference,
        workspace=self._target_workspace,
        source_data_connections=source_data_references)

    return validated_reference._to_dict()
[docs]
@BaseDeployment._project_to_space_to_project
def rerun_job(self,
              scoring_job_id: str,
              background_mode: bool = True) -> Union[dict, 'DataFrame', 'DataConnection']:
    """Rerun scoring job with the same parameters as job described by `scoring_job_id`.

    Only jobs that were submitted with data references can be rerun; the stored input and output
    references are read from the job details and submitted again through ``run_job``.

    :param scoring_job_id: Id described scoring job
    :type scoring_job_id: str

    :param background_mode: indicator if rerun_job() method will run in background (async) or (sync)
    :type background_mode: bool, optional

    :return: scoring job details
    :rtype: dict

    :raises NotImplementedError: when the original job did not use input data references

    **Example**

    .. code-block:: python

        scoring_details = deployment.rerun_job(scoring_job_id)
    """
    scoring_params = self.get_job_params(scoring_job_id)['entity']['scoring']

    meta_names = self._target_workspace.wml_client.deployments.ScoringMetaNames
    input_references_key = meta_names.INPUT_DATA_REFERENCES
    output_reference_key = meta_names.OUTPUT_DATA_REFERENCE

    if input_references_key not in scoring_params:
        raise NotImplementedError("'rerun_job' method supports only jobs with "
                                  "payload passed as a list of DataConnections. If you want to rerun job "
                                  "with payload passed directly, please use 'run_job' one more time.")

    payload_ref = list(scoring_params[input_references_key])

    # Read the output reference through the same key that is used to clean it, so both always
    # refer to one entry. A missing reference is left as None and reported by `run_job`.
    output_reference = scoring_params.get(output_reference_key)

    if isinstance(output_reference, dict):
        location = output_reference.get('location')
        if isinstance(location, dict):
            # The stored `href` is dropped before resubmitting - presumably it is generated per job
            # and must not be sent back (TODO confirm).
            location.pop('href', None)

    return self.run_job(payload=payload_ref,
                        output_data_reference=output_reference,
                        background_mode=background_mode)
[docs]
@BaseDeployment._project_to_space_to_project
def delete(self, deployment_id: str = None) -> None:
    """Delete a batch deployment on WML.

    The request is delegated to ``BaseDeployment.delete`` with ``deployment_type='batch'``.

    :param deployment_id: ID of the deployment to delete, if empty, current deployment will be deleted
    :type deployment_id: str, optional

    **Example**

    .. code-block:: python

        deployment = Batch(wml_credentials=..., space_id="...")

        # Delete current deployment
        deployment.delete()

        # Or delete a specific deployment
        deployment.delete(deployment_id='...')
    """
    super().delete(deployment_type='batch', deployment_id=deployment_id)
[docs]
@BaseDeployment._project_to_space_to_project
def list(self, limit=None) -> 'DataFrame':
    """List WML batch deployments.

    The listing is delegated to ``BaseDeployment.list`` with ``deployment_type='batch'``.

    :param limit: set the limit of how many deployments to list,
        default is `None` (all deployments should be fetched)
    :type limit: int, optional

    :return: Pandas DataFrame with information about deployments
    :rtype: pandas.DataFrame

    **Example**

    .. code-block:: python

        deployment = Batch(wml_credentials=..., space_id="...")
        deployments_list = deployment.list()
        print(deployments_list)

        # Result:
        #                  created_at  ...  status
        # 0  2020-03-06T10:50:49.401Z  ...   ready
        # 1  2020-03-06T13:16:09.789Z  ...   ready
        # 4  2020-03-11T14:46:36.035Z  ...  failed
        # 3  2020-03-11T14:49:55.052Z  ...  failed
        # 2  2020-03-11T15:13:53.708Z  ...   ready
    """
    deployments_frame = super().list(deployment_type='batch', limit=limit)
    return deployments_frame
[docs]
@BaseDeployment._project_to_space_to_project
def get(self, deployment_id: str) -> None:
    """Get WML batch deployment.

    The lookup is delegated to ``BaseDeployment.get`` with ``deployment_type='batch'``.

    :param deployment_id: ID of the deployment to work with
    :type deployment_id: str

    **Example**

    .. code-block:: python

        deployment = Batch(wml_credentials=..., space_id="...")
        deployment.get(deployment_id="...")
    """
    super().get(deployment_type='batch', deployment_id=deployment_id)
[docs]
@BaseDeployment._project_to_space_to_project
def get_job_params(self, scoring_job_id: str = None) -> Dict:
    """Get batch deployment job parameters.

    The value comes straight from the target workspace client's ``deployments.get_job_details``.

    :param scoring_job_id: Id of scoring job
    :type scoring_job_id: str

    :return: parameters of the scoring job
    :rtype: dict
    """
    deployments_api = self._target_workspace.wml_client.deployments
    return deployments_api.get_job_details(scoring_job_id)
[docs]
@BaseDeployment._project_to_space_to_project
def get_job_status(self, scoring_job_id: str) -> Dict:
    """Get status of scoring job.

    The value comes straight from the target workspace client's ``deployments.get_job_status``.

    :param scoring_job_id: Id of scoring job
    :type scoring_job_id: str

    :return: dictionary with state of scoring job (one of: [completed, failed, starting, queued])
        and additional details if they exist
    :rtype: dict
    """
    deployments_api = self._target_workspace.wml_client.deployments
    return deployments_api.get_job_status(scoring_job_id)
[docs]
@BaseDeployment._project_to_space_to_project
def get_job_result(self, scoring_job_id: str) -> 'DataFrame':
    """Get batch deployment results of job with id `scoring_job_id`.

    Inline predictions stored in the job details are returned as a DataFrame; otherwise the job's
    output data reference is read.

    :param scoring_job_id: Id of scoring job which results will be returned
    :type scoring_job_id: str

    :return: result
    :rtype: pandas.DataFrame

    :raises MissingScoringResults: when the job state is anything other than ``completed``
    """
    scoring = self.get_job_params(scoring_job_id)['entity']['scoring']

    if scoring['status']['state'] != 'completed':
        raise MissingScoringResults(scoring_job_id, reason="Scoring is not completed.")

    if 'predictions' not in scoring:
        # No inline predictions: load them from the location the job wrote its output to.
        output_connection = DataConnection._from_dict(scoring['output_data_reference'])
        output_connection._wml_client = self._target_workspace.wml_client
        # if in future output may be excel file or with custom separator, here it should be recognized
        return output_connection.read(raw=True)

    # Only the first entry of the predictions list is converted.
    first_prediction = scoring['predictions'][0]
    return DataFrame(first_prediction['values'], columns=first_prediction['fields'])
[docs]
@BaseDeployment._project_to_space_to_project
def get_job_id(self, batch_scoring_details):
    """Get id from batch scoring details.

    The id is extracted by the target workspace client's ``deployments.get_job_uid``.
    """
    deployments_api = self._target_workspace.wml_client.deployments
    return deployments_api.get_job_uid(batch_scoring_details)
[docs]
@BaseDeployment._project_to_space_to_project
def list_jobs(self):
    """Returns pandas DataFrame with list of deployment jobs.

    Only jobs that carry a ``scoring`` entity are listed. When this object has a deployment id set,
    the list is narrowed to the jobs of that deployment; otherwise all scoring jobs are returned.
    """
    job_resources = self._target_workspace.wml_client.deployments.get_job_details()['resources']

    rows = []
    for job in job_resources:
        if 'scoring' not in job['entity']:
            continue

        job_state = job['entity']['scoring']['status']['state']
        row = (job[u'metadata'][u'id'],
               job_state,
               job[u'metadata'][u'created_at'],
               job['entity']['deployment']['id'])

        # An unset/empty deployment id means "no filter".
        if not self.id or self.id == job['entity']['deployment']['id']:
            rows.append(row)

    # NOTE: 'creted' is misspelled, but it is a column name callers may index by, so it is kept as is.
    return DataFrame(rows, columns=[u'job id', u'state', u'creted', u'deployment id'])
@BaseDeployment._project_to_space_to_project
def _deploy(self,
            pipeline_model: 'Pipeline',
            deployment_name: str,
            meta_props: Dict,
            serving_name=None,  # Not used, but added to match unified parameters for _deploy
            result_client=None,
            hardware_spec=None) -> Dict:  # Not used, but added to match unified parameters for _deploy
    """Deploy model into WML.

    The model is first published as an asset, then a batch deployment is created for that asset.
    As side effects ``self.asset_id`` and ``self.deployment_id`` are set.

    :param pipeline_model: model of the pipeline to deploy
    :type pipeline_model: Pipeline or str

    :param deployment_name: name of the deployment
    :type deployment_name: str

    :param meta_props: model meta properties
    :type meta_props: dict

    :param serving_name: not used by this method, accepted to match unified parameters for _deploy
    :type serving_name: optional

    :param result_client: tuple with Result DataConnection object and initialized COS client;
        not referenced in this method's body
    :type result_client: tuple[DataConnection, resource]

    :param hardware_spec: not used by this method, accepted to match unified parameters for _deploy
    :type hardware_spec: optional

    :return: details of the created deployment, as returned by the client's ``deployments.create``
    :rtype: dict

    :raises WMLClientError: propagated unchanged when the deployment cannot be created
    """
    asset_uid = self._publish_model(pipeline_model=pipeline_model,
                                    meta_props=meta_props)
    self.asset_id = asset_uid

    deployments = self._target_workspace.wml_client.deployments
    config_names = deployments.ConfigurationMetaNames

    hardware_specs = [
        {
            'node_runtime_id': 'auto_ai.kb',
            'hardware_spec': {
                'name': 'M'
            }
        }
    ]

    if self._obm:
        # The OBM node spec has to be the first entry of the list.
        hardware_specs.insert(0, {
            "node_runtime_id": "auto_ai.obm",
            "hardware_spec": {
                "name": "M-Spark",
                "num_nodes": 2
            }
        })

    deployment_props = {
        config_names.NAME: deployment_name,
        config_names.BATCH: {},
        config_names.ASSET: {"id": asset_uid},
        config_names.HYBRID_PIPELINE_HARDWARE_SPECS: hardware_specs,
    }

    print("Deploying model {} using V4 client.".format(asset_uid))

    # Client errors are intentionally left to propagate to the caller unchanged.
    deployment_details = deployments.create(
        artifact_uid=asset_uid,
        meta_props=deployment_props)

    self.deployment_id = deployments.get_uid(deployment_details)

    return deployment_details