# -----------------------------------------------------------------------------------------
# (C) Copyright IBM Corp. 2023-2024.
# https://opensource.org/licenses/BSD-3-Clause
# -----------------------------------------------------------------------------------------
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Any, Literal, TypeAlias, cast
from dataclasses import dataclass
from requests import Response
from ibm_watsonx_ai._wrappers import requests
from ibm_watsonx_ai.experiments import Experiments
from ibm_watsonx_ai.functions import Functions
from ibm_watsonx_ai.ai_services import AIServices
from ibm_watsonx_ai.libs.repo.mlrepositoryclient import MLRepositoryClient
from ibm_watsonx_ai.lifecycle import SpecStates
from ibm_watsonx_ai.messages.messages import Messages
from ibm_watsonx_ai.metanames import (
ExperimentMetaNames,
FunctionMetaNames,
PipelineMetanames,
SpacesMetaNames,
ModelMetaNames,
MemberMetaNames,
AIServiceMetaNames,
)
from ibm_watsonx_ai.models import Models
from ibm_watsonx_ai.pipelines import Pipelines
from ibm_watsonx_ai.utils import inherited_docstring, get_url, get_user_agent_header
from ibm_watsonx_ai.utils.utils import _get_id_from_deprecated_uid
from ibm_watsonx_ai.wml_client_error import WMLClientError
from ibm_watsonx_ai.wml_resource import WMLResource
if TYPE_CHECKING:
from ibm_watsonx_ai import APIClient
import numpy
import pandas
import pyspark
# Annotation-only aliases. They reference numpy / pandas / pyspark, which this
# module imports only under ``TYPE_CHECKING``.
LabelColumnNamesType: TypeAlias = (
    numpy.ndarray[Any, numpy.dtype[numpy.str_]] | list[str]
)
# Fixed: the Spark class is ``pyspark.sql.DataFrame`` (capital "F");
# ``pyspark.sql.Dataframe`` does not exist and fails type checking.
TrainingDataType: TypeAlias = (
    pandas.DataFrame | numpy.ndarray | pyspark.sql.DataFrame | list
)
TrainingTargetType: TypeAlias = (
    pandas.DataFrame | pandas.Series | numpy.ndarray | list
)
FeatureNamesArrayType: TypeAlias = numpy.ndarray | list
# Maximum number of rows returned by Repository.list(); longer tables are sliced to this length.
_DEFAULT_LIST_LENGTH = 50
[docs]
class Repository(WMLResource):
"""Store and manage models, functions, spaces, pipelines, and experiments
using the Watson Machine Learning Repository.
To view ModelMetaNames, use:
.. code-block:: python
client.repository.ModelMetaNames.show()
To view ExperimentMetaNames, use:
.. code-block:: python
client.repository.ExperimentMetaNames.show()
To view FunctionMetaNames, use:
.. code-block:: python
client.repository.FunctionMetaNames.show()
To view PipelineMetaNames, use:
.. code-block:: python
client.repository.PipelineMetaNames.show()
To view AIServiceMetaNames, use:
.. code-block:: python
client.repository.AIServiceMetaNames.show()
"""
[docs]
@dataclass
class ModelAssetTypes:
    """Catalogue of the model asset type identifiers supported by the repository.

    Every field is a plain ``str`` whose default is the identifier string,
    so values can be read from the class itself or from an instance.
    """

    # Decision Optimization
    DO_DOCPLEX_20_1: str = "do-docplex_20.1"
    DO_OPL_20_1: str = "do-opl_20.1"
    DO_CPLEX_20_1: str = "do-cplex_20.1"
    DO_CPO_20_1: str = "do-cpo_20.1"
    DO_DOCPLEX_22_1: str = "do-docplex_22.1"
    DO_OPL_22_1: str = "do-opl_22.1"
    DO_CPLEX_22_1: str = "do-cplex_22.1"
    DO_CPO_22_1: str = "do-cpo_22.1"

    # Hybrid and PMML
    WML_HYBRID_0_1: str = "wml-hybrid_0.1"
    PMML_4_2_1: str = "pmml_4.2.1"

    # PyTorch / ONNX
    PYTORCH_ONNX_1_12: str = "pytorch-onnx_1.12"
    PYTORCH_ONNX_RT22_2: str = "pytorch-onnx_rt22.2"
    PYTORCH_ONNX_2_0: str = "pytorch-onnx_2.0"
    PYTORCH_ONNX_RT23_1: str = "pytorch-onnx_rt23.1"

    # scikit-learn and Spark MLlib
    SCIKIT_LEARN_1_1: str = "scikit-learn_1.1"
    MLLIB_3_3: str = "mllib_3.3"

    # SPSS Modeler
    SPSS_MODELER_17_1: str = "spss-modeler_17.1"
    SPSS_MODELER_18_1: str = "spss-modeler_18.1"
    SPSS_MODELER_18_2: str = "spss-modeler_18.2"

    # TensorFlow
    TENSORFLOW_2_9: str = "tensorflow_2.9"
    TENSORFLOW_RT22_2: str = "tensorflow_rt22.2"
    TENSORFLOW_2_12: str = "tensorflow_2.12"
    TENSORFLOW_RT23_1: str = "tensorflow_rt23.1"

    # XGBoost
    XGBOOST_1_6: str = "xgboost_1.6"

    # Foundation models
    PROMPT_TUNE_1_0: str = "prompt_tune_1.0"
    CUSTOM_FOUNDATION_MODEL_1_0: str = "custom_foundation_model_1.0"
# Class-level platform flags, both defaulting to False. Nothing in the visible
# part of this class reads or reassigns them (the methods consult
# ``self._client.ICP_PLATFORM_SPACES`` instead) -- NOTE(review): confirm
# whether external code still depends on these attributes.
cloud_platform_spaces = False
icp_platform_spaces = False
def __init__(self, client: APIClient) -> None:
    """Initialise the repository facade for *client*.

    Runs the ``WMLResource`` initialiser, exposes one meta-names helper per
    artifact kind as a public attribute, and builds the underlying
    ``MLRepositoryClient`` through ``_refresh_repo_client``.

    :param client: API client this repository operates on
    :type client: APIClient
    """
    WMLResource.__init__(self, __name__, client)

    # Meta-name catalogues, e.g. ``client.repository.ModelMetaNames.show()``.
    self.ModelMetaNames = ModelMetaNames()
    self.ExperimentMetaNames = ExperimentMetaNames()
    self.FunctionMetaNames = FunctionMetaNames()
    self.PipelineMetaNames = PipelineMetanames()
    self.AIServiceMetaNames = AIServiceMetaNames()
    self.SpacesMetaNames = SpacesMetaNames()
    self.MemberMetaNames = MemberMetaNames()

    # Placeholder until the refresh below creates the real client.
    self._ml_repository_client: MLRepositoryClient | None = None
    self._refresh_repo_client()  # regular token is initialized in service_instance
def _refresh_repo_client(self) -> None:
# Purpose: replace ``self._ml_repository_client`` with a new MLRepositoryClient
# for ``self._credentials.url``, authorize it, and attach the "User-Agent" header
# plus, when ``self._client.default_project_id`` is set, "X-Watson-Project-ID".
# NOTE(review): indentation is not visible in this view of the file, so the exact
# pairing of the ``else`` branches below must be confirmed against the original.
# If apiKey is passed in credentials then refresh repoclient with IAM token else MLToken
# A new client object is created on every call; nothing is copied from the previous one.
self._ml_repository_client = MLRepositoryClient(self._credentials.url)
if self._client.proceed is True:
# NOTE(review): ``is not None`` is true for any boolean result; confirm whether
# ``_is_iam()`` can actually return None, otherwise the paired else is unreachable.
if self._client.service_instance._is_iam() is not None:
self._ml_repository_client.authorize_with_token(self._client.token)
self._ml_repository_client._add_header(
"User-Agent", get_user_agent_header()
)
if self._client.default_project_id is not None:
self._ml_repository_client._add_header(
"X-Watson-Project-ID", self._client.default_project_id
)
else:
self._ml_repository_client.authorize_with_iamtoken(
self._client.token,
self._credentials.instance_id,
True,
)
self._ml_repository_client._add_header(
"User-Agent", get_user_agent_header()
)
if self._client.default_project_id is not None:
self._ml_repository_client._add_header(
"X-Watson-Project-ID", self._client.default_project_id
)
else:
# Presumably the ``proceed is not True`` path -- TODO confirm nesting.
if self._client._is_IAM():
self._ml_repository_client.authorize_with_iamtoken(
self._client.token,
self._credentials.instance_id,
True,
)
self._ml_repository_client._add_header(
"User-Agent", get_user_agent_header()
)
if self._client.default_project_id is not None:
self._ml_repository_client._add_header(
"X-Watson-Project-ID", self._client.default_project_id
)
else:
if self._client.ICP_PLATFORM_SPACES:
# The service-instance token is stored on the instance, then the word "Bearer"
# is removed; any whitespace that followed it is left in place.
self._repotoken = self._client.service_instance._get_token()
self._ml_repository_token = self._repotoken.replace("Bearer", "")
self._ml_repository_client.authorize_with_token(
self._ml_repository_token
)
else:
# Username/password authorization using the stored credentials.
self._ml_repository_client.authorize(
self._credentials.username,
self._credentials.password,
)
self._ml_repository_client._add_header(
"User-Agent", get_user_agent_header()
)
if self._client.default_project_id is not None:
self._ml_repository_client._add_header(
"X-Watson-Project-ID", self._client.default_project_id
)
[docs]
@inherited_docstring(
    Experiments.store, {"experiments.get_href": "repository.get_experiment_href"}
)
def store_experiment(self, meta_props: dict) -> dict:
    # Facade only: the client's experiments component performs the store.
    experiments = self._client.experiments
    return experiments.store(meta_props)
[docs]
@inherited_docstring(Pipelines.store)
def store_pipeline(self, meta_props: dict) -> dict:
    # Facade only: the client's pipelines component performs the store.
    pipelines = self._client.pipelines
    return pipelines.store(meta_props)
[docs]
@inherited_docstring(Models.store, {"store()": "store_model()"})
def store_model(
    self,
    model: str | object | None = None,
    meta_props: dict | None = None,
    training_data: TrainingDataType | None = None,
    training_target: TrainingTargetType | None = None,
    pipeline: object | None = None,
    feature_names: FeatureNamesArrayType | None = None,
    label_column_names: LabelColumnNamesType | None = None,
    subtrainingId: str | None = None,
    round_number: int | None = None,
    experiment_metadata: dict | None = None,
    training_id: str | None = None,
) -> dict:
    # Every argument is forwarded by keyword, unchanged, to the models component.
    store_arguments = {
        "model": model,
        "meta_props": meta_props,
        "training_data": training_data,
        "training_target": training_target,
        "pipeline": pipeline,
        "feature_names": feature_names,
        "label_column_names": label_column_names,
        "subtrainingId": subtrainingId,
        "round_number": round_number,
        "experiment_metadata": experiment_metadata,
        "training_id": training_id,
    }
    return self._client._models.store(**store_arguments)
def clone(
    self,
    artifact_id: str,
    space_id: str | None = None,
    action: str = "copy",
    rev_id: str | None = None,
) -> dict:
    """Cloning is not supported; this method always raises.

    All arguments are accepted and ignored.

    :raises WMLClientError: always, with the "cloning_not_supported" message
    """
    unsupported_message = Messages.get_message(message_id="cloning_not_supported")
    raise WMLClientError(unsupported_message)
[docs]
@inherited_docstring(Functions.store)
def store_function(
    self, function: str | Callable, meta_props: str | dict[str, Any]
) -> dict:
    # Facade only: the client's functions component performs the store.
    functions = self._client._functions
    return functions.store(function, meta_props)
[docs]
@inherited_docstring(Models.create_revision)
def create_model_revision(self, model_id: str | None = None, **kwargs: Any) -> dict:
    # The ID is resolved from ``model_id`` or ``kwargs`` by the shared helper
    # (presumably a deprecated ``model_uid`` keyword -- see the helper).
    resolved_id = _get_id_from_deprecated_uid(kwargs, model_id, "model")
    return self._client._models.create_revision(model_id=resolved_id)
[docs]
@inherited_docstring(Pipelines.create_revision)
def create_pipeline_revision(
    self, pipeline_id: str | None = None, **kwargs: Any
) -> dict:
    # The ID is resolved from ``pipeline_id`` or ``kwargs`` by the shared helper.
    resolved_id = _get_id_from_deprecated_uid(kwargs, pipeline_id, "pipeline")
    return self._client.pipelines.create_revision(pipeline_id=resolved_id)
[docs]
@inherited_docstring(Functions.create_revision)
def create_function_revision(
    self, function_id: str | None = None, **kwargs: Any
) -> dict:
    # Extra keywords are forwarded untouched to the functions component.
    functions = self._client._functions
    return functions.create_revision(function_id=function_id, **kwargs)
[docs]
@inherited_docstring(Experiments.create_revision)
def create_experiment_revision(self, experiment_id: str) -> dict:
    # Facade only: the client's experiments component creates the revision.
    experiments = self._client.experiments
    return experiments.create_revision(experiment_id=experiment_id)
[docs]
@inherited_docstring(Models.update, {"meta_props": "updated_meta_props"})
def update_model(
    self,
    model_id: str | None = None,
    updated_meta_props: dict | None = None,
    update_model: Any | None = None,
    **kwargs: Any,
) -> dict:
    # The ID is resolved from ``model_id`` or ``kwargs`` by the shared helper;
    # the remaining arguments go positionally to the models component.
    resolved_id = _get_id_from_deprecated_uid(kwargs, model_id, "model")
    models = self._client._models
    return models.update(resolved_id, updated_meta_props, update_model)
[docs]
@inherited_docstring(Experiments.update)
def update_experiment(
    self,
    experiment_id: str | None = None,
    changes: dict | None = None,
    **kwargs: Any,
) -> dict:
    # Extra keywords are forwarded untouched to the experiments component.
    experiments = self._client.experiments
    return experiments.update(experiment_id, changes, **kwargs)
[docs]
@inherited_docstring(Functions.update)
def update_function(
    self,
    function_id: str | None = None,
    changes: dict | None = None,
    update_function: str | Callable | None = None,
    **kwargs: Any,
) -> dict:
    # ``function_id`` now defaults to None, matching update_model,
    # update_experiment and update_pipeline. Without the default, a call that
    # identifies the function only through a keyword in ``kwargs`` failed with
    # TypeError before the delegate could see it. Existing positional and
    # keyword callers are unaffected.
    functions = self._client._functions
    return functions.update(function_id, changes, update_function, **kwargs)
[docs]
@inherited_docstring(Pipelines.update)
def update_pipeline(
    self,
    pipeline_id: str | None = None,
    changes: dict | None = None,
    rev_id: str | None = None,
    **kwargs: Any,
) -> dict:
    # The ID is resolved by the shared helper; ``kwargs`` is still forwarded
    # afterwards to the pipelines component.
    resolved_id = _get_id_from_deprecated_uid(kwargs, pipeline_id, "pipeline")
    pipelines = self._client.pipelines
    return pipelines.update(resolved_id, changes, rev_id, **kwargs)
[docs]
def load(self, artifact_id: str | None = None, **kwargs: Any) -> object:
    """Load a stored model from the repository into a local object.

    .. note::
        The use of the load() method is restricted and not permitted for AutoAI models.

    :param artifact_id: ID of the stored model
    :type artifact_id: str

    :return: trained model
    :rtype: object

    **Example**

    .. code-block:: python

        model = client.repository.load(model_id)
    """
    # The ID is resolved from ``artifact_id`` or ``kwargs`` by the shared helper.
    resolved_id = _get_id_from_deprecated_uid(kwargs, artifact_id, "artifact")
    models = self._client._models
    return models.load(resolved_id)
[docs]
def download(
    self,
    artifact_id: str | None = None,
    filename: str = "downloaded_artifact.tar.gz",
    rev_id: str | None = None,
    format: str | None = None,
    **kwargs: Any,
) -> str:
    """Download the content of a model, function, or AI service to a local file.

    The artifact kind is detected from its ID and the download is delegated
    to the matching component. ``format`` is passed on only for models.

    :param artifact_id: unique ID of the model, function, or AI service
    :type artifact_id: str

    :param filename: name of the file to which the artifact content will be downloaded
    :type filename: str, optional

    :param rev_id: revision ID
    :type rev_id: str, optional

    :param format: format of the content, applicable for models
    :type format: str, optional

    :return: path to the downloaded artifact content
    :rtype: str

    :raises WMLClientError: if the ID matches no model, function, or AI service

    **Examples**

    .. code-block:: python

        client.repository.download(model_id, 'my_model.tar.gz')
        client.repository.download(model_id, 'my_model.json') # if original model was saved as json, works only for xgboost 1.3
    """
    artifact_id = _get_id_from_deprecated_uid(kwargs, artifact_id, "artifact")
    rev_id = _get_id_from_deprecated_uid(kwargs, rev_id, "rev", can_be_none=True)
    self._validate_type(artifact_id, "artifact_id", str, True)
    self._validate_type(filename, "filename", str, True)

    kinds = self._check_artifact_type(str(artifact_id))

    if kinds["model"] is True:
        return self._client._models.download(artifact_id, filename, rev_id, format)
    if kinds["function"]:
        return self._client._functions.download(artifact_id, filename, rev_id)
    if kinds["ai_service"]:
        return self._client._ai_services.download(artifact_id, filename, rev_id)

    # No downloadable kind matched the ID.
    raise WMLClientError(
        "Unexpected type of artifact to download or Artifact with artifact_id: '{}' does not exist.".format(
            artifact_id
        )
    )
[docs]
def delete(
    self, artifact_id: str | None = None, **kwargs: Any
) -> Literal["SUCCESS"]:
    """Delete a model, experiment, pipeline, function, or AI service from the repository.

    Deletion is refused while the artifact still has deployments. The asset
    is removed with ``purge_on_delete`` set to ``"true"``.

    :param artifact_id: unique ID of the stored model, experiment, function, pipeline, or AI service
    :type artifact_id: str

    :return: status "SUCCESS" if deletion is successful
    :rtype: Literal["SUCCESS"]

    :raises WMLClientError: if deployments exist, the artifact is not found
        (HTTP 404), or the service answers with another non-success status

    **Example:**

    .. code-block:: python

        client.repository.delete(artifact_id)
    """
    artifact_id = _get_id_from_deprecated_uid(kwargs, artifact_id, "artifact")
    Repository._validate_type(artifact_id, "artifact_id", str, True)

    if self._if_deployment_exist_for_asset(artifact_id):
        raise WMLClientError(
            "Cannot delete artifact that has existing deployments. Please delete all associated deployments and try again"
        )

    query = self._client._params()
    query.update({"purge_on_delete": "true"})
    asset_href = self._client.service_instance._href_definitions.get_asset_href(
        artifact_id
    )
    response = requests.delete(
        asset_href,
        params=query,
        headers=self._client._get_headers(),
    )

    status = response.status_code
    if status in (200, 204):
        # Both success codes go through the common response handler.
        return self._handle_response(status, "delete assets", response)
    if status == 404:
        raise WMLClientError(
            "Artifact with artifact_id: '{}' does not exist.".format(artifact_id)
        )
    raise WMLClientError("Deletion error for the given id : ", response.text)
[docs]
def get_details(
    self,
    artifact_id: str | None = None,
    spec_state: SpecStates | None = None,
    artifact_name: str | None = None,
    **kwargs: Any,
) -> dict:
    """Get metadata of stored artifacts.

    With an `artifact_id`, the artifact kind is detected and the metadata of
    that single artifact is returned. Without it, metadata of all models,
    experiments, pipelines, functions, and AI services is collected,
    optionally narrowed by `artifact_name` and/or `spec_state`. When
    `spec_state` is given, experiments and pipelines are not queried and are
    reported with empty resources. The ``"ai_service"`` entry is left out if
    fetching AI services raises ``WMLClientError``.

    :param artifact_id: unique ID of the stored model, experiment, function, pipeline, or AI service
    :type artifact_id: str, optional

    :param spec_state: software specification state, can be used only when `artifact_id` is None
    :type spec_state: SpecStates, optional

    :param artifact_name: name of the stored model, experiment, function, pipeline, or AI service,
        can be used only when `artifact_id` is None
    :type artifact_name: str, optional

    :return: metadata of the stored artifact(s)
    :rtype:
        - dict (if artifact_id is not None)
        - {"models": dict, "experiments": dict, "pipeline": dict, "functions": dict, "ai_service": dict} (if artifact_id is None)

    :raises WMLClientError: if `artifact_id` is given but matches no known artifact kind

    **Examples**

    .. code-block:: python

        details = client.repository.get_details(artifact_id)
        details = client.repository.get_details(artifact_name='Sample_model')
        details = client.repository.get_details()

    Example of getting all repository assets with deprecated software specifications:

    .. code-block:: python

        from ibm_watsonx_ai.lifecycle import SpecStates

        details = client.repository.get_details(spec_state=SpecStates.DEPRECATED)
    """
    artifact_id = _get_id_from_deprecated_uid(
        kwargs, artifact_id, "artifact", can_be_none=True
    )
    Repository._validate_type(artifact_id, "artifact_id", str, False)
    Repository._validate_type(artifact_name, "artifact_name", str, False)

    if artifact_id is not None:
        # Single artifact: first matching kind wins, checked in this order.
        kinds = self._check_artifact_type(str(artifact_id))
        getters_by_kind = (
            ("model", self.get_model_details),
            ("experiment", self.get_experiment_details),
            ("pipeline", self.get_pipeline_details),
            ("function", self.get_function_details),
            ("ai_service", self.get_ai_service_details),
        )
        for kind, getter in getters_by_kind:
            if kinds[kind] is True:
                return getter(artifact_id)
        raise WMLClientError(
            "Getting artifact details failed. Artifact id: '{}' not found.".format(
                artifact_id
            )
        )

    # Listing mode: gather every kind, in the same order as before.
    models = self._client._models.get_details(
        spec_state=spec_state, model_name=artifact_name
    )
    if spec_state:
        # Experiments are not queried when a spec-state filter is given.
        experiments = {"resources": []}
    else:
        experiments = self.get_experiment_details(experiment_name=artifact_name)
    if spec_state:
        # Pipelines are not queried when a spec-state filter is given.
        pipelines = {"resources": []}
    else:
        pipelines = self.get_pipeline_details(pipeline_name=artifact_name)
    functions = self._client._functions.get_details(
        spec_state=spec_state, function_name=artifact_name
    )
    try:
        ai_services = self._client._ai_services.get_details(
            spec_state=spec_state, ai_service_name=artifact_name
        )
    except WMLClientError:
        ai_services = None

    details = {
        "models": models,
        "experiments": experiments,
        "pipeline": pipelines,
        "functions": functions,
    }
    if ai_services is not None:
        details["ai_service"] = ai_services
    return details
[docs]
def get_id_by_name(self, artifact_name: str) -> str:
    """Get the ID of the single stored artifact that has the given name.

    :param artifact_name: name of the stored artifact
    :type artifact_name: str

    :return: ID of the stored artifact if exactly one with the 'artifact_name' exists. Otherwise, raise an error.
    :rtype: str

    :raises WMLClientError: if no artifact or more than one artifact has this name

    **Example:**

    .. code-block:: python

        artifact_id = client.repository.get_id_by_name(artifact_name)
    """
    match = {}
    for per_kind in self.get_details(artifact_name=artifact_name).values():
        hits = len(per_kind["resources"])
        if hits == 0:
            continue
        if match or hits > 1:
            # Either a second kind also matched, or one kind matched repeatedly.
            raise WMLClientError(
                Messages.get_message(
                    artifact_name,
                    message_id="multiple_artifacts_found_by_name",
                )
            )
        match = per_kind["resources"][0]

    if not match:
        raise WMLClientError(
            f"Artifact with artifact_name: '{artifact_name}' does not exist."
        )
    return match["metadata"]["id"]
[docs]
@inherited_docstring(Models.get_details)
def get_model_details(
    self,
    model_id: str | None = None,
    limit: int | None = None,
    asynchronous: bool = False,
    get_all: bool = False,
    spec_state: SpecStates | None = None,
    model_name: str | None = None,
    **kwargs: Any,
) -> dict:
    # ``kwargs`` is consulted only by the ID helper; it is not forwarded.
    resolved_id = _get_id_from_deprecated_uid(
        kwargs, model_id, "model", can_be_none=True
    )
    models = self._client._models
    return models.get_details(
        model_id=resolved_id,
        limit=limit,
        asynchronous=asynchronous,
        get_all=get_all,
        spec_state=spec_state,
        model_name=model_name,
    )
[docs]
@inherited_docstring(Models.get_revision_details)
def get_model_revision_details(
    self, model_id: str | None = None, rev_id: str | None = None, **kwargs: Any
) -> dict:
    # Both IDs are resolved by the shared helper, model first, then revision.
    resolved_model_id = _get_id_from_deprecated_uid(kwargs, model_id, "model")
    resolved_rev_id = _get_id_from_deprecated_uid(kwargs, rev_id, "rev")
    models = self._client._models
    return models.get_revision_details(resolved_model_id, resolved_rev_id)
[docs]
@inherited_docstring(Experiments.get_details)
def get_experiment_details(
    self,
    experiment_id: str | None = None,
    limit: int | None = None,
    asynchronous: bool = False,
    get_all: bool = False,
    experiment_name: str | None = None,
    **kwargs: Any,
) -> dict:
    # All arguments, including extra keywords, go to the experiments component.
    experiments = self._client.experiments
    return experiments.get_details(
        experiment_id=experiment_id,
        limit=limit,
        asynchronous=asynchronous,
        get_all=get_all,
        experiment_name=experiment_name,
        **kwargs,
    )
[docs]
@inherited_docstring(Experiments.get_revision_details)
def get_experiment_revision_details(
    self, experiment_id: str, rev_id: str, **kwargs: Any
) -> dict:
    # Facade only: the experiments component fetches the revision metadata.
    experiments = self._client.experiments
    return experiments.get_revision_details(experiment_id, rev_id, **kwargs)
[docs]
@inherited_docstring(Functions.get_details)
def get_function_details(
    self,
    function_id: str | None = None,
    limit: int | None = None,
    asynchronous: bool = False,
    get_all: bool = False,
    spec_state: SpecStates | None = None,
    function_name: str | None = None,
    **kwargs: Any,
) -> dict:
    # All arguments, including extra keywords, go to the functions component.
    functions = self._client._functions
    return functions.get_details(
        function_id=function_id,
        limit=limit,
        asynchronous=asynchronous,
        get_all=get_all,
        spec_state=spec_state,
        function_name=function_name,
        **kwargs,
    )
[docs]
@inherited_docstring(Functions.get_revision_details)
def get_function_revision_details(
    self, function_id: str, rev_id: str, **kwargs: Any
) -> dict:
    # Facade only: the functions component fetches the revision metadata.
    functions = self._client._functions
    return functions.get_revision_details(function_id, rev_id, **kwargs)
[docs]
@inherited_docstring(Pipelines.get_details)
def get_pipeline_details(
    self,
    pipeline_id: str | None = None,
    limit: int | None = None,
    asynchronous: bool = False,
    get_all: bool = False,
    pipeline_name: str | None = None,
    **kwargs: Any,
) -> dict:
    pipeline_id = _get_id_from_deprecated_uid(
        kwargs, pipeline_id, "pipeline", can_be_none=True
    )

    # Optional-argument type checks, in the same order as the parameters.
    optional_checks = (
        (pipeline_id, "pipeline_id", str),
        (limit, "limit", int),
        (asynchronous, "asynchronous", bool),
        (get_all, "get_all", bool),
    )
    for value, label, expected_type in optional_checks:
        Repository._validate_type(value, label, expected_type, False)

    pipelines = self._client.pipelines
    return pipelines.get_details(
        pipeline_id=pipeline_id,
        limit=limit,
        asynchronous=asynchronous,
        get_all=get_all,
        pipeline_name=pipeline_name,
        **kwargs,
    )
[docs]
@inherited_docstring(Pipelines.get_revision_details)
def get_pipeline_revision_details(
    self, pipeline_id: str | None = None, rev_id: str | None = None, **kwargs: Any
) -> dict:
    # Only the pipeline ID goes through the shared helper; ``rev_id`` and the
    # remaining keywords are passed on as given.
    resolved_id = _get_id_from_deprecated_uid(kwargs, pipeline_id, "pipeline")
    pipelines = self._client.pipelines
    return pipelines.get_revision_details(resolved_id, rev_id, **kwargs)
[docs]
@staticmethod
@inherited_docstring(Models.get_href)
def get_model_href(model_details: dict) -> str:
    # Static alias for Models.get_href.
    href = Models.get_href(model_details)
    return href
[docs]
@staticmethod
@inherited_docstring(Models.get_id)
def get_model_id(model_details: dict) -> str:
    # Static alias for Models.get_id.
    model_id = Models.get_id(model_details)
    return model_id
[docs]
@staticmethod
@inherited_docstring(
    Experiments.get_id,
    {"experiments.get_details": "repository.get_experiment_details"},
)
def get_experiment_id(experiment_details: dict) -> str:
    # Static alias for Experiments.get_id.
    experiment_id = Experiments.get_id(experiment_details)
    return experiment_id
[docs]
@staticmethod
@inherited_docstring(
    Experiments.get_href,
    {"experiments.get_details": "repository.get_experiment_details"},
)
def get_experiment_href(experiment_details: dict) -> str:
    # Static alias for Experiments.get_href.
    href = Experiments.get_href(experiment_details)
    return href
[docs]
@staticmethod
@inherited_docstring(Functions.get_id)
def get_function_id(function_details: dict) -> str:
    # Static alias for Functions.get_id.
    function_id = Functions.get_id(function_details)
    return function_id
[docs]
@staticmethod
@inherited_docstring(Functions.get_href)
def get_function_href(function_details: dict) -> str:
    # Static alias for Functions.get_href.
    href = Functions.get_href(function_details)
    return href
[docs]
@staticmethod
@inherited_docstring(
    Pipelines.get_href, {"pipelines.get_details": "repository.get_pipeline_details"}
)
def get_pipeline_href(pipeline_details: dict) -> str:
    # Static alias for Pipelines.get_href.
    href = Pipelines.get_href(pipeline_details)
    return href
[docs]
@staticmethod
@inherited_docstring(Pipelines.get_id)
def get_pipeline_id(pipeline_details: dict) -> str:
    # Static alias for Pipelines.get_id.
    pipeline_id = Pipelines.get_id(pipeline_details)
    return pipeline_id
[docs]
def list(self, framework_filter: str | None = None) -> pandas.DataFrame:
    """Get and list stored models, pipelines, functions, experiments, and AI services in a table/DataFrame format.

    Rows are sorted by creation time, newest first. At most the first 50
    rows are returned; a note is printed when the table had to be truncated.
    A kind whose listing request fails is logged and skipped rather than
    aborting the whole listing.

    :param framework_filter: keep only rows whose FRAMEWORK column contains this text
    :type framework_filter: str, optional

    :return: DataFrame with columns ID, NAME, CREATED, FRAMEWORK, TYPE,
        SPEC_STATE, and SPEC_REPLACEMENT
    :rtype: pandas.DataFrame

    **Example:**

    .. code-block:: python

        client.repository.list()
        client.repository.list(framework_filter='prompt_tune')
    """
    # NOTE(review): earlier code built ``{"limit": 1000}`` query params here but
    # overwrote them before any request was sent, so no limit was ever applied.
    # That dead code is removed; requests keep using the client's default params.
    is_icp = self._client.ICP_PLATFORM_SPACES
    hrefs = self._client.service_instance._href_definitions
    endpoints = {
        "model": hrefs.get_published_models_href(),
        "experiment": hrefs.get_experiments_href(),
        "pipeline": hrefs.get_pipelines_href(),
        "function": hrefs.get_functions_href(),
        "ai_service": hrefs.get_ai_services_href(),
    }

    # Phase 1: issue one request per artifact kind.
    responses = {}
    for kind, href in endpoints.items():
        params = self._client._params()
        responses[kind] = get_url(href, self._client._get_headers(), params, is_icp)

    # Phase 2: decode each response; a failing kind keeps an empty list.
    resources: dict[str, list] = {kind: [] for kind in endpoints}
    for kind in endpoints:
        try:
            payload = self._handle_response(
                200, "getting all {}s".format(kind), responses[kind]
            )
            resources[kind] = payload["resources"]
        except Exception as e:
            # Deliberate best-effort: log and continue with the other kinds.
            self._logger.error(e)

    sw_spec_info = {
        s["id"]: s
        for s in self._client.software_specifications.get_details(state_info=True)[
            "resources"
        ]
    }

    def get_spec_info(spec_id: str, prop: str) -> str:
        # Empty string when the spec is missing or unknown.
        if spec_id and spec_id in sw_spec_info:
            return sw_spec_info[spec_id].get(prop, "")
        return ""

    values = []
    for kind, items in resources.items():
        for item in items:
            spec_id = item["entity"].get("software_spec", {}).get("id")
            values.append(
                (
                    item["metadata"]["id"],
                    item["metadata"]["name"],
                    item["metadata"]["created_at"],
                    item["entity"]["type"] if kind == "model" else "-",
                    # TYPE is always the artifact kind. The previous expression
                    # ``t != "function" or t != "ai_service"`` was always true,
                    # so its "<type> function" branch never ran; the observable
                    # output is kept and the dead branch removed.
                    kind,
                    get_spec_info(spec_id, "state"),
                    get_spec_info(spec_id, "replacement"),
                )
            )

    columns = [
        "ID",
        "NAME",
        "CREATED",
        "FRAMEWORK",
        "TYPE",
        "SPEC_STATE",
        "SPEC_REPLACEMENT",
    ]

    from pandas import DataFrame

    table = DataFrame(data=values, columns=columns)
    table = table.sort_values(by=["CREATED"], ascending=False).reset_index(
        drop=True
    )
    if framework_filter:
        table = table[table["FRAMEWORK"].str.contains(framework_filter)]

    # Fixed: decide on the table that is actually returned. Counting the
    # unfiltered rows printed the note even when the filtered table was short.
    if len(table) > _DEFAULT_LIST_LENGTH:
        print(
            "Note: Only first {} records were displayed. To display more use more specific list functions.".format(
                _DEFAULT_LIST_LENGTH
            )
        )
    return table[:_DEFAULT_LIST_LENGTH]
[docs]
@inherited_docstring(Models.list)
def list_models(
    self,
    limit: int | None = None,
    asynchronous: bool = False,
    get_all: bool = False,
) -> pandas.DataFrame:
    # Facade only: the models component builds the table.
    models = self._client._models
    return models.list(limit=limit, asynchronous=asynchronous, get_all=get_all)
[docs]
@inherited_docstring(Experiments.list)
def list_experiments(self, limit: int | None = None) -> pandas.DataFrame:
    # Facade only: the experiments component builds the table.
    experiments = self._client.experiments
    return experiments.list(limit=limit)
[docs]
@inherited_docstring(Functions.list)
def list_functions(self, limit: int | None = None) -> pandas.DataFrame:
    # Facade only: the functions component builds the table.
    functions = self._client._functions
    return functions.list(limit=limit)
[docs]
@inherited_docstring(Pipelines.list)
def list_pipelines(self, limit: int | None = None) -> pandas.DataFrame:
    # Facade only: the pipelines component builds the table.
    pipelines = self._client.pipelines
    return pipelines.list(limit=limit)
def _check_artifact_type(self, artifact_id: str) -> dict[str, bool]:
    """Probe every artifact endpoint for *artifact_id* and report which ones know it.

    One GET is issued per kind (model, pipeline, experiment, function,
    ai_service). A kind is reported as ``True`` when its response has
    status code 200.

    :param artifact_id: ID to look up
    :type artifact_id: str

    :return: mapping of artifact kind to whether the ID exists for that kind
    :rtype: dict[str, bool]
    """
    Repository._validate_type(artifact_id, "artifact_id", str, True)

    def _artifact_exists(response: Response | None) -> bool:
        # Missing response or missing attribute both count as "does not exist".
        if response is None:
            return False
        if "status_code" not in dir(response):
            return False
        return response.status_code == 200

    is_icp = self._client.ICP_PLATFORM_SPACES
    hrefs = self._client.service_instance._href_definitions
    endpoints = {
        "model": hrefs.get_model_last_version_href(artifact_id),
        "pipeline": hrefs.get_pipeline_href(artifact_id),
        "experiment": hrefs.get_experiment_href(artifact_id),
        "function": hrefs.get_function_href(artifact_id),
        "ai_service": hrefs.get_ai_service_href(artifact_id),
    }

    # Phase 1: send all probes. Errors raised by get_url propagate to the caller.
    fetched = {}
    for kind, href in endpoints.items():
        query = self._client._params()
        fetched[kind] = get_url(href, self._client._get_headers(), query, is_icp)

    # Phase 2: record and log each response; only logging problems are swallowed.
    response_get: dict[str, Response | None] = dict.fromkeys(endpoints)
    for kind, href in endpoints.items():
        try:
            response_get[kind] = fetched[kind]
            reply = cast(Response, response_get[kind])
            self._logger.debug(
                "Response({})[{}]: {}".format(href, reply.status_code, reply.text)
            )
        except Exception as e:
            self._logger.debug("Error during checking artifact type: " + str(e))

    return {kind: _artifact_exists(reply) for kind, reply in response_get.items()}
[docs]
def create_revision(self, artifact_id: str | None = None, **kwargs: Any) -> dict:
    """Create a revision for passed `artifact_id`.

    The artifact kind is detected from the ID and the request is delegated
    to the matching component. Experiments, pipelines, and AI services are
    checked first (as before); models and functions, which the documentation
    always listed but the code never dispatched, are now handled as well.

    :param artifact_id: unique ID of a stored model, experiment, function, pipeline, or AI service
    :type artifact_id: str

    :return: artifact new revision metadata
    :rtype: dict

    :raises WMLClientError: if the ID matches no artifact kind that supports revisions

    **Example:**

    .. code-block:: python

        details = client.repository.create_revision(artifact_id)
    """
    artifact_id = _get_id_from_deprecated_uid(kwargs, artifact_id, "artifact")
    Repository._validate_type(artifact_id, "artifact_id", str, True)

    artifact_type = self._check_artifact_type(str(artifact_id))
    if artifact_type["experiment"] is True:
        return self._client.experiments.create_revision(artifact_id)
    if artifact_type["pipeline"] is True:
        return self._client.pipelines.create_revision(artifact_id)
    if artifact_type["ai_service"] is True:
        return self._client._ai_services.create_revision(artifact_id)
    # Added: same delegate calls as create_model_revision and
    # create_function_revision use. Previously these IDs fell through to the
    # error below, so existing successful calls are unaffected.
    if artifact_type["model"] is True:
        return self._client._models.create_revision(model_id=artifact_id)
    if artifact_type["function"] is True:
        return self._client._functions.create_revision(function_id=artifact_id)

    # Fixed message: the old text ("Getting artifact details failed") was
    # copied from get_details and did not describe this operation.
    raise WMLClientError(
        "Creating artifact revision failed. Artifact id: '{}' not found.".format(
            artifact_id
        )
    )
def _get_revision_details(
    self, artifact_id: str | None = None, **kwargs: Any
) -> dict:
    """Get revision metadata of a stored experiment or pipeline.

    Only experiments and pipelines are dispatched; any other ID raises.

    :param artifact_id: unique ID of a stored experiment or pipeline
    :type artifact_id: str

    :return: stored artifacts metadata
    :rtype: dict

    :raises WMLClientError: if the ID is neither an experiment nor a pipeline

    **Example:**

    .. code-block:: python

        details = client.repository._get_revision_details(artifact_id)
    """
    artifact_id = _get_id_from_deprecated_uid(kwargs, artifact_id, "artifact")
    Repository._validate_type(artifact_id, "artifact_id", str, True)

    kinds = self._check_artifact_type(str(artifact_id))
    # NOTE(review): the delegates are called with the artifact ID only, while
    # the public *_revision_details wrappers also pass a revision ID -- confirm
    # the delegates accept a single argument.
    if kinds["experiment"] is True:
        return self._client.experiments.get_revision_details(artifact_id)
    if kinds["pipeline"] is True:
        return self._client.pipelines.get_revision_details(artifact_id)
    raise WMLClientError(
        "Getting artifact details failed. Artifact id: '{}' not found.".format(
            artifact_id
        )
    )
[docs]
@inherited_docstring(Models.list_revisions)
def list_models_revisions(
    self, model_id: str | None = None, limit: int | None = None, **kwargs: Any
) -> pandas.DataFrame:
    # The ID is resolved by the shared helper; ``kwargs`` is forwarded afterwards.
    resolved_id = _get_id_from_deprecated_uid(kwargs, model_id, "model")
    models = self._client._models
    return models.list_revisions(resolved_id, limit=limit, **kwargs)
[docs]
@inherited_docstring(Pipelines.list_revisions)
def list_pipelines_revisions(
    self, pipeline_id: str | None = None, limit: int | None = None, **kwargs: Any
) -> pandas.DataFrame:
    # ``kwargs`` is consulted only by the ID helper; it is not forwarded.
    resolved_id = _get_id_from_deprecated_uid(kwargs, pipeline_id, "pipeline")
    pipelines = self._client.pipelines
    return pipelines.list_revisions(resolved_id, limit=limit)
[docs]
@inherited_docstring(Functions.list_revisions)
def list_functions_revisions(
    self, function_id: str | None = None, limit: int | None = None, **kwargs: Any
) -> pandas.DataFrame:
    # Facade only: the functions component builds the revisions table.
    functions = self._client._functions
    return functions.list_revisions(function_id, limit=limit, **kwargs)
[docs]
@inherited_docstring(Experiments.list_revisions)
def list_experiments_revisions(
    self, experiment_id: str | None = None, limit: int | None = None, **kwargs: Any
) -> pandas.DataFrame:
    # Facade only: the experiments component builds the revisions table.
    experiments = self._client.experiments
    return experiments.list_revisions(experiment_id, limit=limit, **kwargs)
[docs]
@inherited_docstring(AIServices.store)
def store_ai_service(
    self, ai_service: str | Callable, meta_props: dict[str, Any]
) -> dict:
    # Facade only: the client's AI services component performs the store.
    ai_services = self._client._ai_services
    return ai_services.store(ai_service, meta_props)
[docs]
@inherited_docstring(AIServices.get_details)
def get_ai_service_details(
    self,
    ai_service_id: str | None = None,
    limit: int | None = None,
    asynchronous: bool = False,
    get_all: bool = False,
    spec_state: SpecStates | None = None,
    ai_service_name: str | None = None,
    **kwargs: Any,
) -> dict:
    # ``kwargs`` is accepted but not forwarded to the AI services component.
    ai_services = self._client._ai_services
    return ai_services.get_details(
        ai_service_id=ai_service_id,
        limit=limit,
        asynchronous=asynchronous,
        get_all=get_all,
        spec_state=spec_state,
        ai_service_name=ai_service_name,
    )
[docs]
@inherited_docstring(AIServices.update)
def update_ai_service(
    self,
    ai_service_id: str,
    changes: dict,
    update_ai_service: str | Callable | None = None,
) -> dict:
    # Facade only: the client's AI services component applies the update.
    ai_services = self._client._ai_services
    return ai_services.update(ai_service_id, changes, update_ai_service)
[docs]
@staticmethod
@inherited_docstring(AIServices.get_id)
def get_ai_service_id(ai_service_details: dict) -> str:
    # Static alias for AIServices.get_id.
    ai_service_id = AIServices.get_id(ai_service_details)
    return ai_service_id
[docs]
@inherited_docstring(AIServices.list)
def list_ai_services(self, limit: int | None = None) -> pandas.DataFrame:
    # Facade only: the AI services component builds the table.
    ai_services = self._client._ai_services
    return ai_services.list(limit=limit)
[docs]
@inherited_docstring(AIServices.create_revision)
def create_ai_service_revision(self, ai_service_id: str, **kwargs: Any) -> dict:
    # Extra keywords are forwarded untouched to the AI services component.
    ai_services = self._client._ai_services
    return ai_services.create_revision(ai_service_id=ai_service_id, **kwargs)
[docs]
@inherited_docstring(AIServices.get_revision_details)
def get_ai_service_revision_details(
    self, ai_service_id: str, rev_id: str, **kwargs: Any
) -> dict:
    # Facade only: the AI services component fetches the revision metadata.
    ai_services = self._client._ai_services
    return ai_services.get_revision_details(ai_service_id, rev_id, **kwargs)
[docs]
@inherited_docstring(AIServices.list_revisions)
def list_ai_service_revisions(
    self, ai_service_id: str, limit: int | None = None
) -> pandas.DataFrame:
    # Facade only: the AI services component builds the revisions table.
    ai_services = self._client._ai_services
    return ai_services.list_revisions(ai_service_id, limit=limit)